优秀的编程知识分享平台

网站首页 > 技术文章 正文

Redis通过Redisson实现延迟队列(含源码)

nanyue 2024-09-01 00:06:18 技术文章 5 ℃

实现延迟队列的方式有很多种,有本地自己jdk方式实现、Quartz 定时任务实现、RabbitMQ 延时队列实现,还有Redis方式实现。综合自己的生产情况,Redis是符合分布式服务及开发成本较小的一种方式。基本的机制如下图


本文将通过工具包Redisson用极简单的方式来实现一个延迟队列。同时提供一下比较完备的例子。

一、延迟队列的使用场景

背景:

1、当订单一直处于未支付状态时,如何及时地关闭订单

2、如何定期检查处于退款状态的订单是否已经退款成功

3、在订单长时间没有收到下游系统的状态通知的时候,如何实现阶梯式的同步订单状态的策略

4、在系统通知上游系统支付成功终态时,上游系统返回通知失败,如何进行异步通知实行分频率发送:15s 3m 10m 30m 30m 1h 2h 6h 15h

一般的解决方案:

1、最简单的方式,定时扫表。例如对于订单支付失效要求比较高的,每2S扫表一次检查过期的订单进行主动关单操作。优点是简单,缺点是每分钟全局扫表,浪费资源,如果遇到表数据订单量即将过期的订单量很大,会造成关单延迟。

2、使用RabbitMq或者其他MQ改造实现延迟队列,优点是,开源,现成的稳定的实现方案,缺点是:MQ是一个消息中间件,如果团队技术栈本来就有MQ,那还好,如果不是,那为了延迟队列而去部署一套MQ成本有点大

3、使用Redis的zset、list的特性,我们可以利用redis来实现一个延迟队列RedisDelayQueue。本文使用Redisson来实现。

二、Redisson实现方式

1、添加包

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.5</version>
</dependency>

2、实现

下面用一个抢红包的例子来演示实现过程。在一定时间内,红包没有领取的话,将失效。

创建红包类

import java.io.Serializable;

public class RedPacketMessage implements Serializable {
    /**
     * 红包 ID
     */
    private  long redPacketId;

    /**
     * 创建时间戳
     */
    private  long timestamp;

    public RedPacketMessage() {

    }

    public RedPacketMessage(long redPacketId) {
        this.redPacketId = redPacketId;
        this.timestamp = System.currentTimeMillis();
    }

    public long getRedPacketId() {
        return redPacketId;
    }

    public long getTimestamp() {
        return timestamp;
    }

}

实现类


import com.test.moredatasourse.bean.RedPacketMessage;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Time;
import java.util.concurrent.TimeUnit;

/**
 * 延迟队列 redis实现
 */
public class RedPacketDelayQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedPacketDelayQueue.class);

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.197.24:36379");
        RedissonClient redissonClient = Redisson.create(config);
        /**
         * 红包目标队列
         */
        RBlockingQueue<RedPacketMessage> blockingRedPacketQueue
                = redissonClient.getBlockingQueue("redPacketDelayQueue");
        /**
         * 定时任务将到期的元素转移到目标队列
         */
        RDelayedQueue<RedPacketMessage> delayedRedPacketQueue
                = redissonClient.getDelayedQueue(blockingRedPacketQueue);

        /**
         * 延时信息入队列
         */
        // 经常会出现首条消息消费不了的情况,加一个初始为0的消息。
        delayedRedPacketQueue.offer(new RedPacketMessage(0), 1, TimeUnit.SECONDS);

        // 下面是真正的业务
        delayedRedPacketQueue.offer(new RedPacketMessage(5), 5, TimeUnit.SECONDS);
        delayedRedPacketQueue.offer(new RedPacketMessage(10), 10, TimeUnit.SECONDS);
        delayedRedPacketQueue.offer(new RedPacketMessage(30), 30, TimeUnit.SECONDS);
        delayedRedPacketQueue.offer(new RedPacketMessage(50), 50, TimeUnit.SECONDS);
        LOGGER.info("红包ID:");
        while (true){
            /**
             * 取出失效红包
             */
            RedPacketMessage redPacket = blockingRedPacketQueue.take();
            LOGGER.info("红包ID:{}过期失效",redPacket.getRedPacketId());
            /**
             * 处理相关业务逻辑:记录相关信息并退还剩余红包金额
             */
        }

    }
}

三、源码

1、本文源码:

https://gitee.com/yaokj/demo-toutiao/blob/master/src/main/java/com/test/moredatasourse/utils/RedPacketDelayQueue.java

本文的方式很简单。还有一个稍完备的可以看:

https://blog.csdn.net/zxl646801924/article/details/107610699

2、本文并没有解决删除消息的情况。在实际中,还是会遇到从队列中删除一个元素的情况。

还有一个更完备的例子的源码:

https://gitee.com/fork-out-project/zing-project/tree/master/zing-delay-queue

Tags:

最近发表
标签列表