




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
第基于Golang實(shí)現(xiàn)延遲隊(duì)列(DelayQueue)目錄背景原理堆隨機(jī)刪除重置元素到期時間Golang實(shí)現(xiàn)數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)原理添加元素阻塞獲取元素Channel方式阻塞讀取性能測試總結(jié)
背景
延遲隊(duì)列是一種特殊的隊(duì)列,元素入隊(duì)時需要指定到期時間(或延遲時間),從隊(duì)頭出隊(duì)的元素必須是已經(jīng)到期的,而且最先到期的元素最先出隊(duì),也就是隊(duì)列里面的元素是按照到期時間排序的,添加元素和從隊(duì)頭出隊(duì)的時間復(fù)雜度是O(log(n))。
由于以上性質(zhì),延遲隊(duì)列一般可以用于以下場景(定時任務(wù)、延遲任務(wù)):
緩存:用戶淘汰過期元素通知:在指定時間通知用戶,比如會議開始前30分鐘訂單:30分鐘未支付取消訂單超時:服務(wù)器自動斷開太長時間沒有心跳的連接
其實(shí)在Golang中是自帶定時器的,也就是time.After()、time.AfterFunc()等函數(shù),它們的性能也是非常好的,隨著Golang版本升級還會優(yōu)化。但是對于某些場景來說確實(shí)不夠方便,比如緩存場景我們需要能夠支持隨機(jī)刪除定時器,隨機(jī)重置過期時間,更加靈活的刪除一小批過期元素。而且像Kafka的時間輪算法(TimeWheel)里面也用到了延遲隊(duì)列,因此還是有必要了解下如何實(shí)現(xiàn)延遲隊(duì)列。
原理
堆
延遲隊(duì)列每次出隊(duì)的是最小到期時間的元素,而堆就是用來獲取最值的數(shù)據(jù)結(jié)構(gòu)。使用堆我們可以實(shí)現(xiàn)O(log(n))時間復(fù)雜度添加元素和移除最小到期時間元素。
隨機(jī)刪除
有時候延遲隊(duì)列還需要具有隨機(jī)刪除元素的能力,可以通過以下方式實(shí)現(xiàn):
元素添加刪除標(biāo)記字段:堆中每個元素都添加一個刪除標(biāo)記字段,并把這個元素的地址返回給用戶,用戶就可以標(biāo)記元素的這個字段為true,這樣元素到達(dá)堆頂時如果判斷到這個字段為true就會被清除,而延遲隊(duì)列里的元素邏輯上是一定會到達(dá)堆頂?shù)模ㄒ驗(yàn)闀r間會流逝)。這是一種懶刪除的方式。元素添加堆中下標(biāo)字段(或用map記錄下標(biāo)):堆中每個元素都添加一個堆中下標(biāo)字段,并把這個元素的地址返回給用戶,這樣我們就可以通過這個元素里面記錄的下標(biāo)快速定位元素在堆中的位置,從而刪除元素。詳細(xì)可以看文章如何實(shí)現(xiàn)一個支持O(log(n))隨機(jī)刪除元素的堆。
重置元素到期時間
如果需要重置延遲隊(duì)列里面元素的到期時間,則必須知道元素在堆中的下標(biāo),因?yàn)橹刂玫狡跁r間之后必須對堆進(jìn)行調(diào)整,因此只能是元素添加堆中下標(biāo)字段。
Golang實(shí)現(xiàn)
這里我們實(shí)現(xiàn)一個最簡單的延遲隊(duì)列,也就是不支持隨機(jī)刪除元素和重置元素的到期時間,因?yàn)橛行﹫鼍爸恍枰砑釉睾瞳@取到期元素這兩個功能,比如Kafka中的時間輪,而且這種簡單實(shí)現(xiàn)性能會高一點(diǎn)。
代碼地址
數(shù)據(jù)結(jié)構(gòu)
主要的結(jié)構(gòu)可以看到就是一個heap,Entry是每個元素在堆中的表示,Value是具體的元素值,Expired是為了堆中元素根據(jù)到期時間排序。
mutex是一個互斥鎖,主要是保證操作并發(fā)安全。
wakeup是一個緩沖區(qū)長度為1的通道,通過它實(shí)現(xiàn)添加元素的時候喚醒等待隊(duì)列不為空或者有更小到期時間元素加入的協(xié)程。(重點(diǎn))
typeEntry[Tany]struct{
ValueT
Expiredtime.Time//到期時間
//延遲隊(duì)列
typeDelayQueue[Tany]struct{
h*heap.Heap[*Entry[T]]
mutexsync.Mutex//保證并發(fā)安全
wakeupchanstruct{}//喚醒通道
//創(chuàng)建延遲隊(duì)列
funcNew[Tany]()*DelayQueue[T]{
returnDelayQueue[T]{
h:heap.New(nil,func(e1,e2*Entry[T])bool{
returne1.Expired.Before(e2.Expired)
wakeup:make(chanstruct{},1),
}
實(shí)現(xiàn)原理
阻塞獲取元素的時候如果隊(duì)列已經(jīng)沒有元素,或者沒有元素到期,那么協(xié)程就需要掛起等待。而被喚醒的條件是元素到期、隊(duì)列不為空或者有更小到期時間元素加入。
其中元素到期協(xié)程在阻塞獲取元素時發(fā)現(xiàn)堆頂元素還沒到期,因此這個條件可以自己構(gòu)造并等待。但是條件隊(duì)列不為空和有更小到期時間元素加入則需要另外一個協(xié)程在添加元素時才能滿足,因此必須通過一個中間結(jié)構(gòu)來進(jìn)行協(xié)程間通信,一般Golang里面會使用Channel來實(shí)現(xiàn)。
添加元素
一開始加了一個互斥鎖,避免并發(fā)沖突,然后把元素加到堆里。
因?yàn)槲覀僒ake()操作,既阻塞獲取元素操作,在不滿足條件時會去等待wakeup通道,但是等待通道前必須釋放鎖,否則Push()無法寫入新元素去滿足條件隊(duì)列不為空和有更小到期時間元素加入。而從釋放鎖后到開始讀取wakeup通道這段時間是沒有鎖保護(hù)的,如果Push()在這期間插入新元素,為了保證通道不阻塞同時又能通知到Take()協(xié)程,我們的通道的長度需要是1,同時使用select+default保證在通道里面已經(jīng)有元素的時候不阻塞Push()協(xié)程。
//添加延遲元素到隊(duì)列
func(q*DelayQueue[T])Push(valueT,delaytime.Duration){
q.mutex.Lock()
deferq.mutex.Unlock()
entry:=Entry[T]{
Value:value,
Expired:time.Now().Add(delay),
q.h.Push(entry)
//喚醒等待的協(xié)程
//這里表示新添加的元素到期時間是最早的,或者原來隊(duì)列為空
//因此必須喚醒等待的協(xié)程,因?yàn)榭梢阅玫礁绲狡诘脑?/p>
ifq.h.Peek()==entry{
select{
caseq.wakeup-struct{}{}:
default:
}
阻塞獲取元素
這里先判斷堆是否有元素,如果有獲取堆頂元素,然后判斷是否已經(jīng)到期,如果到期則直接出堆并返回。否則等待直到超時或者元素到期或者有新的元素到達(dá)。
這里在解鎖之前會清空wakeup通道,這樣可以保證下面讀取的wakeup通道里的元素肯定是新加入的。
//等待直到有元素到期
//或者ctx被關(guān)閉
func(q*DelayQueue[T])Take(ctxcontext.Context)(T,bool){
for{
varexpired*time.Timer
q.mutex.Lock()
//有元素
if!q.h.Empty(){
//獲取元素
entry:=q.h.Peek()
iftime.Now().After(entry.Expired){
q.h.Pop()
q.mutex.Unlock()
returnentry.Value,true
//到期時間,使用time.NewTimer()才能夠調(diào)用Stop(),從而釋放定時器
expired=time.NewTimer(time.Until(entry.Expired))
//避免被之前的元素假喚醒
select{
case-q.wakeup:
default:
q.mutex.Unlock()
//不為空,需要同時等待元素到期
//并且除非expired到期,否則都需要關(guān)閉expired避免泄露
ifexpired!=nil{
select{
case-q.wakeup://新的更快到期元素
expired.Stop()
case-expired.C://首元素到期
case-ctx.Done()://被關(guān)閉
expired.Stop()
vartT
returnt,false
}else{
select{
case-q.wakeup://新的更快到期元素
case-ctx.Done()://被關(guān)閉
vartT
returnt,false
}
Channel方式阻塞讀取
Golang里面可以使用Channel進(jìn)行流式消費(fèi),因此簡單包裝一個Channel形式的阻塞讀取接口,給通道一點(diǎn)緩沖區(qū)大小可以帶來更好的性能。
//返回一個通道,輸出到期元素
//size是通道緩存大小
func(q*DelayQueue[T])Channel(ctxcontext.Context,sizeint)-chanT{
out:=make(chanT,size)
gofunc(){
for{
entry,ok:=q.Take(ctx)
if!ok{
return
out-entry
returnout
}
使用方式
forentry:=rangeq.Channel(context.Background(),10){
//dosomething
}
性能測試
這里進(jìn)行一個簡單的性能測試,也就是先添加元素,然后等待到期后全部拿出來。
funcBenchmarkPushAndTake(b*testing.B){
q:=New[int]()
b.ResetTimer()
//添加元素
fori:=0;ib.N;i++{
q.Push(i,time.Duration(i))
//等待全部元素到期
b.StopTimer()
time.Sleep(time.Duration(b.N))
b.StartTimer()
//獲取元素
fori:=0;ib.N;i++{
_,ok:=q.Take(context.Background())
if!ok{
b.Errorf("want%v,but%v",true,ok)
}
測試結(jié)果:
Benchmark-82331534476.8ns/op
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025電子廠勞務(wù)合同模板
- 2025屆山東省高三下學(xué)期學(xué)業(yè)水平等級性模擬考試歷史試題(原卷版+解析版)
- 公司研發(fā)員工勞動協(xié)議書
- 數(shù)字化平臺服務(wù)推廣合同
- 商業(yè)地產(chǎn)租賃與經(jīng)營管理服務(wù)合同
- 浙江國企招聘2025臺州市黃巖交通旅游投資集團(tuán)有限公司下屬子公司招聘10人筆試參考題庫附帶答案詳解
- 2025重慶銅生人力資源服務(wù)股份有限公司招聘39人筆試參考題庫附帶答案詳解
- 2025山東日照力誠人力資源有限公司招聘外包服務(wù)人員6人筆試參考題庫附帶答案詳解
- 綠茶鑒定 測試題及答案
- 農(nóng)業(yè)科技協(xié)同攻關(guān)實(shí)施方案升級
- 供應(yīng)商質(zhì)量事故索賠單
- PLC智能排號系統(tǒng)
- 基于負(fù)荷模型分析的電力系統(tǒng)電壓穩(wěn)定性研究的開題報(bào)告
- 申請修繕道觀的報(bào)告模板
- 給水處理廠凈水構(gòu)筑物設(shè)計(jì)計(jì)算示例
- (全冊完整16份)北師大版五年級下冊100道口算題大全
- 2022中國幽門螺桿菌感染治療指南
- 鳴人(中英文版)
- 內(nèi)蒙古鄂爾多斯達(dá)拉特經(jīng)濟(jì)開發(fā)區(qū)(達(dá)拉特產(chǎn)業(yè)園區(qū)塊)區(qū)域性地震安全性評價報(bào)告
- 中西文化鑒賞智慧樹知到答案章節(jié)測試2023年鄭州大學(xué)
- 出租房屋安全檢查記錄
評論
0/150
提交評論