聊聊redisson的DelayedQueue
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.1</version> </dependency> 複製程式碼
例項
@Test public void testDelayedQueue() throws InterruptedException { Config config = new Config(); config.useSingleServer() .setAddress("redis://192.168.99.100:6379"); RedissonClient redisson = Redisson.create(config); RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1"); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue); delayedQueue.offer("demo", 10, TimeUnit.SECONDS); Assert.assertFalse(blockingQueue.contains("demo")); TimeUnit.SECONDS.sleep(15); Assert.assertTrue(blockingQueue.contains("demo")); } 複製程式碼
- 這裡使用了兩個queue,對delayedQueue的offer操作是直接進入delayedQueue,但是delay是作用在目標佇列上,這裡就是RBlockingQueue
原始碼解析
RDelayedQueue.offer
redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> { private final QueueTransferService queueTransferService; private final String channelName; private final String queueName; private final String timeoutSetName; protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); channelName = prefixName("redisson_delay_queue_channel", getName()); queueName = prefixName("redisson_delay_queue", getName()); timeoutSetName = prefixName("redisson_delay_queue_timeout", getName()); //QueueTransferTask task = ...... queueTransferService.schedule(queueName, task); this.queueTransferService = queueTransferService; } public void offer(V e, long delay, TimeUnit timeUnit) { get(offerAsync(e, delay, timeUnit)); } public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) { long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = PlatformDependent.threadLocalRandom().nextLong(); return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;" , Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); } public ByteBuf encode(Object value) { if (commandExecutor.isRedissonReferenceSupportEnabled()) { RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); if (reference != null) { value = reference; } } try { return codec.getValueEncoder().encode(value); } catch (IOException e) { throw new IllegalArgumentException(e); } } public static String prefixName(String prefix, String name) { if (name.contains("{")) { return prefix + ":" + name; } return prefix + ":{" + name + "}"; } //...... } 複製程式碼
- 這裡使用的是一段lua指令碼,其中keys引數陣列有四個值,KEYS[1]為getName(), KEYS[2]為timeoutSetName, KEYS[3]為queueName, KEYS[4]為channelName
- 變數有三個,ARGV[1]為timeout,ARGV[2]為randomId,ARGV[3]為encode(e)
- 這段lua指令碼對timeoutSetName的zset新增一個結構體,其score為timeout值;對queueName的list的表尾新增結構體;然後判斷timeoutSetName的zset的第一個元素是否是當前的結構體,如果是則對channel釋出timeout訊息
queueTransferService.schedule
redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) { @Override protected RFuture<Long> pushTaskAsync() { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.<Object>asList(getName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic<Long> getTopic() { return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName); } }; queueTransferService.schedule(queueName, task); 複製程式碼
- RedissonDelayedQueue構造器裡頭對QueueTransferTask進行排程
- 排程執行的是pushTaskAsync方法,主要就是將到期的元素從元素佇列移到目標佇列
- 這裡使用一段lua指令碼,KEYS[1]為getName(),KEYS[2]為timeoutSetName,KEYS[3]為queueName;ARGV[1]為當前時間戳,ARGV[2]為100
- 這裡呼叫zrangebyscore,對timeoutSetName的zset使用timeout引數進行排序,取得分介於0和當前時間戳的元素,取前200條
- 如果有值表示該元素需要移交到目標佇列,然後呼叫rpush移交到目標佇列,再呼叫lrem從元素佇列移除,最後在從timeoutSetName的zset中刪除掉已經處理的這些元素
- 處理完過元素轉移之後,再取timeoutSetName的zset的第一個元素的得分返回,如果沒有返回nil
QueueTransferService.schedule
redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java
public class QueueTransferService { private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap(); public synchronized void schedule(String name, QueueTransferTask task) { QueueTransferTask oldTask = tasks.putIfAbsent(name, task); if (oldTask == null) { task.start(); } else { oldTask.incUsage(); } } public synchronized void remove(String name) { QueueTransferTask task = tasks.get(name); if (task != null) { if (task.decUsage() == 0) { tasks.remove(name, task); task.stop(); } } } } 複製程式碼
- 這裡的schedule方法首先新增到ConcurrentMap中,如果該任務已經存在,則呼叫oldTask.incUsage(),不存在則啟動該任務
QueueTransferTask.start
redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java
public void start() { RTopic<Long> schedulerTopic = getTopic(); statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); } }); messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } }); } private void scheduleTask(final Long startTime) { TimeoutTask oldTimeout = lastTimeout.get(); if (startTime == null) { return; } if (oldTimeout != null) { oldTimeout.getTask().cancel(); } long delay = startTime - System.currentTimeMillis(); if (delay > 10) { Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { pushTask(); TimeoutTask currentTimeout = lastTimeout.get(); if (currentTimeout.getTask() == timeout) { lastTimeout.compareAndSet(currentTimeout, null); } } }, delay, TimeUnit.MILLISECONDS); if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) { timeout.cancel(); } } else { pushTask(); } } private void pushTask() { RFuture<Long> startTimeFuture = pushTaskAsync(); startTimeFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception { if (!future.isSuccess()) { if (future.cause() instanceof RedissonShutdownException) { return; } log.error(future.cause().getMessage(), future.cause()); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (future.getNow() != null) { scheduleTask(future.getNow()); } } }); } 複製程式碼
timeoutSetName的zset的第一個元素的得分