优秀的编程知识分享平台

网站首页 > 技术文章 正文

聊聊redisson的DelayedQueue(redisson jedis lettuce)

nanyue 2024-09-01 00:07:38 技术文章 6 ℃

本文主要研究一下redisson的DelayedQueue

maven

 <dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.8.1</version>
 </dependency>

实例

 @Test
 public void testDelayedQueue() throws InterruptedException {
 Config config = new Config();
 config.useSingleServer()
 .setAddress("redis://192.168.99.100:6379");
 RedissonClient redisson = Redisson.create(config);
 RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
 RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
 delayedQueue.offer("demo", 10, TimeUnit.SECONDS);
 Assert.assertFalse(blockingQueue.contains("demo"));
 TimeUnit.SECONDS.sleep(15);
 Assert.assertTrue(blockingQueue.contains("demo"));
 }
  • 这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列上,这里就是RBlockingQueue

源码解析

RDelayedQueue.offer

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
 private final QueueTransferService queueTransferService;
 private final String channelName;
 private final String queueName;
 private final String timeoutSetName;
 protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
 super(codec, commandExecutor, name);
 channelName = prefixName("redisson_delay_queue_channel", getName());
 queueName = prefixName("redisson_delay_queue", getName());
 timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
 //QueueTransferTask task = ......
 queueTransferService.schedule(queueName, task);
 this.queueTransferService = queueTransferService;
 }
 public void offer(V e, long delay, TimeUnit timeUnit) {
 get(offerAsync(e, delay, timeUnit));
 }
 public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
 long delayInMs = timeUnit.toMillis(delay);
 long timeout = System.currentTimeMillis() + delayInMs;
 long randomId = PlatformDependent.threadLocalRandom().nextLong();
 return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
 "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
 + "redis.call('zadd', KEYS[2], ARGV[1], value);"
 + "redis.call('rpush', KEYS[3], value);"
 // if new object added to queue head when publish its startTime 
 // to all scheduler workers 
 + "local v = redis.call('zrange', KEYS[2], 0, 0); "
 + "if v[1] == value then "
 + "redis.call('publish', KEYS[4], ARGV[1]); "
 + "end;"
 ,
 Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
 timeout, randomId, encode(e));
 }
 public ByteBuf encode(Object value) {
 if (commandExecutor.isRedissonReferenceSupportEnabled()) {
 RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
 if (reference != null) {
 value = reference;
 }
 }
 try {
 return codec.getValueEncoder().encode(value);
 } catch (IOException e) {
 throw new IllegalArgumentException(e);
 }
 }
 public static String prefixName(String prefix, String name) {
 if (name.contains("{")) {
 return prefix + ":" + name;
 }
 return prefix + ":{" + name + "}";
 }
 //......
}
  • 这里使用的是一段lua脚本,其中keys参数数组有四个值,KEYS[1]为getName(), KEYS[2]为timeoutSetName, KEYS[3]为queueName, KEYS[4]为channelName
  • 变量有三个,ARGV[1]为timeout,ARGV[2]为randomId,ARGV[3]为encode(e)
  • 这段lua脚本对timeoutSetName的zset添加一个结构体,其score为timeout值;对queueName的list的表尾添加结构体;然后判断timeoutSetName的zset的第一个元素是否是当前的结构体,如果是则对channel发布timeout消息

queueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

 QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
 @Override
 protected RFuture<Long> pushTaskAsync() {
 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
 "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
 + "if #expiredValues > 0 then "
 + "for i, v in ipairs(expiredValues) do "
 + "local randomId, value = struct.unpack('dLc0', v);"
 + "redis.call('rpush', KEYS[1], value);"
 + "redis.call('lrem', KEYS[3], 1, v);"
 + "end; "
 + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
 + "end; "
 // get startTime from scheduler queue head task
 + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
 + "if v[1] ~= nil then "
 + "return v[2]; "
 + "end "
 + "return nil;",
 Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
 System.currentTimeMillis(), 100);
 }
 @Override
 protected RTopic<Long> getTopic() {
 return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);
 }
 };
 queueTransferService.schedule(queueName, task);
  • RedissonDelayedQueue构造器里头对QueueTransferTask进行调度
  • 调度执行的是pushTaskAsync方法,主要就是将到期的元素从元素队列移到目标队列
  • 这里使用一段lua脚本,KEYS[1]为getName(),KEYS[2]为timeoutSetName,KEYS[3]为queueName;ARGV[1]为当前时间戳,ARGV[2]为100
  • 这里调用zrangebyscore,对timeoutSetName的zset使用timeout参数进行排序,取得分介于0和当前时间戳的元素,取前200条
  • 如果有值表示该元素需要移交到目标队列,然后调用rpush移交到目标队列,再调用lrem从元素队列移除,最后在从timeoutSetName的zset中删除掉已经处理的这些元素
  • 处理完过元素转移之后,再取timeoutSetName的zset的第一个元素的得分返回,如果没有返回nil

QueueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java

public class QueueTransferService {
 private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
 public synchronized void schedule(String name, QueueTransferTask task) {
 QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
 if (oldTask == null) {
 task.start();
 } else {
 oldTask.incUsage();
 }
 }
 public synchronized void remove(String name) {
 QueueTransferTask task = tasks.get(name);
 if (task != null) {
 if (task.decUsage() == 0) {
 tasks.remove(name, task);
 task.stop();
 }
 }
 }
}
  • 这里的schedule方法首先添加到ConcurrentMap中,如果该任务已经存在,则调用oldTask.incUsage(),不存在则启动该任务

QueueTransferTask.start

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java

 public void start() {
 RTopic<Long> schedulerTopic = getTopic();
 statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
 @Override
 public void onSubscribe(String channel) {
 pushTask();
 }
 });
 messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {
 @Override
 public void onMessage(CharSequence channel, Long startTime) {
 scheduleTask(startTime);
 }
 });
 }
 private void scheduleTask(final Long startTime) {
 TimeoutTask oldTimeout = lastTimeout.get();
 if (startTime == null) {
 return;
 }
 if (oldTimeout != null) {
 oldTimeout.getTask().cancel();
 }
 long delay = startTime - System.currentTimeMillis();
 if (delay > 10) {
 Timeout timeout = connectionManager.newTimeout(new TimerTask() { 
 @Override
 public void run(Timeout timeout) throws Exception {
 pushTask();
 TimeoutTask currentTimeout = lastTimeout.get();
 if (currentTimeout.getTask() == timeout) {
 lastTimeout.compareAndSet(currentTimeout, null);
 }
 }
 }, delay, TimeUnit.MILLISECONDS);
 if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
 timeout.cancel();
 }
 } else {
 pushTask();
 }
 }
 private void pushTask() {
 RFuture<Long> startTimeFuture = pushTaskAsync();
 startTimeFuture.addListener(new FutureListener<Long>() {
 @Override
 public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
 if (!future.isSuccess()) {
 if (future.cause() instanceof RedissonShutdownException) {
 return;
 }
 log.error(future.cause().getMessage(), future.cause());
 scheduleTask(System.currentTimeMillis() + 5 * 1000L);
 return;
 }
 if (future.getNow() != null) {
 scheduleTask(future.getNow());
 }
 }
 });
 }
  • 这里用到了RTopic,添加了StatusListener以及MessageListener
  • StatusListener在订阅的时候触发pushTask,MessageListener主要是调用scheduleTask
  • pushTaskAsync在RedissonDelayedQueue的实现就是上面讲的实现元素在原始队列及目标队列的转移
  • scheduleTask方法会重新计算delay,对于大于10的延时触发pushTask,小于等于10的则立刻触发pushTask
  • pushTask会对pushTaskAsync操作进行回调,如果执行不成功则重新触发scheduleTask,如果执行成功但是返回值(timeoutSetName的zset的第一个元素的得分)不为null的话,则以该值触发scheduleTask

小结

  • redisson的DelayedQueue使用上是将元素及延时信息入队,之后定时任务将到期的元素转移到目标队列
  • 这里使用了三个结构来存储,一个是目标队列list;一个是原生队列list,添加的是带有延时信息的结构体;一个是timeoutSetName的zset,元素是结构体,其score为timeout值
  • redisson使用了很多异步回调来操作,整体代码阅读上会相对费劲些

doc

  • delayed-queue

Tags:

最近发表
标签列表