在之前的系统,用户主要是经销商,用户数不多,数据量也不大,便采用了最简单的方式-定时扫描表,这种方式确实简单粗暴,如果要实时性较高,那么就只有频繁扫描表,这样就增加了数据库的压力,如果要减轻数据库压力,那么就只有降低扫描频率,这样就出现延迟很高。这次的新系统的用户和数据量较之前的系统就要多一些了,如果再使用之前的方式,不止数据库压力会很大,用户体验也不好。
现在,有很多支持延迟消息的开源消息队列,如:beanstalkd、rabbitmq、rocketmq、nsq等,最开始,我们有想到过用消息队列来实现这种延迟消费,但是,这又增加了我们的运维成本,便暂时放弃这种方案。因为系统有用到过Redis,当时也有想到过基于Redis的key过期事件机制来实现,但是这货有个大问题,如果消费程序重启或者挂掉期间有key过期,这部分的事件就丢失了,所以这种方式也不想。直到某天,我在查看Redisson文档时,突然眼前一亮,看到一个关键字RDelayedQueue,这不就是延迟队列吗?正好现在系统有用到Redis,要不就试试这个家伙吧。
引入相关包
这里我主要用到了hutool、lombok和redisson
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.7.16</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.5</version>
</dependency>
|
简单配置
这里只需要简单配置一些Redisson的redis连接信息即可
1
2
3
4
| spring:
redis:
host: 127.0.0.1
port: 6379
|
Show Code
这里,我新建了一个OrderDto类,简单加入两个属性
1
2
3
4
5
6
| @Getter
@Setter
public class OrderDto implements Serializable {
private String orderNo;
private String createDate;
}
|
再新建一个Runner类,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| @Component
public class RedissonRunner implements ApplicationRunner {
@Autowired
RedissonClient redissonClient;
private static final int DELAYTIME = 1 * 60; //延迟时间,1分钟
@Override
public void run(ApplicationArguments args) throws Exception {
RBlockingQueue<OrderDto> blockingQueue = redissonClient.getBlockingQueue("delay_queue");
RDelayedQueue<OrderDto> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
for (int orderIndex = 0; orderIndex <= 99; orderIndex++) {
OrderDto orderDto = new OrderDto();
orderDto.setOrderNo("order0" + String.valueOf(orderIndex));
orderDto.setCreateDate(DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss:SSS"));
delayedQueue.offer(orderDto, DELAYTIME, TimeUnit.SECONDS); //放入队列
}
ThreadUtil.execAsync(() -> {
while (true) {
OrderDto orderDto = blockingQueue.poll();
if (ObjectUtils.isEmpty(orderDto)) {
try {
Thread.sleep(1000);
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(StrUtil.format("当前时间:[{}],订单号:[{}],订单时间:[{}]", DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss:SSS"), orderDto.getOrderNo(), orderDto.getCreateDate()));
}
});
}
}
|
这里,我们为了模拟延迟,就把延迟时间设置为60秒。好,我们运行起来看看效果,当我们运行起来时,redis里面会多出两个key
等待一分钟后,我们便能通过poll来获取需要消费的消息
当消息消费完后且无新消息写入该队列中,这两个key就会被删除。
我们再来模拟一下,消费程序退出再重启的情况下,是否可以继续消费队列。
在redis中我们也可以看到超过延迟时间且还未消费的消息会一直存在,直到消费端将其消费。
这次没有时间去看底层的实现逻辑,先用着试试看,后面有时间再深入研究。如果家人们有更好的方式,请提出你们宝贵的意见,谢谢。