




版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第Redisson分布式延時(shí)隊(duì)列RedissonDelayedQueue運(yùn)行流程目錄前言基本使用內(nèi)部數(shù)據(jù)結(jié)構(gòu)介紹基本流程發(fā)送延時(shí)消息獲取延時(shí)消息初始化延時(shí)隊(duì)列總結(jié)
前言
因?yàn)楣ぷ髦行枰玫椒植际降难訒r(shí)隊(duì)列,調(diào)研了一段時(shí)間,選擇使用RedissonDelayedQueue,為了搞清楚內(nèi)部運(yùn)行流程,特記錄下來(lái)。
總體流程大概是圖中的這個(gè)樣子,初看一眼有點(diǎn)不知從何下手,接下來(lái)我會(huì)通過(guò)以下幾點(diǎn)來(lái)分析流程,相信看完本文你能了解整個(gè)運(yùn)行流程。
基本使用內(nèi)部數(shù)據(jù)結(jié)構(gòu)介紹基本流程發(fā)送延時(shí)消息獲取延時(shí)消息初始化延時(shí)隊(duì)列
基本使用
發(fā)送延遲消息代碼如下,發(fā)送了一條延遲時(shí)間為5s的消息。
publicvoidproduce(){
Stringqueuename="delay-queue";
RBlockingQueueStringblockingQueue=redissonClient.getBlockingQueue(queuename);
RDelayedQueueStringdelayedQueue=redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer("測(cè)試延遲消息",5,TimeUnit.SECONDS);
}
接收消息代碼如下,可以看到delayedQueue是沒(méi)有用到的,那么為什么要加這一行呢,這個(gè)后面總結(jié)部分回答。
publicvoidconsume()throwsInterruptedException{
Stringqueuename="delay-queue";
RBlockingQueueStringblockingQueue=redissonClient.getBlockingQueue(queuename);
RDelayedQueueStringdelayedQueue=redissonClient.getDelayedQueue(blockingQueue);
Stringmsg=blockingQueue.take();
//收到消息進(jìn)行處理...
}
這兩段代碼可以寫在兩個(gè)不同的Java工程里,只要連接的是同一個(gè)Redis就行。
調(diào)用comsume()之后,如果隊(duì)列里沒(méi)有消息,會(huì)阻塞等待隊(duì)列里有消息并且取到了才會(huì)返回。之所以這么說(shuō)是因?yàn)榭赡苡袆e的Java進(jìn)程也在跟你一樣取同一個(gè)隊(duì)列里的消息,如果消息被另一個(gè)搶完了,那這時(shí)就還得阻塞等待。
這時(shí)看上去的原理是這樣的:
生產(chǎn)者調(diào)用offer()后,自己內(nèi)部開啟一個(gè)定時(shí)器,等到了時(shí)間在發(fā)送到redis的list里。
如果是這樣設(shè)計(jì)的話,相信大家都能看出來(lái)一個(gè)很簡(jiǎn)單的問(wèn)題,要是延時(shí)時(shí)間還沒(méi)到,生產(chǎn)者自己掛了,那樣消息就丟了。所以,還是讓我們接著往下看。
內(nèi)部數(shù)據(jù)結(jié)構(gòu)介紹
redisson源碼里一共創(chuàng)建了三個(gè)隊(duì)列:【消息延時(shí)隊(duì)列】、【消息順序隊(duì)列】、【消息目標(biāo)隊(duì)列】。
假設(shè)在同一時(shí)間按照msg1、msg2、msg3的順序發(fā)消息到延時(shí)隊(duì)列,這三條消息就會(huì)被保存在【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】。
可以看到【消息延時(shí)隊(duì)列】的順序是按照到期時(shí)間升序排列的,而不是像【消息順序隊(duì)列】按照插入順序排。
消息到期后會(huì)將消息從前兩個(gè)隊(duì)列移除(怎么移?誰(shuí)來(lái)移?),插入【消息目標(biāo)隊(duì)列】,也就是圖中第三個(gè)隊(duì)列。
消費(fèi)者也是阻塞在【消息目標(biāo)隊(duì)列】上取消息。
這時(shí)可以簡(jiǎn)單說(shuō)明下每個(gè)隊(duì)列的作用:
【消息延時(shí)隊(duì)列】利用按照到期時(shí)間排序的特性,可以很快找到下一個(gè)要到期的消息,客戶端內(nèi)部自己定時(shí)到【消息目標(biāo)隊(duì)列】取【消息順序隊(duì)列】這個(gè)隊(duì)列對(duì)分析的流程關(guān)聯(lián)不大,可以忽略【消息目標(biāo)隊(duì)列】存放到期的消息,供消費(fèi)端取
其實(shí)【消息延時(shí)隊(duì)列】隊(duì)列里存的時(shí)間(也就是zet的score)是到期的時(shí)間戳,為了畫圖方便,圖里就畫的是延遲的時(shí)間,不過(guò)不影響理解。
理解好這幾個(gè)隊(duì)列的名字和作用,后面還會(huì)一直用到,如果忘了可以翻回來(lái)回顧下。
因?yàn)闀鴮懤斫夥奖愫汀鞠㈨樞蜿?duì)列】在本文沒(méi)涉及到,后面部分好幾次提到的內(nèi)容:把到期的消息從【消息延時(shí)隊(duì)列】移到【消息目標(biāo)隊(duì)列】里,這句話實(shí)際的代碼邏輯是這樣:把【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】里的到期消息移除,把它們插入到【消息目標(biāo)隊(duì)列】。
基本流程
知道了內(nèi)部所使用到的數(shù)據(jù)結(jié)構(gòu)后,這里可以簡(jiǎn)單說(shuō)下整體的基本流程。
先說(shuō)發(fā)送延遲消息,發(fā)送的延遲消息會(huì)先存在【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】,如果【消息延時(shí)隊(duì)列】原本是空的,會(huì)發(fā)布訂閱信息提醒有新的消息。
獲取延遲消息只需要從【消息目標(biāo)隊(duì)列】阻塞的取就行了,因?yàn)槔锩娑际堑狡跀?shù)據(jù)。
那么問(wèn)題就只剩下怎么樣判斷時(shí)間到了,把【消息延時(shí)隊(duì)列】里的消息移動(dòng)到【消息目標(biāo)隊(duì)列】里呢?
這部分工作交給了初始化延時(shí)隊(duì)列來(lái)處理。
這里面會(huì)定時(shí)從【消息延時(shí)隊(duì)列】查詢最新到期時(shí)間,定時(shí)去把【消息延時(shí)隊(duì)列】里的消息移動(dòng)到【消息目標(biāo)隊(duì)列】里。
如果【消息延時(shí)隊(duì)列】是空的,就不會(huì)再定時(shí)查,而是等待發(fā)布訂閱信息提醒,再定時(shí)把【消息延時(shí)隊(duì)列】里的消息移動(dòng)到【消息目標(biāo)隊(duì)列】里。
剛開始看可能有點(diǎn)抽象,可以看完底下一節(jié)內(nèi)容之后,再回頭來(lái)看這里對(duì)應(yīng)的流程總結(jié),可能會(huì)比較清晰。
發(fā)送延時(shí)消息
發(fā)送延時(shí)消息的邏輯比較簡(jiǎn)單,先看下發(fā)送的代碼。
publicvoidproduce(){
Stringqueuename="delay-queue";
RBlockingQueueStringblockingQueue=redissonClient.getBlockingQueue(queuename);
RDelayedQueueStringdelayedQueue=redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer("測(cè)試延遲消息",5,TimeUnit.SECONDS);
}
從delayedQueue.offer方法開始,最終會(huì)執(zhí)行到RedissonDelayedQueue的offerAsync方法里。
offerAsync方法的作用就是發(fā)送一段腳本給redis執(zhí)行,腳本內(nèi)容是:
將消息和到期時(shí)間插入【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】如果最近到期的消息是剛剛插入的消息,則對(duì)指定主題發(fā)布到期時(shí)間,目的是為了讓客戶端定時(shí)去把【消息延時(shí)隊(duì)列】里的到期數(shù)據(jù)移動(dòng)到【消息目標(biāo)隊(duì)列】
@Override
publicRFutureVoidofferAsync(Ve,longdelay,TimeUnittimeUnit){
if(delay0){
thrownewIllegalArgumentException("Delaycan'tbenegative");
longdelayInMs=timeUnit.toMillis(delay);
longtimeout=System.currentTimeMillis()+delayInMs;
longrandomId=ThreadLocalRandom.current().nextLong();
returncommandExecutor.evalWriteNoRetryAsync(getRawName(),codec,RedisCommands.EVAL_VOID,
"localvalue=struct.pack('dLc0',tonumber(ARGV[2]),string.len(ARGV[3]),ARGV[3]);"
+"redis.call('zadd',KEYS[2],ARGV[1],value);"
+"redis.call('rpush',KEYS[3],value);"
//ifnewobjectaddedtoqueueheadwhenpublishitsstartTime
//toallschedulerworkers
+"localv=redis.call('zrange',KEYS[2],0,0);"
+"ifv[1]==valuethen"
+"redis.call('publish',KEYS[4],ARGV[1]);"
+"end;",
Arrays.ObjectasList(getRawName(),timeoutSetName,queueName,channelName),
timeout,randomId,encode(e));
}
獲取延時(shí)消息
獲取延時(shí)消息是本文最簡(jiǎn)單的一部分。
publicvoidconsume()throwsInterruptedException{
Stringqueuename="delay-queue";
RBlockingQueueStringblockingQueue=redissonClient.getBlockingQueue(queuename);
RDelayedQueueStringdelayedQueue=redissonClient.getDelayedQueue(blockingQueue);
Stringmsg=blockingQueue.take();
//收到消息進(jìn)行處理...
}
blockingQueue.take()方法其實(shí)只是對(duì)【消息目標(biāo)隊(duì)列】執(zhí)行blpop阻塞的獲取到期消息
初始化延時(shí)隊(duì)列
看一下初始化的代碼。
publicvoidinit(){
Stringqueuename="delay-queue";
RBlockingQueueStringblockingQueue=redissonClient.getBlockingQueue(queuename);
RDelayedQueueStringdelayedQueue=redissonClient.getDelayedQueue(blockingQueue);
}
入口就是在redissonClient.getDelayedQueue(blockingQueue)中,創(chuàng)建了RedissonDelayedQueue對(duì)象,并執(zhí)行了構(gòu)造方法里的邏輯。
那么這里面主要做了什么事呢?
主要是調(diào)用了QueueTransferTask的start()方法。
publicvoidstart(){
RTopicschedulerTopic=getTopic();
statusListenerId=schedulerTopic.addListener(newBaseStatusListener(){
@Override
publicvoidonSubscribe(Stringchannel){
pushTask();
messageListenerId=schedulerTopic.addListener(Long.class,newMessageListenerLong(){
@Override
publicvoidonMessage(CharSequencechannel,LongstartTime){
scheduleTask(startTime);
}
這段代碼主要是設(shè)置了指定主題(主題名:redisson_delay_queue_channel:{queuename})兩個(gè)發(fā)布訂閱的監(jiān)聽器。
當(dāng)指定主題有新訂閱時(shí)調(diào)用pushTask()方法,里面又會(huì)調(diào)用pushTaskAsync()方法當(dāng)指定主題有新消息時(shí)調(diào)用scheduleTask(startTime)方法
需要注意的是,這里會(huì)先訂閱指定主題,然后觸發(fā)執(zhí)行onSubscribe()方法。
所以我們主要搞懂這三個(gè)方法都是做什么的,那么整個(gè)初始化流程就明白了。
因?yàn)檫@三個(gè)方法是相互調(diào)用的,只看文字的話容易云里霧里,這里有個(gè)流程圖,看方法解釋文字的時(shí)候可以對(duì)照著流程圖看比較有印象。
scheduleTask()
這個(gè)方法看起來(lái)多,但核心內(nèi)容就是根據(jù)方法參數(shù)指定的時(shí)間調(diào)用pushTask()。
privatevoidscheduleTask(finalLongstartTime){
TimeoutTaskoldTimeout=lastTimeout.get();
if(startTime==null){
return;
if(oldTimeout!=null){
oldTimeout.getTask().cancel();
longdelay=startTime-System.currentTimeMillis();
if(delay10){
Timeouttimeout=connectionManager.newTimeout(newTimerTask(){
@Override
publicvoidrun(Timeouttimeout)throwsException{
pushTask();
TimeoutTaskcurrentTimeout=lastTimeout.get();
if(currentTimeout.getTask()==timeout){
lastTpareAndSet(currentTimeout,null);
},delay,TimeUnit.MILLISECONDS);
if(!lastTpareAndSet(oldTimeout,newTimeoutTask(startTime,timeout))){
timeout.cancel();
}else{
pushTask();
}
pushTaskAsync()
這個(gè)方法是抽象方法,在創(chuàng)建RedissonDelayedQueue對(duì)象的時(shí)候傳進(jìn)來(lái)的,代碼如下:
@Override
protectedRFutureLongpushTaskAsync(){
returncommandExecutor.evalWriteAsync(getRawName(),LongCodec.INSTANCE,RedisCommands.EVAL_LONG,
"localexpiredValues=redis.call('zrangebyscore',KEYS[2],0,ARGV[1],'limit',0,ARGV[2]);"
+"if#expiredValues0then"
+"fori,vinipairs(expiredValues)do"
+"localrandomId,value=struct.unpack('dLc0',v);"
+"redis.call('rpush',KEYS[1],value);"
+"redis.call('lrem',KEYS[3],1,v);"
+"end;"
+"redis.call('zrem',KEYS[2],unpack(expiredValues));"
+"end;"
//getstartTimefromschedulerqueueheadtask
+"localv=redis.call('zrange',KEYS[2],0,0,'WITHSCORES');"
+"ifv[1]~=nilthen"
+"returnv[2];"
+"end"
+"returnnil;",
Arrays.ObjectasList(getRawName(),timeoutSetName,queueName),
System.currentTimeMillis(),100);
}
看不懂也不要緊,聽我解釋下就明白了。
這里發(fā)送了一段腳本給redis執(zhí)行:
從【消息延時(shí)隊(duì)列】取出前一百條到期的消息,如果有的話,添加到【消息目標(biāo)隊(duì)列】里,并將這些消息從【消息延時(shí)隊(duì)列】和【消息順序隊(duì)列】中移除從【消息延時(shí)隊(duì)列】取出下一條要到期的消息,返回它的到期時(shí)間戳(如果隊(duì)列里沒(méi)消息返回空)。
我的理解就是初始化的時(shí)候
1是為了處理舊的消息,比如生產(chǎn)者1發(fā)送了消息,然后時(shí)間沒(méi)到自己下線了,這時(shí)如果沒(méi)有其他客戶端在線,就沒(méi)有人能把數(shù)據(jù)從【消息目標(biāo)隊(duì)列】移到【消息目標(biāo)隊(duì)列】了。
2是返回的這個(gè)時(shí)間戳,會(huì)拿這個(gè)定時(shí),等時(shí)間到了去【消息目標(biāo)隊(duì)列】拉去到期的消息。
簡(jiǎn)單總結(jié)就是這個(gè)方法是把到期消息從【消息延時(shí)隊(duì)列】放到【消息目標(biāo)隊(duì)列】里,并且返回了最近要到期消息的時(shí)間戳。
pushTask()
privatevoidpushTask(){
RFutureLongstartTimeFuture=pushTaskAsync();
startTimeFuture.whenComplete((res,e)-{
if(e!=null){
if(einstanceofRedissonShutdownException){
return;
log.error(e.getMessage(),e);
scheduleTask(System.currentTimeMillis()+5*1000L);
return;
if(res!=null){
scheduleTask(res);
}
這個(gè)代碼看起來(lái)就比較簡(jiǎn)單,調(diào)用了pushTaskAsync()獲取最近要到期消息的時(shí)間戳(異步封裝了一下)。
有異常的話就調(diào)用scheduleTask()五秒后再執(zhí)行一次pushTask()。
沒(méi)有異常的話如果有最近要到期消息的時(shí)間戳(說(shuō)明【消息延時(shí)隊(duì)列】里還有未到期消息),用這個(gè)最新到期時(shí)間調(diào)用scheduleTask(),在這個(gè)指定的時(shí)間調(diào)用pushTask()。
這個(gè)方法簡(jiǎn)單總結(jié)就是決定了要不要調(diào)用、什么時(shí)候再調(diào)用pushTask(),主要操作邏輯都在pushTaskAsync()里(把到期的消息從【消息延時(shí)隊(duì)列】移到【消息目標(biāo)隊(duì)列】供消費(fèi)端消費(fèi))。
了解了上面幾個(gè)方法的流程和含義,還記得一開頭提到的添加了兩個(gè)發(fā)布訂閱的監(jiān)聽器嗎?
1.當(dāng)指定主題有新訂閱時(shí)調(diào)用pushTask()方法,里面又會(huì)調(diào)用pushTaskAsync()方法
2.當(dāng)指定主題有新消息時(shí)調(diào)用scheduleTask(startTime)方法
需要注意的是,這里會(huì)先訂閱指定主題,然后觸發(fā)執(zhí)行onSubscribe()方法
在初始化延時(shí)隊(duì)列剛啟動(dòng)的時(shí)候,處理到期舊數(shù)據(jù):把到期的消息從【消息延時(shí)隊(duì)列】移到【消息目標(biāo)隊(duì)列】供消費(fèi)端消費(fèi);處理新數(shù)據(jù):獲取下次到期時(shí)間決定下次調(diào)用pushTask()的時(shí)間。
上面講的這種情況是站在當(dāng)前客戶端的視角,但畢竟這是監(jiān)聽訂閱信息,如果啟動(dòng)不止一個(gè)客戶端的話(就算是1個(gè)生產(chǎn)者1個(gè)消費(fèi)者,也算兩個(gè)客戶端),總有一個(gè)客戶端的訂閱信息回調(diào)函數(shù),會(huì)不會(huì)有問(wèn)題?
仔細(xì)想想是沒(méi)有的,處理到期舊數(shù)據(jù):之前啟動(dòng)的客戶端已經(jīng)處理完了;處理新數(shù)據(jù):獲取最近到期時(shí)間,在scheduleTask()里,如果之前有正在定時(shí)的任務(wù),會(huì)把原來(lái)正在定時(shí)的任務(wù)取消掉。這個(gè)被取消的任務(wù),時(shí)間要么就是當(dāng)前這個(gè)時(shí)間,要嘛是之后的時(shí)間,取消掉不會(huì)影響邏輯。
為了應(yīng)對(duì)原本【消息延時(shí)隊(duì)列】里沒(méi)消息了這種情況,流程結(jié)束了,重啟定時(shí)去調(diào)用pushTask(),把到期的消息從【消息延時(shí)隊(duì)列】移到【消息目標(biāo)隊(duì)列】供消費(fèi)端消費(fèi)。
總結(jié)
再放一下開頭的圖總體流程圖:
1.初始化延時(shí)隊(duì)列時(shí)會(huì)把【消息延時(shí)隊(duì)列】里的到期數(shù)據(jù)移動(dòng)到【消息目標(biāo)隊(duì)列】,沒(méi)有也有可能;然后是找最近要到期的消息時(shí)間,定時(shí)去拉,這個(gè)剛啟動(dòng)也是可能沒(méi)有的,不過(guò)不要緊,這兩步是為了處理滯留在【消息延時(shí)隊(duì)列】的舊數(shù)據(jù)(
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 行政管理學(xué)分析方法試題及答案
- 2025年自考行政管理課程評(píng)價(jià)試題及答案
- 行政管理實(shí)施的現(xiàn)實(shí)案例分析試題及答案
- 2025年建筑工程考試策略與試題及答案
- 行政管理中的員工關(guān)系試題及答案
- 行政管理與可持續(xù)發(fā)展試題及答案路徑
- pc磚銷售合同范例
- 行政管理市政學(xué)知識(shí)關(guān)鍵信息試題及答案
- 行政管理中的可持續(xù)發(fā)展研究試題及答案
- 2025年建筑考試模擬試題及答案分析
- 新時(shí)代好教師標(biāo)準(zhǔn)PPT爭(zhēng)做“四有”“四個(gè)引路人”和“四個(gè)相統(tǒng)一”好老師PPT課件(帶內(nèi)容)
- 朊毒體、不明原因感染手術(shù)手術(shù)間處理流程
- 第五講高密電法
- 羅沙司他治療腎性貧血中國(guó)專家共識(shí)
- 西安市國(guó)有土地上房屋征收評(píng)估辦法
- 初中畢業(yè)證書怎么查詢電子版
- 煙臺(tái)某公寓電氣設(shè)計(jì)畢業(yè)論文
- GB/T 3780.21-2016炭黑第21部分:篩余物的測(cè)定水沖洗法
- GB/T 26251-2010氟和氟氮混合氣
- 汽車運(yùn)用與維修技術(shù)畢業(yè)論文
- 儲(chǔ)煤場(chǎng)管理制度(6篇)
評(píng)論
0/150
提交評(píng)論