实现延迟队列的方式有很多种,有本地自己jdk方式实现、Quartz 定时任务实现、RabbitMQ 延时队列实现,还有Redis方式实现。综合自己的生产情况,Redis是符合分布式服务及开发成本较小的一种方式。基本的机制如下图
本文将通过工具包Redisson用极简单的方式来实现一个延迟队列。同时提供一下比较完备的例子。
一、延迟队列的使用场景
背景:
1、当订单一直处于未支付状态时,如何及时地关闭订单
2、如何定期检查处于退款状态的订单是否已经退款成功
3、在订单长时间没有收到下游系统的状态通知的时候,如何实现阶梯式的同步订单状态的策略
4、在系统通知上游系统支付成功终态时,上游系统返回通知失败,如何进行异步通知实行分频率发送:15s 3m 10m 30m 30m 1h 2h 6h 15h
一般的解决方案:
1、最简单的方式,定时扫表。例如对于订单支付失效要求比较高的,每2S扫表一次检查过期的订单进行主动关单操作。优点是简单,缺点是每分钟全局扫表,浪费资源,如果遇到表数据订单量即将过期的订单量很大,会造成关单延迟。
2、使用RabbitMq或者其他MQ改造实现延迟队列,优点是,开源,现成的稳定的实现方案,缺点是:MQ是一个消息中间件,如果团队技术栈本来就有MQ,那还好,如果不是,那为了延迟队列而去部署一套MQ成本有点大
3、使用Redis的zset、list的特性,我们可以利用redis来实现一个延迟队列RedisDelayQueue。本文使用Redisson来实现。
二、Redisson实现方式
1、添加包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.5</version>
</dependency>
2、实现
下面用一个抢红包的例子来演示实现过程。在一定时间内,红包没有领取的话,将失效。
创建红包类
import java.io.Serializable;
public class RedPacketMessage implements Serializable {
/**
* 红包 ID
*/
private long redPacketId;
/**
* 创建时间戳
*/
private long timestamp;
public RedPacketMessage() {
}
public RedPacketMessage(long redPacketId) {
this.redPacketId = redPacketId;
this.timestamp = System.currentTimeMillis();
}
public long getRedPacketId() {
return redPacketId;
}
public long getTimestamp() {
return timestamp;
}
}
实现类
import com.test.moredatasourse.bean.RedPacketMessage;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
/**
* 延迟队列 redis实现
*/
public class RedPacketDelayQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(RedPacketDelayQueue.class);
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.197.24:36379");
RedissonClient redissonClient = Redisson.create(config);
/**
* 红包目标队列
*/
RBlockingQueue<RedPacketMessage> blockingRedPacketQueue
= redissonClient.getBlockingQueue("redPacketDelayQueue");
/**
* 定时任务将到期的元素转移到目标队列
*/
RDelayedQueue<RedPacketMessage> delayedRedPacketQueue
= redissonClient.getDelayedQueue(blockingRedPacketQueue);
/**
* 延时信息入队列
*/
// 经常会出现首条消息消费不了的情况,加一个初始为0的消息。
delayedRedPacketQueue.offer(new RedPacketMessage(0), 1, TimeUnit.SECONDS);
// 下面是真正的业务
delayedRedPacketQueue.offer(new RedPacketMessage(5), 5, TimeUnit.SECONDS);
delayedRedPacketQueue.offer(new RedPacketMessage(10), 10, TimeUnit.SECONDS);
delayedRedPacketQueue.offer(new RedPacketMessage(30), 30, TimeUnit.SECONDS);
delayedRedPacketQueue.offer(new RedPacketMessage(50), 50, TimeUnit.SECONDS);
LOGGER.info("红包ID:");
while (true){
/**
* 取出失效红包
*/
RedPacketMessage redPacket = blockingRedPacketQueue.take();
LOGGER.info("红包ID:{}过期失效",redPacket.getRedPacketId());
/**
* 处理相关业务逻辑:记录相关信息并退还剩余红包金额
*/
}
}
}
三、源码
1、本文源码:
https://gitee.com/yaokj/demo-toutiao/blob/master/src/main/java/com/test/moredatasourse/utils/RedPacketDelayQueue.java
本文的方式很简单。还有一个稍完备的可以看:
https://blog.csdn.net/zxl646801924/article/details/107610699
2、本文并没有解决删除消息的情况。在实际中,还是会遇到从队列中删除一个元素的情况。
还有一个更完备的例子的源码:
https://gitee.com/fork-out-project/zing-project/tree/master/zing-delay-queue