优秀的编程知识分享平台

网站首页 > 技术文章 正文

如何实现延迟队列(JDK/mysql/redis/Rabbit)

nanyue 2025-03-28 19:29:40 技术文章 3 ℃

何为延迟队列

队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟。延迟队列中的每个元素都指定了延迟时间,表示该元素到达指定时间之后将出队进行处理。其实从上述定义来看,与其说是延迟队列,不如说它是一个以时间为权重最小堆结构

那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:

  • 订单超时30分钟未付款将自动关闭
  • 会议系统中,会议开始前10分钟,发送会议提醒
  • 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
  • 再比如微波炉、烤箱、等等

可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。

延迟队列实现方式

延迟队列实现的方式有很多种,具体采用哪种去实现,和我们的业务背景、业务诉求都息息相关,不同的实现方式都有其适用的应用场景,我这里将延迟队列分为两类:单机延迟队列分布式延迟队列

1. 单机实现

JDK 提供了DelayedQueue可以实现延迟队列的目的。其类图如下:

可以看到DelayedQueue是一个阻塞队列,其队列中的元素必须实现Delayed接口:

public interface Delayed extends Comparable {
    long getDelay(TimeUnit unit);
}

其中getDelay返回代表该元素的一个在队列中可存在的时间,通过这种方式来实现元素的延迟弹出。接下来看订单超时30秒将自动关闭的实际例子:

public class JDKDelayQueueTest {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DelayQueue DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {

        EXECUTOR_SERVICE.submit(() -> {
            while (true) {
                if (!DELAY_QUEUE.isEmpty()) {
                    Order order = DELAY_QUEUE.poll();
                    if (order != null) {
                        System.out.println(order.getOrderId() + " 超时关闭与:" + FORMATTER.format(LocalDateTime.now()));
                    }

                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        });
        EXECUTOR_SERVICE.submit(() -> {
            try {
                DELAY_QUEUE.add(new Order("黄焖鸡订单"));
                TimeUnit.SECONDS.sleep(5);
                DELAY_QUEUE.add(new Order("麻辣香锅订单"));
                TimeUnit.SECONDS.sleep(10);
                DELAY_QUEUE.add(new Order("石锅拌饭订单"));
            } catch (Exception e) {

            }

        });

    }

    public static class Order implements Delayed {

        private final LocalDateTime expireTime;
        private final String orderId;

        public Order(String orderId) {
            this.expireTime = LocalDateTime.now().plusSeconds(30);
            this.orderId = orderId;
            System.out.println(orderId + " 创建于:" + FORMATTER.format(LocalDateTime.now()));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
        }

        @Override
        public int compareTo(Delayed targetOrder) {
            // 谁的过期时间最早谁就排最前面
            return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
        }

        public String getOrderId() {
            return orderId;
        }

        public LocalDateTime getExpireTime() {
            return expireTime;
        }
    }
}

输出:

黄焖鸡订单 创建于:2021-08-21 18:26:30
麻辣香锅订单 创建于:2021-08-21 18:26:35
石锅拌饭订单 创建于:2021-08-21 18:26:45
黄焖鸡订单 超时关闭与:2021-08-21 18:27:00
麻辣香锅订单 超时关闭与:2021-08-21 18:27:05
石锅拌饭订单 超时关闭与:2021-08-21 18:27:15

DelayQueue实现方式小结

这种方式的优点就是实现简单,不复杂,但是其缺点也比较多:不具备可扩展性内存限制无持久化机制,数据容易丢失。


分布式实现

2. 数据库轮询

数据库论询的方式相对而言也比较好理解,后台启动定时任务每隔一段时间扫描指定的数据库表每一行数据,获取出到达指定延迟时间的行进行处理,所以使用该方式重要的就三个要素:

1)捞取任务
扫描数据库的后台任务,可以使用分布式任务去扫,比如A任务扫描limit 0,100满足条件的数据行,B任务扫描limit 100,200满足条件的数据行

2)执行任务
一般来说讲究分工协作,第一步中的分布式线程任务专门用来捞取任务,那么捞取到的任务可以再次扔给另外专门用户处理任务的线程中

3)数据库表设计
可以在表中增加一个字段来表示延迟时间,比如针对上面的订单超时30秒关闭,我们可以增加一个字段timeout,可以是此时间的毫秒数来记录订单的超时时间,那么此时我们的SQL就可以是:

select * from order where ${now} >= timeout limit ${start},100;

数据库轮询实现方式小结

采用这种方式可以看到首先我们需要查询数据库,那么查询数据库就意味着存在查询耗时,那么可能最终导致的就是实时性不高,但是它的优点在于天生满足任务持久化机制,不用担心延迟任务丢失。

3.通过Redis实现

Redis的五大数据类型中的zset数据类型中,包含一个称为score的属性,该数据类型中所有元素都会按照score进行排序,所以如果将score作为我们的延迟时间的时间戳,那么我们可以通过命令Zrangebyscore来获取满足条件的数据,然后交给我们的任务处理线程去处理,其实整个实现思想和数据库轮循是一样的,只不过数据存储结构由数据库转变成了redis,准确来说redis也是数据库,只不过不同的存储结构带来的影响就是适用场景的不同罢了。

那么如果通过Redis来实现延迟队列,大概会有如下几步:

1) 增加任务

zadd tasks ${过期时间戳} ${任务相关数据}

2)捞取任务

ZRANGEBYSCORE tasks -inf ${当前时间戳} WITHSCORES

捞取过期时间早于当前时间的这部分任务

3)执行任务 接下来就是执行,这个就没什么好说的了

关于redis zset数据结构以及命令可以看这里:
https://www.runoob.com/redis/redis-sorted-sets.html

一些优化点

1.在添加延迟任务时,可以通过对任务id进行hash分散至多个redis key,可以避免所有任务存储在一个key中导致大key从而影响元素的添加和查找性能

2.每个key独自拥有一个线程处理

3.每个key的线程只负责拉取需要处理的数据,然后再转发至消息队列中,不做任何其他处理,可以提升处理速度,消息消费者可扩展性好,性能不够,机器来凑

redis实现方式小结

redis因为其都是内存中操作,所以查询插入速度和mysql来比都是非常快的,所以实时性会比mysql高,虽然redis也能满足任务数据的持久化,但是无法保证任务不丢失,所以这里持久性会比mysql稍弱一点


4.使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能.

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  1. 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  2. 消息因为设置了TTL而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

不同实现方式的对比

实现方式

复杂度

数据量

持久化,数据丢失

扩展性

实时性

jdk DelayQueue

简单

由于程序内存限制,适用于少数据量

无持久化

mysql 轮询

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

由于查询开销,稍弱

redis zet

稍微复杂

可支持大数据量

可尽量保证持久化,不保证任务不丢失

可扩展

RabbitMQ

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

结语

除了以上实现方式,还有其他比如通过Rabbit MQTTL死信队列来实现:每一个消息带有TTL属性,该TTL即延迟任务的延迟时间,只要超过指定时间没被消费,此消息将被转至死信队列中,我们可以监听死信队列消费消息进而达到延迟任务的目的;还有时间轮转算法等,时间有限,日后再学,日后再讲。

最近发表
标签列表