備注:
1.如果您此前未接觸過RocketMQ,請先瀏覽附錄部份,以便了解RocketMQ的整體架構和相干術語
2.文中的MQServer與Broker表示同1概念
散布式消息系統作為實現散布式系統可擴大、可伸縮性的關鍵組件,需要具有高吞吐量、高可用等特點。而談到消息系統的設計,就躲避不了兩個問題:
- 消息的順序問題
- 消息的重復問題
RocketMQ作為阿里開源的1款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?
消息有序指的是1類消息消費時,能依照發送的順序來消費。例如:1個定單產生了 3 條消息,分別是定單創建、定單付款、定單完成。消費時,要依照這個順序消費才成心義。但同時定單之間又是可以并行消費的。
假設生產者產生了2條消息:M1、M2,要保證這兩條消息的順序,應當怎樣做?你腦中想到的多是這樣:
M1發送到S1后,M2發送到S2,如果要保證M1先于M2被消費,那末需要M1到達消費端后,通知S2,然后S2再將M2發送到消費端。
這個模型存在的問題是,如果M1和M2分別發送到兩臺Server上,就不能保證M1先到達,也就不能保證M1被先消費,那末就需要在MQ Server集群保護消息的順序。那末如何解決?1種簡單的方式就是將M1、M2發送到同1個Server上:
這樣可以保證M1先于M2到達MQServer(客戶端等待M1成功后再發送M2),根據先到達先被消費的原則,M1會先于M2被消費,這樣就保證了消息的順序。
這個模型,理論上可以保證消息的順序,但在實際應用中你應當會遇到下面的問題:
只要將消息從1臺服務器發往另外一臺服務器,就會存在網絡延遲問題。如上圖所示,如果發送M1耗時大于發送M2的耗時,那末M2就先被消費,依然不能保證消息的順序。即便M1和M2同時到達消費端,由于不清楚消費端1和消費端2的負載情況,依然有可能出現M2先于M1被消費。如何解決這個問題?將M1和M2發往同1個消費者便可,且發送M1后,需要消費端響應成功后才能發送M2。
但又會引入另外1個問題,如果發送M1后,消費端1沒有響應,那是繼續發送M2呢,還是重新發送M1?1般為了保證消息1定被消費,肯定會選擇重發M1到另外1個消費端2,就以下圖所示。
這樣的模型就嚴格保證消息的順序,仔細的你依然會發現問題,消費端1沒有響應Server時有兩種情況,1種是M1確切沒有到達,另外1種情況是消費端1已響應,但是Server端沒有收到。如果是第2種情況,重發M1,就會造成M1被重復消費。也就是我們后面要說的第2個問題,消息重復問題。
回過頭來看消息順序問題,嚴格的順序消息非常容易理解,而且處理問題也比較容易,要實現嚴格的順序消息,簡單且可行的辦法就是:
保證生產者 - MQServer - 消費者是1對1對1的關系
但是這樣設計,并行度就成了消息系統的瓶頸(吞吐量不夠),也會致使更多的異常處理,比如:只要消費端出現問題,就會致使全部處理流程阻塞,我們不能不花費更多的精力來解決阻塞的問題。
但我們的終究目標是要集群的高容錯性和高吞吐量。這仿佛是1對不可調和的矛盾,那末阿里是如何解決的?
世界上解決1個計算機問題最簡單的方法:“恰好”不需要解決它!—— 沈詢
有些問題,看起來很重要,但實際上我們可以通過公道的設計或將問題分解來規避。如果硬要把時間花在解決它們身上,實際上是浪費的,效力低下的。從這個角度來看消息的順序問題,我們可以得出兩個結論:
1. 不關注亂序的利用實際大量存在
2. 隊列無序其實不意味著消息無序
最后我們從源碼角度分析RocketMQ怎樣實現發送順序消息。
1般消息是通過輪詢所有隊列來發送的(負載均衡策略),順序消息可以根據業務,比如說定單號相同的消息發送到同1個隊列。下面的示例中,OrderId相同的消息,會發送到同1個隊列:
// RocketMQ默許提供了兩種MessageQueueSelector實現:隨機/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在獲得到路由信息以后,會根據MessageQueueSelector實現的算法來選擇1個隊列,同1個OrderId獲得到的隊列是同1個隊列。
private SendResult send() {
// 獲得topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根據我們的算法,選擇1個發送隊列
// 這里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
上面在解決消息順序問題時,引入了1個新的問題,就是消息重復。那末RocketMQ是怎樣解決消息重復的問題呢?還是“恰好”不解決。
造成消息的重復的根本緣由是:網絡不可達。只要通過網絡交換數據,就沒法避免這個問題。所以解決這個問題的辦法就是不解決,轉而繞過這個問題。那末問題就變成了:如果消費端收到兩條1樣的消息,應當怎樣處理?
1. 消費端處理消息的業務邏輯保持冪等性
2. 保證每條消息都有唯1編號且保證消息處理成功與去重表的日志同時出現
第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都1樣。第2條原理就是利用1張日志表來記錄已處理成功的消息的ID,如果新到的消息ID已在日志表中,那末就不再處理這條消息。
我們可以看到第1條的解決方式,很明顯應當在消費端實現,不屬于消息系統要實現的功能。第2條可以消息系統實現,也能夠業務端實現。正常情況下出現重復消息的幾率不1定大,且由消息系統實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的緣由。
RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。
RocketMQ除支持普通消息,順序消息,另外還支持事務消息。首先討論1下甚么是事務消息和支持事務消息的必要性。我們以1個轉帳的場景為例來講明這個問題:Bob向Smith轉賬100塊。
在單機環境下,履行事務的情況,大概是下面這個模樣:
當用戶增長到1定程度,Bob和Smith的賬戶及余額信息已不在同1臺服務器上了,那末上面的流程就變成了這樣:
這時候候你會發現,一樣是1個轉賬的業務,在集群環境下,耗時竟然成倍的增長,這明顯是不能夠接受的。那我們如何來規避這個問題?
大事務 = 小事務 + 異步
將大事務拆分成多個小事務異步履行。這樣基本上能夠將跨機事務的履行效力優化到與單機1致。轉賬的事務就能夠分解成以下兩個小事務:
圖中履行本地事務(Bob賬戶扣款)和發送異步消息應當保持同時成功或失敗中,也就是扣款成功了,發送消息1定要成功,如果扣款失敗了,就不能再發送消息。那問題是:我們是先扣款還是先發送消息呢?
首先我們看下,先發送消息,大致的示意圖以下:
存在的問題是:如果消息發送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。
先發消息不行,那我們就先扣款唄,大致的示意圖以下:
存在的問題跟上面類似:如果扣款成功,發送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。
可能大家會有很多的方法來解決這個問題,比如:直接將發消息放到Bob扣款的事務中去,如果發送失敗,拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。RocketMQ支持事務消息,下面我們來看看RocketMQ是怎樣來實現的。
RocketMQ第1階段發送Prepared消息時,會拿到消息的地址,第2階段履行本地事物,第3階段通過第1階段拿到的地址去訪問消息,并修改狀態。仔細的你可能又發現問題了,如果確認消息發送失敗了怎樣辦?RocketMQ會定期掃描消息集群中的事物消息,這時候候發現了Prepared消息,它會向消息發送者確認,Bob的錢究竟是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
那我們來看下RocketMQ源碼,是否是這樣來處理事務消息的。客戶端發送事務消息的部份(完全代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):
// 未決事務,MQ服務器回查客戶端
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生產者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設置事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,相當于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造參數
Message msg = new Message(......);
// 發送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
接著查看sendMessageInTransaction方法的源碼,總共分為3個階段:發送Prepared消息、履行本地事務、發送確認消息。
public TransactionSendResult sendMessageInTransaction(.....) {
// 邏輯代碼,非實際代碼
// 1.發送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息發送成功,處理與消息關聯的本地事務單元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.結束事務
this.endTransaction(sendResult, localTransactionState, localException);
}
endTransaction方法會將要求發往broker(mq server)去更新事物消息的終究狀態:
1. 根據sendResult找到Prepared消息
2. 根據localTransaction更新消息的終究狀態
如果endTransaction方法履行失敗,致使數據沒有發送到broker,broker會有回查線程定時(默許1分鐘)掃描每一個存儲事務狀態的表格文件,如果是已提交或回滾的消息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction要求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調要求,而checkTransactionState會調用我們的事務設置的決斷方法,最后調用endTransactionOneway讓broker來更新消息的終究狀態。
再回到轉賬的例子,如果Bob的賬戶的余額已減少,且消息已發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是1直重試,直到消費端消費消息成功,全部進程中有可能會出現消息重復的問題,依照前面的思路解決便可。
這樣基本上可以解決超時問題,但是如果消費失敗怎樣辦?阿里提供給我們的解決方法是:人工解決。大家可以斟酌1下,依照事務的流程,由于某種緣由Smith加款失敗,需要回滾全部流程。如果消息系統要實現這個回滾流程的話,系統復雜度將大大提升,且很容易出現Bug,估計出現Bug的幾率會比消費失敗的幾率大很多。我們需要衡量是不是值得花這么大的代價來解決這樣1個出現幾率非常小的問題,這也是大家在解決疑問問題時需要多多思考的地方。
20160321補充:在3.2.6版本中移除事務消息的實現,所以此版本不支持事務消息,具體情況請參考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡,以下圖所示:
首先分析1下RocketMQ的客戶端發送消息的源碼:
// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,全部利用生命周期內,只需要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
"TagA",// tag:給消息打標簽,用于辨別1類消息,可為null
"OrderID188",// key:自定義Key,可以用于去重,可為null
("Hello MetaQ").getBytes());// body:消息內容
// 發送消息并返回結果
SendResult sendResult = producer.send(msg);
// 清算資源,關閉網絡連接,注銷自己
producer.shutdown();
在全部利用生命周期內,生產者需要調用1次start方法來初始化,初始化主要完成的任務有:
1. 如果沒有指定namesrv地址,將會自動尋址
2. 啟動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清算已掛掉的broker、向所有broker發送心跳…
3. 啟動負載均衡的服務
初始化完成后,開始發送消息,發送消息的主要代碼以下:
private SendResult sendDefaultImpl(Message msg,......) {
// 檢查Producer的狀態是不是是RUNNING
this.makeSureStateOK();
// 檢查msg是不是合法:是不是為null、topic,body是不是為空、body是不是超長
Validators.checkMessage(msg, this.defaultMQProducer);
// 獲得topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 從路由信息當選擇1個消息隊列
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
// 將消息發送到該隊列上去
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
代碼中需要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲得路由信息并更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲得topic路由信息,如果沒有獲得到,則會自己去namesrv獲得路由信息。selectOneMessageQueue方法通過輪詢的方式,返回1個隊列,以到達負載均衡的目的。
如果Producer發送消息失敗,會自動重試,重試的策略:
- 重試次數 < retryTimesWhenSendFailed(可配置)
- 總的耗時(包括重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
- 同時滿足上面兩個條件后,Producer會選擇另外1個隊列發送消息
RocketMQ的消息存儲是由consume queue和commit log配合完成的。
consume queue是消息的邏輯隊列,相當于字典的目錄,用來指定消息在物理文件commit log上的位置。
我們可以在配置中指定consumequeue與commitlog存儲的目錄
每一個topic下的每一個queue都有1個對應的consumequeue文件,比如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue文件組織,如圖所示:
死信隊列(Dead Letter Queue)1般用于寄存由于某種緣由沒法傳遞的消息,比如處理失敗或已過期的消息。
Consume Queue中存儲單元是1個20字節定長的2進制數據,順序寫順序讀,以下圖所示:
CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量
Size存儲中消息的大小
Message Tag HashCode存儲消息的Tag的哈希值:主要用于定閱時消息過濾(定閱時如果指定了Tag,會根據HashCode來快速查找到定閱的消息)
CommitLog:消息寄存的物理文件,每臺broker上的commitlog被本機所有的queue同享,不做任何辨別。
文件的默許位置以下,依然可通過配置文件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構以下表所示,依照編號順序和編號對應的內容順次存儲。
消息存儲實現,比較復雜,也值得大家深入了解,后面會單獨成文來分析,這小節只以代碼說明1下具體的流程。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Here settings are stored timestamp, in order to ensure an orderly global
msg.setStoreTimestamp(beginLockTimestamp);
// MapedFile:操作物理文件在內存中的映照和將內存數據持久化到物理文件中
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
// 將Message追加到文件commitlog
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
DispatchRequest dispatchRequest = new DispatchRequest(
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10
// 1.分發消息位置到ConsumeQueue
// 2.分發到IndexService建立索引
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
如果1個消息包括key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:
索引文件主要用于根據key來查詢消息的,流程主要是:
1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是1個索引文件里面包括的最大槽的數目,例如圖中所示 slotNum=5000000)
2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后1項(倒序排列,slotValue 總是指向最新的1個索引項)
3. 遍歷索引項列表返回查詢時間范圍內的結果集(默許1次最大返回的 32 條記錄)
RocketMQ消息定閱有兩種模式,1種是Push模式,即MQServer主動向消費端推送;另外1種是Pull模式,即消費端在需要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是采取消費端主動拉取的方式。
首先看下消費真個負載均衡:
消費端會通過RebalanceService線程,10秒鐘做1次基于topic下的所有隊列負載:
1. 遍歷Consumer下的所有topic,然后根據topic定閱所有的消息
2. 獲得同1topic和Consumer Group下的所有Consumer
3. 然后根據具體的分配策略來分配消費隊列,分配的策略包括:平均分配、消費端配置等
猶如上圖所示:如果有 5 個隊列,2 個 consumer,那末第1個 Consumer 消費 3 個隊列,第2 consumer 消費 2 個隊列。這里采取的就是平均分配策略,它類似于我們的分頁,TOPIC下面的所有queue就是記錄,Consumer的個數就相當于總的頁數,那末每頁有多少條記錄,就類似于某個Consumer會消費哪些隊列。
通過這樣的策略來到達大體上的平均消費,這樣的設計也能夠很方面的水平擴大Consumer來提高消費能力。
消費真個Push模式是通太長輪詢的模式來實現的,就猶如下圖:
Consumer端每隔1段時間主動向broker發送拉消息要求,broker在收到Pull要求后,如果有消息就立即返回數據,Consumer端收到返回的消息后,再回調消費者設置的Listener方法。如果broker在收到Pull要求時,消息隊列里沒有數據,broker端會阻塞要求直到有數據傳遞或超時才返回。
固然,Consumer端是通過1個線程將阻塞隊列LinkedBlockingQueue中的PullRequest發送到broker拉取消息,以避免Consumer1致被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發現沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啟動時,會啟動1個線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數據返回。
前面的6個特性都是基本上都是點到為止,想要深入了解,還需要大家多多查看源碼,多多在實際中應用。固然除已提到的特性外,RocketMQ還支持:
1. 定時消息
2. 消息的刷盤策略
3. 主動同步策略:同步雙寫、異步復制
4. 海量消息堆積能力
5. 高效通訊 .
6. ……
其中觸及到的很多設計思路和解決方法都值得我們深入研究:
1. 消息的存儲設計:既要滿足海量消息的堆積能力,又要滿足極快的查詢效力,還要保證寫入的效力。
2. 高效的通訊組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通訊。
3. …….
1、1個利用盡量用1個 Topic,消息子類型用 tags 來標識,tags 可以由利用自由設置。只有發送消息設置了tags,消費方在定閱消息時,才可以利用 tags 在 broker 做消息過濾。
2、每一個消息在業務層面的唯1標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。由因而哈希索引,請務必保證 key 盡量唯1,這樣可以免潛伏的哈希沖突。
3、消息發送成功或失敗,要打印消息日志,務必要打印 sendresult 和 key 字段。
4、對消息不可丟失利用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或人工觸發重發。
5、某些利用如果不關注消息是不是發送成功,請直接使用sendOneWay方法發送消息。
消費進程要做到冪等(即消費端去重)
盡可能使用批量方式消費方式,可以很大程度上提高消費吞吐量
優化每條消息消費進程
線上應當關閉autoCreateTopicEnable,即在配置文件中將其設置為false。
RocketMQ在發送消息時,會首先獲得路由信息。如果是新的消息,由于MQServer上面還沒有創建對應的Topic,這個時候,如果上面的配置打開的話,會返回默許TOPIC的(RocketMQ會在每臺broker上面創建名為TBW102的TOPIC)路由信息,然后Producer會選擇1臺Broker發送消息,選中的broker在存儲消息時,發現消息的topic還沒有創建,就會自動創建topic。后果就是:以后所有該TOPIC的消息,都將發送到這臺broker上,達不到負載均衡的目的。
所以基于目前RocketMQ的設計,建議關閉自動創建TOPIC的功能,然后根據消息量的大小,手動創建TOPIC。
RocketMQ設計相干
RocketMQ的設計假定:
每臺PC機器都可能宕機不可服務
任意集群都有可能處理能力不足
最壞的情況1定會產生
內網環境需要低延遲來提供最好用戶體驗
RocketMQ的關鍵設計:
散布式集群化
強數據安全
海量數據堆積
毫秒級投遞延遲(推拉模式)
這是RocketMQ在設計時的假定條件和需要到達的效果。我想這些假定適用于所有的系統設計。隨著我們系統的服務的增多,每位開發者都要注意自己的程序是不是存在單點故障,如果掛了應當怎樣恢復、能不能很好的水平擴大、對外的接口是不是足夠高效、自己管理的數據是不是足夠安全…… 多多規范自己的設計,才能開發出高效硬朗的程序。
topic表示消息的第1級類型,比如1個電商系統的消息可以分為:交易消息、物流消息…… 1條消息必須有1個Topic。
Tag表示消息的第2級類型,比如交易消息又可以分為:交易創建消息,交易完成消息….. 1條消息可以沒有Tag。RocketMQ提供2級消息分類,方便大家靈活控制。
1個topic下,我們可以設置多個queue(消息隊列)。當我們發送消息時,需要要指定該消息的topic。RocketMQ會輪詢該topic下的所有隊列,將消息發送出去。
Producer表示消息隊列的生產者。消息隊列的本質就是實現了publish-subscribe模式,生產者生產消息,消費者消費消息。所以這里的Producer就是用來生產和發送消息的,1般指業務系統。
Producer Group是1類Producer的集合名稱,這類Producer通常發送1類消息,且發送邏輯1致。
消息消費者,1般由后臺系統異步消費消息。
Consumer 的1種,利用通常向 Consumer 對象注冊1個 Listener 接口,1旦收到消息,Consumer 對象立刻回調 Listener 接口方法。
Consumer 的1種,利用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由利用控制。
Consumer Group是1類Consumer的集合名稱,這類Consumer通常消費1類消息,且消費邏輯1致。
消息的中轉者,負責存儲和轉發消息。可以理解為消息隊列服務器,提供了消息的接收、存儲、拉取和轉發服務。broker是RocketMQ的核心,它不不能掛的,所以需要保證broker的高可用。
1條消息被多個Consumer消費,即便這些Consumer屬于同1個Consumer Group,消息也會被Consumer Group中的每一個Consumer都消費1次。在廣播消費中的Consumer Group概念可以認為在消息劃分方面無意義。
1個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有 9 條消息,其中1個Consumer Group有 3 個實例(多是 3 個進程,或 3 臺機器),那末每一個實例只消費其中的 3 條消息。
NameServer即名稱服務,兩個功能:
接收broker的要求,注冊broker的路由信息
接口client的要求,根據某個topic獲得其到broker的路由信息
NameServer沒有狀態,可以橫向擴大。每一個broker在啟動的時候會到NameServer注冊;Producer在發送消息前會根據topic到NameServer獲得路由(到broker)信息;Consumer也會定時獲得topic路由信息。
Producer向1些隊列輪番發送消息,隊列集合稱為Topic,Consumer如果做廣播消費,則1個consumer實例消費這個Topic對應的所有隊列;如果做集群消費,則多個Consumer實例平均消費這個Topic對應的隊列集合。
再看下RocketMQ物理部署結構圖:
RocketMQ網絡部署特點:
Name Server 是1個幾近無狀態節點,可集群部署,節點之間無任何信息同步。
Broker部署相對復雜,Broker分為Master與Slave,1個Master可以對應多個Slave,但是1個Slave只能對應1個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId=0表示Master,非0表示Slave。Master也能夠部署多個。每一個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server。
Producer與Name Server集群中的其中1個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer 完全無狀態,可集群部署。
Consumer與Name Server集群中的其中1個節點(隨機選擇)建立長連接,定期從Name Server取Topic 路由信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master定閱消息,也能夠從Slave定閱消息,定閱規則由Broker配置決定。