RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制_第1頁(yè)
RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制_第2頁(yè)
RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制_第3頁(yè)
RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制_第4頁(yè)
RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制_第5頁(yè)
已閱讀5頁(yè),還剩50頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

RocketMQ源碼分析之主從數(shù)據(jù)復(fù)制

前言

在RocketMQ主從架構(gòu)中master和slave之間會(huì)進(jìn)行數(shù)據(jù)同步,其中數(shù)據(jù)同步

包括元數(shù)據(jù)復(fù)制和commitlog復(fù)制,那么為什么同步的數(shù)據(jù)中不包括

consumequeue和indexFile呢?這里大家可以思考下:master節(jié)點(diǎn)上

consumequeue和indexFile是根據(jù)commitlog構(gòu)建的,所以slave在同步完

commitlog后只需要根據(jù)commitlog構(gòu)建consumequeue和indexFile即可。本

篇文章就來(lái)分析下master和slave之間是如何進(jìn)行數(shù)據(jù)同步?

一、元數(shù)據(jù)復(fù)制

1.元數(shù)據(jù)復(fù)制入口在RocketMQ中的主從架構(gòu)中,在啟動(dòng)slave節(jié)點(diǎn)的過(guò)程中

會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù),該定時(shí)任務(wù)的功能是從master節(jié)點(diǎn)獲取元數(shù)據(jù)。具體如

下:

privatevoidhandleSlaveSynchronize(BrokerRclerole)

if(role==BrokerRole.SLAVE){

if(null!=slaveSyncFuture)

slaveSyncFuture.cancel(false);

this.slaveSynchronize.setMasterAddr(null);

slaveSyncFuture=this.scheduledExecutorSei

卜ice.scheduleAtFixedRate(newRunnable()

?Override

publicvoidrun()

try

BrokerController.this

.slaveSynchronize.syncAll();

catch(Throwablee)

log,error("Scheduled]

askSlaveSynchronizesyncAllerror.,f,e);

|},1000*3,1000*10,TimeUnit.MILLl|

}else

if(null!=slaveSyncFuture)

slaveSyncFuture.cancel(false);

this.slaveSynchronize.setMasterAddr(null);

)ublicvoidsyncAll()

this.syncTopicConfigO

this.syncConsumerOffset()

this.syncDelayOf^ZTTTTT^^^^^^^I

this.syncSubscriptionGroupConfig();

2.元數(shù)據(jù)復(fù)制都包含哪些內(nèi)容?

從syncAHO方法可以看出元數(shù)據(jù)復(fù)制主要包含以下文件:

(1)topics.json:topic配置文件

(2)consumerOffset.json:consumer消費(fèi)進(jìn)度文件

(3)delayOffset.json:延遲消息拉取進(jìn)度

(4)subscriptionGroup.json:consumerGroup酉己置文件

3.元數(shù)據(jù)同步流程

在RocketMQ中四種元數(shù)據(jù)文件同步的流程是一樣的,這里以topics,json為例

來(lái)分析其流程。從上面syncAll()方法可知:topic配置文件的同步入口是

syncTopicConfigO方法,具體如下:

|privatevoidsyncTopicConfigO

|StringmasterAddrBak=this,master

|if(masterAddrBak!=null&&!masterAddrBak,equa|

|ls(brokerController.getBrokerAddr()))

I

|erOuterAPI().getAllTopicConf

