




版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制
前言
在RocketMQ主從架構(gòu)中master和slave之間會(huì)進(jìn)行數(shù)據(jù)同步,其中數(shù)據(jù)同步
包括元數(shù)據(jù)復(fù)制和commitlog復(fù)制,那么為什么同步的數(shù)據(jù)中不包括
consumequeue和indexFile呢?這里大家可以思考下:master節(jié)點(diǎn)上
consumequeue和indexFile是根據(jù)commitlog構(gòu)建的,所以slave在同步完
commitlog后只需要根據(jù)commitlog構(gòu)建consumequeue和indexFile即可。本
篇文章就來(lái)分析下master和slave之間是如何進(jìn)行數(shù)據(jù)同步?
一、元數(shù)據(jù)復(fù)制
1.元數(shù)據(jù)復(fù)制入口在RocketMQ中的主從架構(gòu)中,在啟動(dòng)slave節(jié)點(diǎn)的過(guò)程中
會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),該定時(shí)任務(wù)的功能是從master節(jié)點(diǎn)獲取元數(shù)據(jù)。具體如
下:
privatevoidhandleSlaveSynchronize(BrokerRclerole)
if(role==BrokerRole.SLAVE){
if(null!=slaveSyncFuture)
slaveSyncFuture.cancel(false);
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture=this.scheduledExecutorSei
卜ice.scheduleAtFixedRate(newRunnable()
?Override
publicvoidrun()
try
BrokerController.this
.slaveSynchronize.syncAll();
catch(Throwablee)
log,error("Scheduled]
askSlaveSynchronizesyncAllerror.,f,e);
|},1000*3,1000*10,TimeUnit.MILLl|
}else
if(null!=slaveSyncFuture)
slaveSyncFuture.cancel(false);
this.slaveSynchronize.setMasterAddr(null);
)ublicvoidsyncAll()
this.syncTopicConfigO
this.syncConsumerOffset()
this.syncDelayOf^ZTTTTT^^^^^^^I
this.syncSubscriptionGroupConfig();
2.元數(shù)據(jù)復(fù)制都包含哪些內(nèi)容?
從syncAHO方法可以看出元數(shù)據(jù)復(fù)制主要包含以下文件:
(1)topics.json:topic配置文件
(2)consumerOffset.json:consumer消費(fèi)進(jìn)度文件
(3)delayOffset.json:延遲消息拉取進(jìn)度
(4)subscriptionGroup.json:consumerGroup酉己置文件
3.元數(shù)據(jù)同步流程
在RocketMQ中四種元數(shù)據(jù)文件同步的流程是一樣的,這里以topics,json為例
來(lái)分析其流程。從上面syncAll()方法可知:topic配置文件的同步入口是
syncTopicConfigO方法,具體如下:
|privatevoidsyncTopicConfigO
|StringmasterAddrBak=this,master
|if(masterAddrBak!=null&&!masterAddrBak,equa|
|ls(brokerController.getBrokerAddr()))
I
|erOuterAPI().getAllTopicConf
|if(!this.brokerController.getTopic||
|onfigManager().getDataVersion()
|,equals(topicWrapper.getDataV|
^ConfigManager().getDataVersion()
I
|apper.getDataVersion());
this.brokerController.getTopi
|this.brokerController.getTopi|
|.putAll(topicWrapper.|
|this.brokerController.getTopi|
|cConfigManager().persist();
log.error(wSyncTopicConfigExceptior
首先slave會(huì)通過(guò)getAlITopicConfig方法以同步調(diào)用的方式向master發(fā)送
RequestCode.GET_ALL_TOPIC_CONFIG請(qǐng)求來(lái)獲取topic配置文件信息。
ublicTopicConfigSerializeWrappergetAlITopicConfig(
finalStringaddr)throwsRemotingConnectExceptio
RemotingSendRequestException,
RemotingTimeoutException,InterruptedException,MQB|
rokerException{
RemotingCommandrequest=RemotingConimand.createRe
questCommand(RequestCode.GET_ALL_TOPIC_CONFIG,null);
RemotingCommandresponse=this.remotingClient.inv
okeSync(MixAll.brokerVIPChannel(true,addr),request,3000);
assertresponse!=null;
switch(response.getCode()){
caseResponseCode.SUCCESS:
returnTopicConfigSerializeWrapper.d
ecode(response.getBody(),TopicConfigSerializeWrapper.class);
default:
break;
thrownewMQBrokerException(response.getCode(),re
ponse.getRemark(),addr);
master在收到slave端的請(qǐng)求后會(huì)在AdminBrokerProcessor中進(jìn)行處理,具
體是調(diào)用getAllTopicConfig方法來(lái)處理,其處理過(guò)程就是將master端的
topicConfigTable和dataVersion編碼成json字符串并返回給slave。
[privateRemotingCommandgetAllTopicConfig(ChannelHandlerContextct|
[x,RemotingCommandrequest)
|finalRemotingCommandresponse=RemotingCommand.|
|createResponseCommand(GetAllTopicConfigResponseHeader.class)
|Stringcontent=this.brokerController.getTopicCon|
|figManager().encode();
(content!=&&content,length()0)
try
response.setBody(content.getBytes(Mi
All.DEFAULT_CHARSET));
}catch(UnsupportedEncodingExceptione)
log.error(MH,e);
response.setCode(ResponseCode.SYSTEM
|response.setRemark("UnsupportedEncodi
|ngException“+e);
__________________________________returriresponse;
}else
|log,error("Notopicinthisbroker,clie|
|nt:{}",ctx.channel().remoteAddress());
|response.setRemark("Notopicinthisbrok|
('I?il!':i-「)5!、”、:
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null)
returnresponse;
slave在收到master返回的數(shù)據(jù)后會(huì)先判斷本地的dateVersion與master返
回的是否一樣,如果不一樣則會(huì)進(jìn)行以下操作:
(1)更新slave的dataVersion
(2)清空slave端的topicConfigTable并將master返回的數(shù)據(jù)寫(xiě)入
(3)將topic配置進(jìn)行持久化
最后用下圖來(lái)總結(jié)下整個(gè)流程:
RocketMQ其余的元數(shù)據(jù)同步過(guò)程與上圖一樣只是發(fā)送的請(qǐng)求類型不一樣,在閱
讀源碼時(shí)我有注意到一個(gè)問(wèn)題:在同步topic配置文件時(shí)采用的是VIP通道
(即連接的是master的10909端口),而在同步其余三種元數(shù)據(jù)時(shí)采用的是
10911端口,那么問(wèn)題就是其余三種元數(shù)據(jù)在同步時(shí)為什么采用的是10911而
不是10909?我在GitHub上開(kāi)了一個(gè)issue,如果大家有興趣可以一起討論。
這里我認(rèn)為所有的元數(shù)據(jù)同步應(yīng)該都使用10909端口,所以在GitHub提了一個(gè)
pr來(lái)修復(fù)該問(wèn)題。
二、commitlog復(fù)制
commitlog復(fù)制相關(guān)服務(wù)是如何被啟動(dòng)的呢?broker在啟動(dòng)過(guò)程中會(huì)啟動(dòng)
DefaultMessageStore,在啟動(dòng)DefaultMessageStore的過(guò)程中會(huì)判斷broker
是否啟用了Dledger,如果沒(méi)有啟動(dòng)則會(huì)啟動(dòng)HAService,具體如下:
)ublicvoidstart()throwsException
lock=lockFile.getChannel(),tryLock(0,1,false)
f(lock==null||lock.isSharedO|!lock.isValidO){
thrownewRuntimeException(MLockfailed,MQalreadystarted");
lockFile.getChannel().write(ByteBuffer.wrap("lock”.getBytes()));
longmaxPhysicalPosInLogicQueue=commitLog.getMinOffset()
for(ConcurrentMapVInteger,ConsumeQueue>maps:this.consumeQueue|
Table,values
for(ConsumeQueuelogic:maps,values。)
if(logic.getMaxPhysicOffset()>maxPhysicalPosInLogicQueue){|
maxPhysicalPosInLogicQueue=logic.getMaxPhysicOffset();
if(maxPhysicalPosInLogicQueue<0){|
maxPhysicaIPos1nLogicQueue=();H|
if(!messageStoreConfig.isEnableDLegerCommitLog()){
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerR
)le());
this,flushConsumeQueueService.start();
this.storeStatsService.start();
this.addScheduleTask();
this,shutdown=false;
master和slave之間commitlog復(fù)制的整個(gè)過(guò)程如下:
1.啟動(dòng)master并監(jiān)聽(tīng)slave的連接
2.啟動(dòng)slave,建立與master的連接
3.slave向master發(fā)送待拉取數(shù)據(jù)的物理偏移量
4.master根據(jù)待拉取數(shù)據(jù)的物理偏移量打包數(shù)據(jù)并發(fā)給slave
5.slave同步master發(fā)送的數(shù)據(jù)并喚醒reputMessageService服務(wù)構(gòu)建
consumequeue和indexFile
下面詳細(xì)分析master與slave的交互及commitlog同步過(guò)程
1.啟動(dòng)master并監(jiān)聽(tīng)slave連接
master的啟動(dòng)過(guò)程如下,這里與master相關(guān)的是acceptSocketService和
groupTransferService,其中g(shù)roupTransferService與commitlog的同步復(fù)制
相關(guān),后面會(huì)詳細(xì)說(shuō)明。acceptSocketService主要負(fù)責(zé)master端監(jiān)聽(tīng)slave
連接。
[publicvoidstart()throwsException
|this.acceptSocketService.beginAccept();
|this.acceptSocketService.s+qr+C
//groupTransferService與commitlog同步
this.groupTransferService.start();
this.haClient.start();
publicvoidbeginAccept()throwsException
this.serverSocketChannel=ServerSocketChannel.open()
this,selector二RemotingUtil.openSelector()
this.serverSocketChannel.socket(),setReuseAddress(true)
this.serverSocketChannel.socket().bind(this,socketAddressListen
B
this.serverSocketChannel.register(this.selector,SelectionKey.Of
(2)acceptSocketService.start()該函數(shù)的具體流程如下:
publicvoidrun()
10g.info(this.getServiceName()+“servicestarted");
while(!this.isStopped。)
try{
Set<SelectionKey>selected=this,selector.selectedKeys();
|for(SelectionKeyk:selected)
|Socketchannelsc=((ServerSocketChannel)k.channel()).a|
|HAService.log.info("HAServicereceivenewconnection,
|+sc.socket().getRemoteSocketAddress());
HAConnectionconn=newHAConnection(HAService.this,
corm,start();
HAService.this.addConnection(conn)
log,error("newHAConnectionexception”,e);
sc.close();
log.warn("Unexpectedopsinselect"+k.readyOps());
selected,clear();
log.error(this.getServiceNameO+“servicehasexception.n,e)
log.info(this.getServiceName()+“serviceend");
上面過(guò)程中有個(gè)重要的過(guò)程就是啟動(dòng)了HAConnection,HAConnection表示的是
master與slave之間的網(wǎng)絡(luò)連接,在HAConnection中有兩個(gè)重要的對(duì)象分別
是readSocketService和writeSocketService,其中readSocketService是
master讀取實(shí)現(xiàn)類,writeSocketService是master寫(xiě)實(shí)現(xiàn)類,具體如下:
rnblicvoidstart()
this.readSocketService.start();
this.writeSocketService.start();
2.啟動(dòng)slave,建立與master連接及向master發(fā)送待拉取數(shù)據(jù)的物理偏移量
slave啟動(dòng)過(guò)程主要啟動(dòng)的是HAClient,具體如下:
[publievoidrun()
|log,info(this.getServiceName()+"servicestarted")
|if(this.connectMaster())
|booleanresult=this,reportSlaveMaxOffset(this.currentRep|
this,selector,select(1000)
booleanok=this.processReadEvent();
:?J.(【)"'」心"「()
MMMMW
[10nginterval
|HAService.this.getDefaultMessageStore(),getSystemClock()[
|-this.
|if(interval〉HAService.this.getDefaultMessageStore(),getMe|
bsageStoreConfig()
|.getHaHousekeepinglnterval
|log,warn("HAC1ient,housekeeping,foundthisconnection],+|
this.masterAddress
+”]expired,"+interval);
this.closeMaster();
log.warn(nHAClient,masternotresponsesometime,soclose
connection");
}else{
this.waitForRunning(1000*5);
}catch(Exceptione){
log.warn(this.getServiceName()+“servicehasexception.”,e);
this,waitForRunning(1000*5);
log,info(this.getServiceName()+“serviceend");
這里需要詳細(xì)看幾個(gè)比較重要的點(diǎn):
(1)slave連接master完成slave連接master的函數(shù)是connectMaster,該
函數(shù)主要完成以下操作:
?獲取master的地址并連接,這里需要注意在開(kāi)始時(shí)將通道設(shè)置為阻塞
的,在connect完成后又將連接通道設(shè)置為非阻塞的
[publicstaticSocketChannelconnect(SocketAddressremote,finalinttimeo|
|SocketCharmelsc二mil1;
|sc二SocketCharmel.open();
|sc.configureBlocking(true);
|sc.socket().setSoLinger(fa卜a-1)
|sc.socket(),setTcpNoDelay(true);
|sc.socket().connect(remote,timeoutMillis)
Isc.configureBlocking(false);
|returnsc;
|}catch(Exceptione)
|sc.close();
|}catch([OExceptionel)
returnnull;
?注冊(cè)O(shè)P_READ事件
?獲取slave端commitlog的最大物理偏移量并緩存在
currentReportedOffset
?更新lastWriteTimestamp,lastWriteTimestamp的作用是用來(lái)計(jì)算
master和slave之間同步的時(shí)間間隔
privatebooleanconnectMaster()throwsClosedChannelException
|if(null■二二socketChannel)
|Stringaddr二this.masterAddress.get();
|SocketAddresssocketAddress=RemotingLti1.string2SocketAddre|
|ss(addr);
|if(socketAddress!=null)
|二;..、()(-k(—;udR(——〔iLL)nn(、(「〔(s()(「kc——s);■
|if(this,socketchannel!=null)
|this.socketChannel.register(this,selector,SelectionKey.0|
pREAD);
|this.currentReportedOffset=HAService.this.defaultMessageStore|
this,lastWriteTimestamp=System,current!imeMiHis();
returnthis,socketchannel!=null;
(2)isTimeToReportOffset()isTimeToReportOffset()方法的作用是判斷是
否向master匯報(bào)slave端commitlog的最大物理偏移量,判斷依據(jù)是計(jì)算當(dāng)前
時(shí)間與lastWriteTimestamp的時(shí)間間隔,如果該時(shí)間間隔大于
haSendHeartbeatInterval(默認(rèn)是5秒,可以在配置文件中進(jìn)行修改)
)rivatebooleanisTimeToReportOffset(){
longinterval二
HAService.this.defaultMessageStore.getSystemClockQ.now()一thi
.lastWriteTimestamp;
booleanneedHeart=interval>HAService.this.defaultMessageStore.
IgetMessageStoreConfig()
|,getHaSendHeartbeatInterval();
(3)reportSlaveMaxOffset(this.currentReportedOffset)
reportSlaveMaxOffset方法的作用是向master匯報(bào)slave端commitlog最大
物理偏移量,也就是將currentReportedOffset匯報(bào)給master,這里可以發(fā)現(xiàn)
是將currentReportedOffset存儲(chǔ)在reportOffset中(reportOffset是8個(gè)
字節(jié)),然后將reportOffset發(fā)送給master,這里在向master發(fā)送時(shí)有個(gè)循
環(huán),猜測(cè)該循環(huán)與通道是非阻塞性質(zhì)有關(guān),加上循環(huán)可以確保reportOffset發(fā)
送完成,發(fā)送完成后修改lastWriteTimestamp,最后如果reportOffset沒(méi)有
空余則返回true
xrivatebooleanreportSlaveMaxOffset(finallongmaxOffset)
this.reportOffset.position(0);
this.reportOffset.],益
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.1,??;十
r(inti=0;i<3&&this.reportOffset.hasRemainingQ;i++)
try
this.socketChannel.write(this.reportOffset);
}catch(lOExceptione){
log,error(this.getServiceName()
+nreportSlaveMaxOffsetthis.socketChannel.writeexception11,
e);
returnfalse;
lastWriteTimestamp=HAService.this.defaultMessageStore.getSyste|
|mClock().now();
return!this.reportOffset.hasRemainingO;
(4)reportSlaveMaxOffsetPlus()reportSlaveMaxOffsetPlus方法的作用是
判斷slave端的當(dāng)前commitlog的最大物理偏移量是否有增長(zhǎng),如果有則更新
reportSlaveMaxOffset并調(diào)用reportSlaveMaxOffset向master匯報(bào)
rivatebooleanreportSlaveMaxOffsetPlus()
booleanresult=true;
|10ngcurrentPhyOffset=HAService.this.defaultMessageStore.getMax|
if(currentPhyOffset>this.currentReportedOffset)
result=this.reportSlaveMaxOffset(this.currentReportedOffset);
if(!result)
this.closeMaster();
log.error("HAClient,reportSlaveMaxOffseterror,"+this,curr
entReportedOffset);
returnresult;
3.master根據(jù)待拉取數(shù)據(jù)的物理偏移量打包數(shù)據(jù)并發(fā)給slave
3.1master讀取數(shù)據(jù)
從上面可知master讀取slave發(fā)送數(shù)據(jù)是由HAConnection的
readSocketService對(duì)象的processReadEvent方法完成的,具體如下:
(1)首先判斷byteBufferRead是否還有剩余空間,byteBufferRead的作用是
存儲(chǔ)master讀取的數(shù)據(jù),如果沒(méi)有空間則對(duì)byteBufferRead進(jìn)行flip操作,
同時(shí)將processPosition重置為0,processPosition表示處理
byteBufferRead的位置
(2)讀取通道中的數(shù)據(jù)并存儲(chǔ)到byteBufferRead
(3)判斷byteBufferRead的position與processPosition之間的差值是否大
于等于8,之所以進(jìn)行判斷是因?yàn)閟lave向master匯報(bào)commitlog的最大物理
偏移量占了8個(gè)字節(jié),所以如果大于等于8則表示有一個(gè)完整的數(shù)據(jù)可以進(jìn)行
處理。在條件滿足的情況下會(huì)進(jìn)行以下操作:
?計(jì)算出byteBufferRead距離當(dāng)前位置最近的位置,具體見(jiàn)下圖:假設(shè)下
圖中每個(gè)單元格代表8個(gè)字節(jié),position是(24,32)之間的任意一個(gè)位
置,現(xiàn)在要計(jì)算的是距離當(dāng)前位置最近的一個(gè)完整的數(shù)據(jù)的結(jié)束位置,
具體算法是:this.byteBufferRead.position()-
(this.byteBufferRead.position()%8),計(jì)算出結(jié)束的位置后用當(dāng)前
位置減去8就是開(kāi)始的位置
?讀?。踦os-8,pos]之間的數(shù)據(jù)并存儲(chǔ)在readOffset
?將processPosition移動(dòng)到pos位置
(4)將上面讀取到的待拉取數(shù)據(jù)的物理偏移量存儲(chǔ)在slaveAckOffset(5)
判斷slaveRequestOffset是否小于0,如果小于0則更新為本次slave待拉取
數(shù)據(jù)的物理偏移量(slaveAckOffset存儲(chǔ)的是slave端已經(jīng)拉取完成的物理偏
移量,slaveRequestOffset存儲(chǔ)的是slave端請(qǐng)求拉取的數(shù)據(jù)的物理偏移量)
rivatebooleanprocessReadEvent()
if(!this.byteBufferRead.hasRemainingQ)
,
?P!,fl,'pssin>i;i川-:0:
|while(this.byteBufferRead.hasRemainingO)
|intreadSize=this.socketChannel.read(this.byteBufferRead);.
|readSizeZeroT;二c?
|this.lastReadTimestamp=HAConnection.this.haService.getDef
|aultMessageStore().getSystemClock().now();
|if((this.byteBufferRead.position。-this.processPosition)
|intpos二this.byteBufferRead.position()一(this.byteBuffei
longreadOffset=this.byteBufferRead.getLong(pos-8);
this.processPosition=pos;
|HAConnection.this.slaveAckOffset二readOffset
|if(HAConnection.this.slaveRequestOffset<0)
|HAConnection.this.slaveRequestOffset=readOffset
Ilog.info(,,slave[>t+HAConnection.this.clientAddr+w]req|
|HAConnection.this.haService.notifyTransferSome(HAConnect
if(++readSizeZeroTimes>=3)
log,error("readsocket1"+HAConnection.this.clientAddr+”]
returnfalse;
}catch(lOExceptione){
10g.error("processReadEventexception",e);
returnfalse;
returntrue;
3.2master向slave寫(xiě)數(shù)據(jù)
接著來(lái)看下master讀取完待拉取數(shù)據(jù)后的操作,這里就到
writeSocketService了,其主要完成的master向slave寫(xiě)數(shù)據(jù)的功能。其實(shí)
現(xiàn)都在run方法中,具體如下:
(1)首先判斷slaveRequestOffset是否等于T,如果等于-1則表示master
還沒(méi)有收到slave端待拉取數(shù)據(jù)的請(qǐng)求。這里需要注意slaveRequestOffset是
master在收到slave端匯報(bào)的待拉取數(shù)據(jù)物理偏移量是更新的,即
readSocketService的processReadEvent方法中。
(2)判斷nextTransferFromWhere是否等于T,nextTransferFromWhere表示
master下次向slave同步數(shù)據(jù)的物理偏移量,如果它的值為T則表示是第一次
進(jìn)行數(shù)據(jù)同步。如果slaveRequestOffset等于0則從當(dāng)前commitlog中最后一
個(gè)文件開(kāi)始進(jìn)行數(shù)據(jù)傳輸。如果slaveRequestOffset不等于0則
nextTransferFromWhere被賦值為slaveRequestOffset,即從slave請(qǐng)求的位
置開(kāi)始傳輸數(shù)據(jù)。
(3)lastWriteOver表示的是上次數(shù)據(jù)傳輸是否完成,如果上次數(shù)據(jù)傳輸已經(jīng)
完成并且當(dāng)前距離上次最后寫(xiě)入時(shí)間的間隔大于haSendHeartbeatInterval
(默認(rèn)是5秒可在配置文件中進(jìn)行配置)則會(huì)向slave發(fā)送一個(gè)12字節(jié)的數(shù)據(jù)
包,其中前8個(gè)字節(jié)用來(lái)存儲(chǔ)nextTransferFromWhere,后4個(gè)字節(jié)存儲(chǔ)的值
為0。如果上次數(shù)據(jù)傳輸沒(méi)有完成則會(huì)繼續(xù)傳輸數(shù)據(jù)然后再判斷是否傳輸完
成,如果還是沒(méi)有傳輸完成則結(jié)束本次事件處理,等到下次事件處理時(shí)繼續(xù)傳
輸上次沒(méi)有傳輸完成的數(shù)據(jù)。
(4)如果上次數(shù)據(jù)已經(jīng)傳輸完成,則會(huì)根據(jù)nextTransferFromWhere獲取該偏
移量之后所有的數(shù)據(jù),如果在該偏移量之后沒(méi)有數(shù)據(jù)則會(huì)等待100毫秒,如果
數(shù)據(jù)不為空,首先會(huì)判斷數(shù)據(jù)的長(zhǎng)度是否大于haTransferBatchSize(默認(rèn)是
32KB)從這里可以看出slave很有可能會(huì)收到master傳輸?shù)牟煌暾南?。?/p>
著會(huì)用變量thisOffset記錄本次數(shù)據(jù)傳輸開(kāi)始的偏移量,然后更新
nextTransferFromWhere,并對(duì)selectResult的ByteBuffer進(jìn)行l(wèi)imit操作限
制本次傳輸數(shù)據(jù)的大小,最后將selectResult賦給
selectMappedBufferResulto在進(jìn)行數(shù)據(jù)傳輸前會(huì)先在byteBufferHeader中記
錄本次數(shù)據(jù)傳輸?shù)拈_(kāi)始位置thisOffset和傳輸?shù)臄?shù)據(jù)大小。
(5)調(diào)用transferDataO方法進(jìn)行數(shù)據(jù)傳輸,在進(jìn)行數(shù)據(jù)傳輸時(shí)會(huì)先傳輸
byteBufferHeader,然后傳輸selectMappedBufferResulto數(shù)據(jù)傳輸完成后會(huì)
對(duì)selectMappedBufferResult釋放。
這里我們稍微總結(jié)下,master向slave傳輸?shù)臄?shù)據(jù)包實(shí)際上分為兩種:
?不包含消息的數(shù)據(jù)包這類數(shù)據(jù)包共12個(gè)字節(jié),其中前8個(gè)字節(jié)用來(lái)存
儲(chǔ)master向slave同步數(shù)據(jù)的起始偏移量,后4個(gè)字節(jié)存儲(chǔ)的是消息的
長(zhǎng)度,這里存儲(chǔ)的值為0
?包含消息的數(shù)據(jù)包這類數(shù)據(jù)包共分為三個(gè)部分,其中前8個(gè)字節(jié)用來(lái)存
儲(chǔ)本次向slave傳輸數(shù)據(jù)的起始偏移量,后四個(gè)字節(jié)用來(lái)存儲(chǔ)本次傳輸
的消息長(zhǎng)度,最后一個(gè)部分占用size字節(jié)表示的是消息
publicvoidrun(){
HAConnection.log.info(this,getServiceName()+"servicestarted11)
while(!this.isStoppedO)
this.selector,select(1000);
if(-1==HAConnection.this.slaveRequestOffset)
nue;
|if(-1■二二this.nextTransferFromWhere)
|if(0=HAConnection.this.slaveRequestOffset)
|longmasterOffset=HAConnection.this.haService.getDefault|
MessageStore().getCommitLogO.getMaxOffset();
|masterOffset
|master0ffse
|一(masterOffset%HAConnection.this.haService.getDefau|
[tMessageStore().getMessageStoreConfig()^^^^^^^^^^^^^^^^^^^^[
|.getMappedFileSizeCommitLogO);
1
~"""""」.7;「4);N"
this.nextTransferFromWhere=masterOffset;
this.nextTransferFromWhere=HAConnection.this.slaveReque|
stOffset;
|log,info("mastertransferdatafrom"+this.nextTransferFrom|
/here+"toslave]"+HAConnection.this.cl"n+八乂小
+“],andslaverequest"+HAConnection.this.slaveRequestOf
fset);
|longinterval
|HAConnection.this.haService.getDefaultMessageStore(),get]
|if(interval>HAConnection.this.haService.getDefaultMessage|
[Store().getMessageStoreConf
|.getHaSendHeartbeatInterval())
|this.byteBufferHeader.position(0);
.bylel?un,('rll('<ici(?r.IimiI(hc^idrrSize):
this.byteBufferHeader.putLong(this.nextTransferFromWhere
B
this.byteBufferHeader.putlnt(O)
this.lastWriteOver=this.transferDataQ;
nue;
this.lastWriteOver二this.transferDataQ;
inue;
|SelectMappedBufferResultselectResult=
|HAConnection.this.haService.getDefaultMessageStoreQtgetCo]
^nmitLogData(this.nextTransferFromWhere);
|intsize二selectResult.getSize()
|if(size>HAConnection.this.haService.getDefaultMessageStor|
,().getMessageStoreConfig().getllaTransferBatchSize())第
size二HAConnection.this.haService.getDefaultMessageStore|
().getMessageStoreConfig().getHaTransferBatchSize();
longthisOffset二this.nextTransferFromWhere;^^^^^^^^J
this.nextTransferFromWhere+=
selectResult.getByteBuffer().limit仁;”)
this.selectMappedBufferResult=selectResult;^^^^^^^^^J
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset)
this.byteBufferHeader.putlnt(size);
this.lastWriteOver二this.transferDataQ;
HAConnection.this.haService.getWaitNotifyObject(),allWaitF
}catch(Exception。){|
HAConnection.log.error(this.getServiceName()+“servicehase
xception.n,e);
break;
HAConnection.this.haService.getWaitNotifyObject().removeFromWai
tingThreadTable();
if(this.selectMappedBufferResult!=null)
this.selectMappedBufferResult.release。;
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
SelectionKeysk二this.socketChannel.keyFor(this,selector);
if(sk!=null)
sk.cancel();
this,selector.close();
this.socketChannel.close();
}catch(lOExceptione)
HAConnection.10g.error,'",e);
HAConnection.log.info(this^getServiceName()+"serviceend");
4.slave讀取master發(fā)送的數(shù)據(jù)包
slave處理讀事件的方法是HAClient中的processReadEvent方法,
byteBufferRead是slave端的讀緩沖區(qū)。processReadEvent方法的邏輯是首先
判斷byteBufferRead是否還有剩余空間,如果還有剩余空間則將channel中的
數(shù)據(jù)讀取到byteBufferRead中,然后調(diào)用dispatchReadRequest方法處理讀取
的數(shù)據(jù)。
?rivatebooleanprocessReadEvent(){
intreadSizeZeroTimes二0;
while(this.byteBufferRead.hasRemainingO)
intreadSize=this,socketchannel,read(this.byteBufferRead);
readSizeZeroTimes=0;
booleanresult二this.dispatchReadRequest()
10g.error("HAClient,dispatchReadRequesterror")
returnfalse;
if(++readSizeZeroTimes>=3)
log.info("HAClient,processReadEventreadsocket<0");
}catch(lOExceptione)
log,info("HAClient,processReadEventreadsocketexception”,(
B
returnfalse;
returntrue;
這里需要重點(diǎn)分析dispatchReadRequest方法:
(1)用readSocketPos記錄byteBufferRead當(dāng)前的position
(2)計(jì)算byteBufferRead中當(dāng)前position與dispatchPosition
(dispatchPosition指向byteBufferRead中已經(jīng)處理的位置的指針)差值
diff
(3)判斷diff是否大于等于12字節(jié),這里之所有判斷與12字節(jié)的關(guān)系是因
為master向slave發(fā)送的數(shù)據(jù)包前12字節(jié)包含了傳輸?shù)臄?shù)據(jù)的起始物理偏移
量以及傳輸?shù)臄?shù)據(jù)的長(zhǎng)度。如果條件成立則從byteBufferRead中分別讀取傳輸
的數(shù)據(jù)的起始物理偏移量和傳輸?shù)臄?shù)據(jù)的長(zhǎng)度并記錄在masterPhyOffset和
bodySize中。
(4)從當(dāng)前節(jié)點(diǎn)獲取commitlog的最大物理偏移量slavePhyOffset,并判斷
slavePhyOffset與masterPhyOffset是否相等,正常情況下兩者應(yīng)該是相等
的,如果不相等則輸出error信息。
(5)判斷diff是否大于等于msgHeaderSize+bodySize,如果條件成立表示
當(dāng)前有一個(gè)完整的數(shù)據(jù)包,然后執(zhí)行以下操作:
?將byteBufferRead的position設(shè)置為dispatchPosition+
msgHeaderSize,也就是數(shù)據(jù)包中數(shù)據(jù)開(kāi)始的位置
?讀取數(shù)據(jù)包中的消息并存儲(chǔ)在bodyData中
?調(diào)用appendToCommitLog方法完成數(shù)據(jù)追加操作
?將byteBufferRead的position重新設(shè)置到readSocketPos
?將dispatchPosition向前移動(dòng)msgHeaderSize+bodySize
?調(diào)用reportSlaveMaxOffsetPlus方法判斷slave端commitlog的是否有
追加,如果有新的增加則向master匯報(bào)currentReportedOffset
privatebooleandispatchReadRequest()
finalintmsgHeaderSize=8+4;”,
intreadSocketPos二this.byteBufferRead.position。
while(true)
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 專業(yè)c語(yǔ)言期末考試題及答案
- 上海保安考試題目及答案
- 電動(dòng)汽車車輛維修合同3篇
- 突發(fā)公共衛(wèi)生事件應(yīng)對(duì)與管理
- 南通市崇川區(qū)2023-2024四年級(jí)數(shù)學(xué)下冊(cè)期末試卷及答案
- 呼吸管理運(yùn)營(yíng)體系構(gòu)建
- 幼兒園衛(wèi)生保健家長(zhǎng)座談會(huì)
- 建筑工程施工總承包合同范文4篇
- T/ZJFIA 011-2023常山雙柚汁復(fù)合果汁飲料
- 汽車創(chuàng)意美術(shù)課件設(shè)計(jì)
- 頂層鋼結(jié)構(gòu)合同
- 中國(guó)硬筆書(shū)法等級(jí)考試試卷(三級(jí))
- 2025年江蘇省啟東市文化廣電和旅游局招聘編外1人歷年高頻重點(diǎn)提升(共500題)附帶答案詳解
- 《普通生物學(xué)》課程期末考試復(fù)習(xí)題庫(kù)及答案
- dlt-5161-2018電氣裝置安裝工程質(zhì)量檢驗(yàn)及評(píng)定規(guī)程
- 用戶生命周期管理策略-洞察分析
- 第三屆中國(guó)長(zhǎng)三角地區(qū)融資擔(dān)保職業(yè)技能競(jìng)賽選拔賽試題庫(kù)500題(含答案)
- 2025屆安徽省A10聯(lián)盟高三第二次調(diào)研數(shù)學(xué)試卷含解析
- 項(xiàng)目管理與工程經(jīng)濟(jì)決策知到智慧樹(shù)章節(jié)測(cè)試課后答案2024年秋哈爾濱工程大學(xué)
- 【MOOC】生命的教育-浙江大學(xué) 中國(guó)大學(xué)慕課MOOC答案
- 2024年中英城市更新白皮書(shū)
評(píng)論
0/150
提交評(píng)論