|if(!this.brokerController.getTopic||

|onfigManager().getDataVersion()

|,equals(topicWrapper.getDataV|

^ConfigManager().getDataVersion()

I

|apper.getDataVersion());

this.brokerController.getTopi

|this.brokerController.getTopi|

|.putAll(topicWrapper.|

|this.brokerController.getTopi|

|cConfigManager().persist();

log.error(wSyncTopicConfigExceptior

首先slave會(huì)通過(guò)getAlITopicConfig方法以同步調(diào)用的方式向master發(fā)送

RequestCode.GET_ALL_TOPIC_CONFIG請(qǐng)求來(lái)獲取topic配置文件信息。

ublicTopicConfigSerializeWrappergetAlITopicConfig(

finalStringaddr)throwsRemotingConnectExceptio

RemotingSendRequestException,

RemotingTimeoutException,InterruptedException,MQB|

rokerException{

RemotingCommandrequest=RemotingConimand.createRe

questCommand(RequestCode.GET_ALL_TOPIC_CONFIG,null);

RemotingCommandresponse=this.remotingClient.inv

okeSync(MixAll.brokerVIPChannel(true,addr),request,3000);

assertresponse!=null;

switch(response.getCode()){

caseResponseCode.SUCCESS:

returnTopicConfigSerializeWrapper.d

ecode(response.getBody(),TopicConfigSerializeWrapper.class);

default:

break;

thrownewMQBrokerException(response.getCode(),re

ponse.getRemark(),addr);

master在收到slave端的請(qǐng)求后會(huì)在AdminBrokerProcessor中進(jìn)行處理,具

體是調(diào)用getAllTopicConfig方法來(lái)處理,其處理過(guò)程就是將master端的

topicConfigTable和dataVersion編碼成json字符串并返回給slave。

[privateRemotingCommandgetAllTopicConfig(ChannelHandlerContextct|

[x,RemotingCommandrequest)

|finalRemotingCommandresponse=RemotingCommand.|

|createResponseCommand(GetAllTopicConfigResponseHeader.class)

|Stringcontent=this.brokerController.getTopicCon|

|figManager().encode();

(content!=&&content,length()0)

try

response.setBody(content.getBytes(Mi

All.DEFAULT_CHARSET));

}catch(UnsupportedEncodingExceptione)

log.error(MH,e);

response.setCode(ResponseCode.SYSTEM

|response.setRemark("UnsupportedEncodi

|ngException“+e);

__________________________________returriresponse;

}else

|log,error("Notopicinthisbroker,clie|

|nt:{}",ctx.channel().remoteAddress());

|response.setRemark("Notopicinthisbrok|

('I?il!':i-「)5!、”、:

response.setCode(ResponseCode.SUCCESS);

response.setRemark(null)

returnresponse;

slave在收到master返回的數(shù)據(jù)后會(huì)先判斷本地的dateVersion與master返

回的是否一樣,如果不一樣則會(huì)進(jìn)行以下操作:

(1)更新slave的dataVersion

(2)清空slave端的topicConfigTable并將master返回的數(shù)據(jù)寫(xiě)入

(3)將topic配置進(jìn)行持久化

最后用下圖來(lái)總結(jié)下整個(gè)流程:

RocketMQ其余的元數(shù)據(jù)同步過(guò)程與上圖一樣只是發(fā)送的請(qǐng)求類型不一樣,在閱

讀源碼時(shí)我有注意到一個(gè)問(wèn)題:在同步topic配置文件時(shí)采用的是VIP通道

(即連接的是master的10909端口),而在同步其余三種元數(shù)據(jù)時(shí)采用的是

10911端口,那么問(wèn)題就是其余三種元數(shù)據(jù)在同步時(shí)為什么采用的是10911而

不是10909?我在GitHub上開(kāi)了一個(gè)issue,如果大家有興趣可以一起討論。

這里我認(rèn)為所有的元數(shù)據(jù)同步應(yīng)該都使用10909端口,所以在GitHub提了一個(gè)

pr來(lái)修復(fù)該問(wèn)題。

二、commitlog復(fù)制

commitlog復(fù)制相關(guān)服務(wù)是如何被啟動(dòng)的呢?broker在啟動(dòng)過(guò)程中會(huì)啟動(dòng)

DefaultMessageStore,在啟動(dòng)DefaultMessageStore的過(guò)程中會(huì)判斷broker

是否啟用了Dledger,如果沒(méi)有啟動(dòng)則會(huì)啟動(dòng)HAService,具體如下:

)ublicvoidstart()throwsException

lock=lockFile.getChannel(),tryLock(0,1,false)

f(lock==null||lock.isSharedO|!lock.isValidO){

thrownewRuntimeException(MLockfailed,MQalreadystarted");

lockFile.getChannel().write(ByteBuffer.wrap("lock”.getBytes()));

longmaxPhysicalPosInLogicQueue=commitLog.getMinOffset()

for(ConcurrentMapVInteger,ConsumeQueue>maps:this.consumeQueue|

Table,values

for(ConsumeQueuelogic:maps,values。)

if(logic.getMaxPhysicOffset()>maxPhysicalPosInLogicQueue){|

maxPhysicalPosInLogicQueue=logic.getMaxPhysicOffset();

if(maxPhysicalPosInLogicQueue<0){|

maxPhysicaIPos1nLogicQueue=();H|

if(!messageStoreConfig.isEnableDLegerCommitLog()){

this.haService.start();

this.handleScheduleMessageService(messageStoreConfig.getBrokerR

)le());

this,flushConsumeQueueService.start();

this.storeStatsService.start();

this.addScheduleTask();

this,shutdown=false;

master和slave之間commitlog復(fù)制的整個(gè)過(guò)程如下:

1.啟動(dòng)master并監(jiān)聽(tīng)slave的連接

2.啟動(dòng)slave,建立與master的連接

3.slave向master發(fā)送待拉取數(shù)據(jù)的物理偏移量

4.master根據(jù)待拉取數(shù)據(jù)的物理偏移量打包數(shù)據(jù)并發(fā)給slave

5.slave同步master發(fā)送的數(shù)據(jù)并喚醒reputMessageService服務(wù)構(gòu)建

consumequeue和indexFile

下面詳細(xì)分析master與slave的交互及commitlog同步過(guò)程

1.啟動(dòng)master并監(jiān)聽(tīng)slave連接

master的啟動(dòng)過(guò)程如下,這里與master相關(guān)的是acceptSocketService和

groupTransferService,其中g(shù)roupTransferService與commitlog的同步復(fù)制

相關(guān),后面會(huì)詳細(xì)說(shuō)明。acceptSocketService主要負(fù)責(zé)master端監(jiān)聽(tīng)slave

連接。

[publicvoidstart()throwsException

|this.acceptSocketService.beginAccept();

|this.acceptSocketService.s+qr+C

//groupTransferService與commitlog同步

this.groupTransferService.start();

this.haClient.start();

publicvoidbeginAccept()throwsException

this.serverSocketChannel=ServerSocketChannel.open()

this,selector二RemotingUtil.openSelector()

this.serverSocketChannel.socket(),setReuseAddress(true)

this.serverSocketChannel.socket().bind(this,socketAddressListen

B

this.serverSocketChannel.register(this.selector,SelectionKey.Of

(2)acceptSocketService.start()該函數(shù)的具體流程如下:

publicvoidrun()

10g.info(this.getServiceName()+“servicestarted");

while(!this.isStopped。)

try{

Set<SelectionKey>selected=this,selector.selectedKeys();

|for(SelectionKeyk:selected)

|Socketchannelsc=((ServerSocketChannel)k.channel()).a|

|HAService.log.info("HAServicereceivenewconnection,

|+sc.socket().getRemoteSocketAddress());

HAConnectionconn=newHAConnection(HAService.this,

corm,start();

HAService.this.addConnection(conn)

log,error("newHAConnectionexception”,e);

sc.close();

log.warn("Unexpectedopsinselect"+k.readyOps());

selected,clear();

log.error(this.getServiceNameO+“servicehasexception.n,e)

log.info(this.getServiceName()+“serviceend");

上面過(guò)程中有個(gè)重要的過(guò)程就是啟動(dòng)了HAConnection,HAConnection表示的是

master與slave之間的網(wǎng)絡(luò)連接,在HAConnection中有兩個(gè)重要的對(duì)象分別

是readSocketService和writeSocketService,其中readSocketService是

master讀取實(shí)現(xiàn)類,writeSocketService是master寫(xiě)實(shí)現(xiàn)類,具體如下:

rnblicvoidstart()

this.readSocketService.start();

this.writeSocketService.start();

2.啟動(dòng)slave,建立與master連接及向master發(fā)送待拉取數(shù)據(jù)的物理偏移量

slave啟動(dòng)過(guò)程主要啟動(dòng)的是HAClient,具體如下:

[publievoidrun()

|log,info(this.getServiceName()+"servicestarted")

|if(this.connectMaster())

|booleanresult=this,reportSlaveMaxOffset(this.currentRep|

this,selector,select(1000)

booleanok=this.processReadEvent();

:?J.(【)"'」心"「()

MMMMW

[10nginterval

|HAService.this.getDefaultMessageStore(),getSystemClock()[

|-this.

|if(interval〉HAService.this.getDefaultMessageStore(),getMe|

bsageStoreConfig()

|.getHaHousekeepinglnterval

|log,warn("HAC1ient,housekeeping,foundthisconnection],+|

this.masterAddress

+”]expired,"+interval);

this.closeMaster();

log.warn(nHAClient,masternotresponsesometime,soclose

connection");

}else{

this.waitForRunning(1000*5);

}catch(Exceptione){

log.warn(this.getServiceName()+“servicehasexception.”,e);

this,waitForRunning(1000*5);

log,info(this.getServiceName()+“serviceend");

這里需要詳細(xì)看幾個(gè)比較重要的點(diǎn):

(1)slave連接master完成slave連接master的函數(shù)是connectMaster,該

函數(shù)主要完成以下操作:

?獲取master的地址并連接,這里需要注意在開(kāi)始時(shí)將通道設(shè)置為阻塞

的,在connect完成后又將連接通道設(shè)置為非阻塞的

[publicstaticSocketChannelconnect(SocketAddressremote,finalinttimeo|

|SocketCharmelsc二mil1;

|sc二SocketCharmel.open();

|sc.configureBlocking(true);

|sc.socket().setSoLinger(fa卜a-1)

|sc.socket(),setTcpNoDelay(true);

|sc.socket().connect(remote,timeoutMillis)

Isc.configureBlocking(false);

|returnsc;

|}catch(Exceptione)

|sc.close();

|}catch([OExceptionel)

returnnull;

?注冊(cè)O(shè)P_READ事件

?獲取slave端commitlog的最大物理偏移量并緩存在

currentReportedOffset

?更新lastWriteTimestamp,lastWriteTimestamp的作用是用來(lái)計(jì)算

master和slave之間同步的時(shí)間間隔

privatebooleanconnectMaster()throwsClosedChannelException

|if(null■二二socketChannel)

|Stringaddr二this.masterAddress.get();

|SocketAddresssocketAddress=RemotingLti1.string2SocketAddre|

|ss(addr);

|if(socketAddress!=null)

|二;..、()(-k(—;udR(——〔iLL)nn(、(「〔(s()(「kc——s);■

|if(this,socketchannel!=null)

|this.socketChannel.register(this,selector,SelectionKey.0|

pREAD);

|this.currentReportedOffset=HAService.this.defaultMessageStore|

this,lastWriteTimestamp=System,current!imeMiHis();

returnthis,socketchannel!=null;

(2)isTimeToReportOffset()isTimeToReportOffset()方法的作用是判斷是

否向master匯報(bào)slave端commitlog的最大物理偏移量,判斷依據(jù)是計(jì)算當(dāng)前

時(shí)間與lastWriteTimestamp的時(shí)間間隔,如果該時(shí)間間隔大于

haSendHeartbeatInterval(默認(rèn)是5秒,可以在配置文件中進(jìn)行修改)

)rivatebooleanisTimeToReportOffset(){

longinterval二

HAService.this.defaultMessageStore.getSystemClockQ.now()一thi

.lastWriteTimestamp;

booleanneedHeart=interval>HAService.this.defaultMessageStore.

IgetMessageStoreConfig()

|,getHaSendHeartbeatInterval();

(3)reportSlaveMaxOffset(this.currentReportedOffset)

reportSlaveMaxOffset方法的作用是向master匯報(bào)slave端commitlog最大

物理偏移量,也就是將currentReportedOffset匯報(bào)給master,這里可以發(fā)現(xiàn)

是將currentReportedOffset存儲(chǔ)在reportOffset中(reportOffset是8個(gè)

字節(jié)),然后將reportOffset發(fā)送給master,這里在向master發(fā)送時(shí)有個(gè)循

環(huán),猜測(cè)該循環(huán)與通道是非阻塞性質(zhì)有關(guān),加上循環(huán)可以確保reportOffset發(fā)

送完成,發(fā)送完成后修改lastWriteTimestamp,最后如果reportOffset沒(méi)有

空余則返回true

xrivatebooleanreportSlaveMaxOffset(finallongmaxOffset)

this.reportOffset.position(0);

this.reportOffset.],益

this.reportOffset.putLong(maxOffset);

this.reportOffset.position(0);

this.reportOffset.1,??;十

r(inti=0;i<3&&this.reportOffset.hasRemainingQ;i++)

try

this.socketChannel.write(this.reportOffset);

}catch(lOExceptione){

log,error(this.getServiceName()

+nreportSlaveMaxOffsetthis.socketChannel.writeexception11,

e);

returnfalse;

lastWriteTimestamp=HAService.this.defaultMessageStore.getSyste|

|mClock().now();

return!this.reportOffset.hasRemainingO;

(4)reportSlaveMaxOffsetPlus()reportSlaveMaxOffsetPlus方法的作用是

判斷slave端的當(dāng)前commitlog的最大物理偏移量是否有增長(zhǎng),如果有則更新

reportSlaveMaxOffset并調(diào)用reportSlaveMaxOffset向master匯報(bào)

rivatebooleanreportSlaveMaxOffsetPlus()

booleanresult=true;

|10ngcurrentPhyOffset=HAService.this.defaultMessageStore.getMax|

if(currentPhyOffset>this.currentReportedOffset)

result=this.reportSlaveMaxOffset(this.currentReportedOffset);

if(!result)

this.closeMaster();

log.error("HAClient,reportSlaveMaxOffseterror,"+this,curr

entReportedOffset);

returnresult;

3.master根據(jù)待拉取數(shù)據(jù)的物理偏移量打包數(shù)據(jù)并發(fā)給slave

3.1master讀取數(shù)據(jù)

從上面可知master讀取slave發(fā)送數(shù)據(jù)是由HAConnection的

readSocketService對(duì)象的processReadEvent方法完成的,具體如下:

(1)首先判斷byteBufferRead是否還有剩余空間,byteBufferRead的作用是

存儲(chǔ)master讀取的數(shù)據(jù),如果沒(méi)有空間則對(duì)byteBufferRead進(jìn)行flip操作,

同時(shí)將processPosition重置為0,processPosition表示處理

byteBufferRead的位置

(2)讀取通道中的數(shù)據(jù)并存儲(chǔ)到byteBufferRead

(3)判斷byteBufferRead的position與processPosition之間的差值是否大

于等于8,之所以進(jìn)行判斷是因?yàn)閟lave向master匯報(bào)commitlog的最大物理

偏移量占了8個(gè)字節(jié),所以如果大于等于8則表示有一個(gè)完整的數(shù)據(jù)可以進(jìn)行

處理。在條件滿足的情況下會(huì)進(jìn)行以下操作:

?計(jì)算出byteBufferRead距離當(dāng)前位置最近的位置,具體見(jiàn)下圖:假設(shè)下

圖中每個(gè)單元格代表8個(gè)字節(jié),position是(24,32)之間的任意一個(gè)位

置,現(xiàn)在要計(jì)算的是距離當(dāng)前位置最近的一個(gè)完整的數(shù)據(jù)的結(jié)束位置,

具體算法是:this.byteBufferRead.position()-

(this.byteBufferRead.position()%8),計(jì)算出結(jié)束的位置后用當(dāng)前

位置減去8就是開(kāi)始的位置

?讀?。踦os-8,pos]之間的數(shù)據(jù)并存儲(chǔ)在readOffset

?將processPosition移動(dòng)到pos位置

(4)將上面讀取到的待拉取數(shù)據(jù)的物理偏移量存儲(chǔ)在slaveAckOffset(5)

判斷slaveRequestOffset是否小于0,如果小于0則更新為本次slave待拉取

數(shù)據(jù)的物理偏移量(slaveAckOffset存儲(chǔ)的是slave端已經(jīng)拉取完成的物理偏

移量,slaveRequestOffset存儲(chǔ)的是slave端請(qǐng)求拉取的數(shù)據(jù)的物理偏移量)

rivatebooleanprocessReadEvent()

if(!this.byteBufferRead.hasRemainingQ)

,

?P!,fl,'pssin>i;i川-:0:

|while(this.byteBufferRead.hasRemainingO)

|intreadSize=this.socketChannel.read(this.byteBufferRead);.

|readSizeZeroT;二c?

|this.lastReadTimestamp=HAConnection.this.haService.getDef

|aultMessageStore().getSystemClock().now();

|if((this.byteBufferRead.position。-this.processPosition)

|intpos二this.byteBufferRead.position()一(this.byteBuffei

longreadOffset=this.byteBufferRead.getLong(pos-8);

this.processPosition=pos;

|HAConnection.this.slaveAckOffset二readOffset

|if(HAConnection.this.slaveRequestOffset<0)

|HAConnection.this.slaveRequestOffset=readOffset

Ilog.info(,,slave[>t+HAConnection.this.clientAddr+w]req|

|HAConnection.this.haService.notifyTransferSome(HAConnect

if(++readSizeZeroTimes>=3)

log,error("readsocket1"+HAConnection.this.clientAddr+”]

returnfalse;

}catch(lOExceptione){

10g.error("processReadEventexception",e);

returnfalse;

returntrue;

3.2master向slave寫(xiě)數(shù)據(jù)

接著來(lái)看下master讀取完待拉取數(shù)據(jù)后的操作,這里就到

writeSocketService了,其主要完成的master向slave寫(xiě)數(shù)據(jù)的功能。其實(shí)

現(xiàn)都在run方法中,具體如下:

(1)首先判斷slaveRequestOffset是否等于T,如果等于-1則表示master

還沒(méi)有收到slave端待拉取數(shù)據(jù)的請(qǐng)求。這里需要注意slaveRequestOffset是

master在收到slave端匯報(bào)的待拉取數(shù)據(jù)物理偏移量是更新的,即

readSocketService的processReadEvent方法中。

(2)判斷nextTransferFromWhere是否等于T,nextTransferFromWhere表示

master下次向slave同步數(shù)據(jù)的物理偏移量,如果它的值為T則表示是第一次

進(jìn)行數(shù)據(jù)同步。如果slaveRequestOffset等于0則從當(dāng)前commitlog中最后一

個(gè)文件開(kāi)始進(jìn)行數(shù)據(jù)傳輸。如果slaveRequestOffset不等于0則

nextTransferFromWhere被賦值為slaveRequestOffset,即從slave請(qǐng)求的位

置開(kāi)始傳輸數(shù)據(jù)。

(3)lastWriteOver表示的是上次數(shù)據(jù)傳輸是否完成,如果上次數(shù)據(jù)傳輸已經(jīng)

完成并且當(dāng)前距離上次最后寫(xiě)入時(shí)間的間隔大于haSendHeartbeatInterval

(默認(rèn)是5秒可在配置文件中進(jìn)行配置)則會(huì)向slave發(fā)送一個(gè)12字節(jié)的數(shù)據(jù)

包,其中前8個(gè)字節(jié)用來(lái)存儲(chǔ)nextTransferFromWhere,后4個(gè)字節(jié)存儲(chǔ)的值

為0。如果上次數(shù)據(jù)傳輸沒(méi)有完成則會(huì)繼續(xù)傳輸數(shù)據(jù)然后再判斷是否傳輸完

成,如果還是沒(méi)有傳輸完成則結(jié)束本次事件處理,等到下次事件處理時(shí)繼續(xù)傳

輸上次沒(méi)有傳輸完成的數(shù)據(jù)。

(4)如果上次數(shù)據(jù)已經(jīng)傳輸完成,則會(huì)根據(jù)nextTransferFromWhere獲取該偏

移量之后所有的數(shù)據(jù),如果在該偏移量之后沒(méi)有數(shù)據(jù)則會(huì)等待100毫秒,如果

數(shù)據(jù)不為空,首先會(huì)判斷數(shù)據(jù)的長(zhǎng)度是否大于haTransferBatchSize(默認(rèn)是

32KB)從這里可以看出slave很有可能會(huì)收到master傳輸?shù)牟煌暾南?。?/p>

著會(huì)用變量thisOffset記錄本次數(shù)據(jù)傳輸開(kāi)始的偏移量,然后更新

nextTransferFromWhere,并對(duì)selectResult的ByteBuffer進(jìn)行l(wèi)imit操作限

制本次傳輸數(shù)據(jù)的大小,最后將selectResult賦給

selectMappedBufferResulto在進(jìn)行數(shù)據(jù)傳輸前會(huì)先在byteBufferHeader中記

錄本次數(shù)據(jù)傳輸?shù)拈_(kāi)始位置thisOffset和傳輸?shù)臄?shù)據(jù)大小。

(5)調(diào)用transferDataO方法進(jìn)行數(shù)據(jù)傳輸,在進(jìn)行數(shù)據(jù)傳輸時(shí)會(huì)先傳輸

byteBufferHeader,然后傳輸selectMappedBufferResulto數(shù)據(jù)傳輸完成后會(huì)

對(duì)selectMappedBufferResult釋放。

這里我們稍微總結(jié)下,master向slave傳輸?shù)臄?shù)據(jù)包實(shí)際上分為兩種:

?不包含消息的數(shù)據(jù)包這類數(shù)據(jù)包共12個(gè)字節(jié),其中前8個(gè)字節(jié)用來(lái)存

儲(chǔ)master向slave同步數(shù)據(jù)的起始偏移量,后4個(gè)字節(jié)存儲(chǔ)的是消息的

長(zhǎng)度,這里存儲(chǔ)的值為0

?包含消息的數(shù)據(jù)包這類數(shù)據(jù)包共分為三個(gè)部分,其中前8個(gè)字節(jié)用來(lái)存

儲(chǔ)本次向slave傳輸數(shù)據(jù)的起始偏移量,后四個(gè)字節(jié)用來(lái)存儲(chǔ)本次傳輸

的消息長(zhǎng)度,最后一個(gè)部分占用size字節(jié)表示的是消息

publicvoidrun(){

HAConnection.log.info(this,getServiceName()+"servicestarted11)

while(!this.isStoppedO)

this.selector,select(1000);

if(-1==HAConnection.this.slaveRequestOffset)

nue;

|if(-1■二二this.nextTransferFromWhere)

|if(0=HAConnection.this.slaveRequestOffset)

|longmasterOffset=HAConnection.this.haService.getDefault|

MessageStore().getCommitLogO.getMaxOffset();

|masterOffset

|master0ffse

|一(masterOffset%HAConnection.this.haService.getDefau|

[tMessageStore().getMessageStoreConfig()^^^^^^^^^^^^^^^^^^^^[

|.getMappedFileSizeCommitLogO);

1

~"""""」.7;「4);N"

this.nextTransferFromWhere=masterOffset;

this.nextTransferFromWhere=HAConnection.this.slaveReque|

stOffset;

|log,info("mastertransferdatafrom"+this.nextTransferFrom|

/here+"toslave]"+HAConnection.this.cl"n+八乂小

+“],andslaverequest"+HAConnection.this.slaveRequestOf

fset);

|longinterval

|HAConnection.this.haService.getDefaultMessageStore(),get]

|if(interval>HAConnection.this.haService.getDefaultMessage|

[Store().getMessageStoreConf

|.getHaSendHeartbeatInterval())

|this.byteBufferHeader.position(0);

.bylel?un,('rll('<ici(?r.IimiI(hc^idrrSize):

this.byteBufferHeader.putLong(this.nextTransferFromWhere

B

this.byteBufferHeader.putlnt(O)

this.lastWriteOver=this.transferDataQ;

nue;

this.lastWriteOver二this.transferDataQ;

inue;

|SelectMappedBufferResultselectResult=

|HAConnection.this.haService.getDefaultMessageStoreQtgetCo]

^nmitLogData(this.nextTransferFromWhere);

|intsize二selectResult.getSize()

|if(size>HAConnection.this.haService.getDefaultMessageStor|

,().getMessageStoreConfig().getllaTransferBatchSize())第

size二HAConnection.this.haService.getDefaultMessageStore|

().getMessageStoreConfig().getHaTransferBatchSize();

longthisOffset二this.nextTransferFromWhere;^^^^^^^^J

this.nextTransferFromWhere+=

selectResult.getByteBuffer().limit仁;”)

this.selectMappedBufferResult=selectResult;^^^^^^^^^J

this.byteBufferHeader.position(0);

this.byteBufferHeader.limit(headerSize);

this.byteBufferHeader.putLong(thisOffset)

this.byteBufferHeader.putlnt(size);

this.lastWriteOver二this.transferDataQ;

HAConnection.this.haService.getWaitNotifyObject(),allWaitF

}catch(Exception。){|

HAConnection.log.error(this.getServiceName()+“servicehase

xception.n,e);

break;

HAConnection.this.haService.getWaitNotifyObject().removeFromWai

tingThreadTable();

if(this.selectMappedBufferResult!=null)

this.selectMappedBufferResult.release。;

this.makeStop();

readSocketService.makeStop();

haService.removeConnection(HAConnection.this);

SelectionKeysk二this.socketChannel.keyFor(this,selector);

if(sk!=null)

sk.cancel();

this,selector.close();

this.socketChannel.close();

}catch(lOExceptione)

HAConnection.10g.error,'",e);

HAConnection.log.info(this^getServiceName()+"serviceend");

4.slave讀取master發(fā)送的數(shù)據(jù)包

slave處理讀事件的方法是HAClient中的processReadEvent方法,

byteBufferRead是slave端的讀緩沖區(qū)。processReadEvent方法的邏輯是首先

判斷byteBufferRead是否還有剩余空間,如果還有剩余空間則將channel中的

數(shù)據(jù)讀取到byteBufferRead中,然后調(diào)用dispatchReadRequest方法處理讀取

的數(shù)據(jù)。

?rivatebooleanprocessReadEvent(){

intreadSizeZeroTimes二0;

while(this.byteBufferRead.hasRemainingO)

intreadSize=this,socketchannel,read(this.byteBufferRead);

readSizeZeroTimes=0;

booleanresult二this.dispatchReadRequest()

10g.error("HAClient,dispatchReadRequesterror")

returnfalse;

if(++readSizeZeroTimes>=3)

log.info("HAClient,processReadEventreadsocket<0");

}catch(lOExceptione)

log,info("HAClient,processReadEventreadsocketexception”,(

B

returnfalse;

returntrue;

這里需要重點(diǎn)分析dispatchReadRequest方法:

(1)用readSocketPos記錄byteBufferRead當(dāng)前的position

(2)計(jì)算byteBufferRead中當(dāng)前position與dispatchPosition

(dispatchPosition指向byteBufferRead中已經(jīng)處理的位置的指針)差值

diff

(3)判斷diff是否大于等于12字節(jié),這里之所有判斷與12字節(jié)的關(guān)系是因

為master向slave發(fā)送的數(shù)據(jù)包前12字節(jié)包含了傳輸?shù)臄?shù)據(jù)的起始物理偏移

量以及傳輸?shù)臄?shù)據(jù)的長(zhǎng)度。如果條件成立則從byteBufferRead中分別讀取傳輸

的數(shù)據(jù)的起始物理偏移量和傳輸?shù)臄?shù)據(jù)的長(zhǎng)度并記錄在masterPhyOffset和

bodySize中。

(4)從當(dāng)前節(jié)點(diǎn)獲取commitlog的最大物理偏移量slavePhyOffset,并判斷

slavePhyOffset與masterPhyOffset是否相等,正常情況下兩者應(yīng)該是相等

的,如果不相等則輸出error信息。

(5)判斷diff是否大于等于msgHeaderSize+bodySize,如果條件成立表示

當(dāng)前有一個(gè)完整的數(shù)據(jù)包,然后執(zhí)行以下操作:

?將byteBufferRead的position設(shè)置為dispatchPosition+

msgHeaderSize,也就是數(shù)據(jù)包中數(shù)據(jù)開(kāi)始的位置

?讀取數(shù)據(jù)包中的消息并存儲(chǔ)在bodyData中

?調(diào)用appendToCommitLog方法完成數(shù)據(jù)追加操作

?將byteBufferRead的position重新設(shè)置到readSocketPos

?將dispatchPosition向前移動(dòng)msgHeaderSize+bodySize

?調(diào)用reportSlaveMaxOffsetPlus方法判斷slave端commitlog的是否有

追加,如果有新的增加則向master匯報(bào)currentReportedOffset

privatebooleandispatchReadRequest()

finalintmsgHeaderSize=8+4;”,

intreadSocketPos二this.byteBufferRead.position。

while(true)

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論