多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php框架 > 框架設計 > RocketMQ(三)分布式開放消息系統(RocketMQ)的原理與實踐

RocketMQ(三)分布式開放消息系統(RocketMQ)的原理與實踐

來源:程序員人生   發布時間:2016-07-06 08:48:24 閱讀次數:10887次

備注:
1.如果您此前未接觸過RocketMQ,請先瀏覽附錄部份,以便了解RocketMQ的整體架構和相干術語
2.文中的MQServer與Broker表示同1概念

散布式消息系統作為實現散布式系統可擴大、可伸縮性的關鍵組件,需要具有高吞吐量、高可用等特點。而談到消息系統的設計,就躲避不了兩個問題:

  1. 消息的順序問題
  2. 消息的重復問題

RocketMQ作為阿里開源的1款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?

關鍵特性和其實現原理

1、順序消息

消息有序指的是1類消息消費時,能依照發送的順序來消費。例如:1個定單產生了 3 條消息,分別是定單創建、定單付款、定單完成。消費時,要依照這個順序消費才成心義。但同時定單之間又是可以并行消費的。

假設生產者產生了2條消息:M1、M2,要保證這兩條消息的順序,應當怎樣做?你腦中想到的多是這樣:

這里寫圖片描述

M1發送到S1后,M2發送到S2,如果要保證M1先于M2被消費,那末需要M1到達消費端后,通知S2,然后S2再將M2發送到消費端。

這個模型存在的問題是,如果M1和M2分別發送到兩臺Server上,就不能保證M1先到達,也就不能保證M1被先消費,那末就需要在MQ Server集群保護消息的順序。那末如何解決?1種簡單的方式就是將M1、M2發送到同1個Server上:

這里寫圖片描述

這樣可以保證M1先于M2到達MQServer(客戶端等待M1成功后再發送M2),根據先到達先被消費的原則,M1會先于M2被消費,這樣就保證了消息的順序。

這個模型,理論上可以保證消息的順序,但在實際應用中你應當會遇到下面的問題:

這里寫圖片描述

只要將消息從1臺服務器發往另外一臺服務器,就會存在網絡延遲問題。如上圖所示,如果發送M1耗時大于發送M2的耗時,那末M2就先被消費,依然不能保證消息的順序。即便M1和M2同時到達消費端,由于不清楚消費端1和消費端2的負載情況,依然有可能出現M2先于M1被消費。如何解決這個問題?將M1和M2發往同1個消費者便可,且發送M1后,需要消費端響應成功后才能發送M2。

但又會引入另外1個問題,如果發送M1后,消費端1沒有響應,那是繼續發送M2呢,還是重新發送M1?1般為了保證消息1定被消費,肯定會選擇重發M1到另外1個消費端2,就以下圖所示。

這里寫圖片描述

這樣的模型就嚴格保證消息的順序,仔細的你依然會發現問題,消費端1沒有響應Server時有兩種情況,1種是M1確切沒有到達,另外1種情況是消費端1已響應,但是Server端沒有收到。如果是第2種情況,重發M1,就會造成M1被重復消費。也就是我們后面要說的第2個問題,消息重復問題。

回過頭來看消息順序問題,嚴格的順序消息非常容易理解,而且處理問題也比較容易,要實現嚴格的順序消息,簡單且可行的辦法就是:

保證生產者 - MQServer - 消費者是1對1對1的關系

但是這樣設計,并行度就成了消息系統的瓶頸(吞吐量不夠),也會致使更多的異常處理,比如:只要消費端出現問題,就會致使全部處理流程阻塞,我們不能不花費更多的精力來解決阻塞的問題。

但我們的終究目標是要集群的高容錯性和高吞吐量。這仿佛是1對不可調和的矛盾,那末阿里是如何解決的?

世界上解決1個計算機問題最簡單的方法:“恰好”不需要解決它!—— 沈詢

有些問題,看起來很重要,但實際上我們可以通過公道的設計或將問題分解來規避。如果硬要把時間花在解決它們身上,實際上是浪費的,效力低下的。從這個角度來看消息的順序問題,我們可以得出兩個結論:

1. 不關注亂序的利用實際大量存在
2. 隊列無序其實不意味著消息無序

最后我們從源碼角度分析RocketMQ怎樣實現發送順序消息。

1般消息是通過輪詢所有隊列來發送的(負載均衡策略),順序消息可以根據業務,比如說定單號相同的消息發送到同1個隊列。下面的示例中,OrderId相同的消息,會發送到同1個隊列:

// RocketMQ默許提供了兩種MessageQueueSelector實現:隨機/Hash SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);

在獲得到路由信息以后,會根據MessageQueueSelector實現的算法來選擇1個隊列,同1個OrderId獲得到的隊列是同1個隊列。

private SendResult send() { // 獲得topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; // 根據我們的算法,選擇1個發送隊列 // 這里的arg = orderId mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); } } }

2、消息重復

上面在解決消息順序問題時,引入了1個新的問題,就是消息重復。那末RocketMQ是怎樣解決消息重復的問題呢?還是“恰好”不解決。

造成消息的重復的根本緣由是:網絡不可達。只要通過網絡交換數據,就沒法避免這個問題。所以解決這個問題的辦法就是不解決,轉而繞過這個問題。那末問題就變成了:如果消費端收到兩條1樣的消息,應當怎樣處理?

1. 消費端處理消息的業務邏輯保持冪等性
2. 保證每條消息都有唯1編號且保證消息處理成功與去重表的日志同時出現

第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都1樣。第2條原理就是利用1張日志表來記錄已處理成功的消息的ID,如果新到的消息ID已在日志表中,那末就不再處理這條消息。

我們可以看到第1條的解決方式,很明顯應當在消費端實現,不屬于消息系統要實現的功能。第2條可以消息系統實現,也能夠業務端實現。正常情況下出現重復消息的幾率不1定大,且由消息系統實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的緣由。

RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。

3、事務消息

RocketMQ除支持普通消息,順序消息,另外還支持事務消息。首先討論1下甚么是事務消息和支持事務消息的必要性。我們以1個轉帳的場景為例來講明這個問題:Bob向Smith轉賬100塊。

在單機環境下,履行事務的情況,大概是下面這個模樣:
這里寫圖片描述

當用戶增長到1定程度,Bob和Smith的賬戶及余額信息已不在同1臺服務器上了,那末上面的流程就變成了這樣:
這里寫圖片描述

這時候候你會發現,一樣是1個轉賬的業務,在集群環境下,耗時竟然成倍的增長,這明顯是不能夠接受的。那我們如何來規避這個問題?

大事務 = 小事務 + 異步

將大事務拆分成多個小事務異步履行。這樣基本上能夠將跨機事務的履行效力優化到與單機1致。轉賬的事務就能夠分解成以下兩個小事務:

這里寫圖片描述

圖中履行本地事務(Bob賬戶扣款)和發送異步消息應當保持同時成功或失敗中,也就是扣款成功了,發送消息1定要成功,如果扣款失敗了,就不能再發送消息。那問題是:我們是先扣款還是先發送消息呢?

首先我們看下,先發送消息,大致的示意圖以下:
這里寫圖片描述

存在的問題是:如果消息發送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。

先發消息不行,那我們就先扣款唄,大致的示意圖以下:

這里寫圖片描述

存在的問題跟上面類似:如果扣款成功,發送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。

可能大家會有很多的方法來解決這個問題,比如:直接將發消息放到Bob扣款的事務中去,如果發送失敗,拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。RocketMQ支持事務消息,下面我們來看看RocketMQ是怎樣來實現的。
這里寫圖片描述

RocketMQ第1階段發送Prepared消息時,會拿到消息的地址,第2階段履行本地事物,第3階段通過第1階段拿到的地址去訪問消息,并修改狀態。仔細的你可能又發現問題了,如果確認消息發送失敗了怎樣辦?RocketMQ會定期掃描消息集群中的事物消息,這時候候發現了Prepared消息,它會向消息發送者確認,Bob的錢究竟是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

那我們來看下RocketMQ源碼,是否是這樣來處理事務消息的。客戶端發送事務消息的部份(完全代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// 未決事務,MQ服務器回查客戶端 // 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,相當于示例中檢查Bob賬戶并扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();

接著查看sendMessageInTransaction方法的源碼,總共分為3個階段:發送Prepared消息、履行本地事務、發送確認消息。

public TransactionSendResult sendMessageInTransaction(.....) { // 邏輯代碼,非實際代碼 // 1.發送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.如果消息發送成功,處理與消息關聯的本地事務單元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }

endTransaction方法會將要求發往broker(mq server)去更新事物消息的終究狀態:

1. 根據sendResult找到Prepared消息
2. 根據localTransaction更新消息的終究狀態

如果endTransaction方法履行失敗,致使數據沒有發送到brokerbroker會有回查線程定時(默許1分鐘)掃描每一個存儲事務狀態的表格文件,如果是已提交或回滾的消息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction要求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調要求,而checkTransactionState會調用我們的事務設置的決斷方法,最后調用endTransactionOnewaybroker來更新消息的終究狀態。

再回到轉賬的例子,如果Bob的賬戶的余額已減少,且消息已發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是1直重試,直到消費端消費消息成功,全部進程中有可能會出現消息重復的問題,依照前面的思路解決便可。

這里寫圖片描述

這樣基本上可以解決超時問題,但是如果消費失敗怎樣辦?阿里提供給我們的解決方法是:人工解決。大家可以斟酌1下,依照事務的流程,由于某種緣由Smith加款失敗,需要回滾全部流程。如果消息系統要實現這個回滾流程的話,系統復雜度將大大提升,且很容易出現Bug,估計出現Bug的幾率會比消費失敗的幾率大很多。我們需要衡量是不是值得花這么大的代價來解決這樣1個出現幾率非常小的問題,這也是大家在解決疑問問題時需要多多思考的地方。

20160321補充:在3.2.6版本中移除事務消息的實現,所以此版本不支持事務消息,具體情況請參考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156

4、Producer如何發送消息

Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡,以下圖所示:
這里寫圖片描述

首先分析1下RocketMQ的客戶端發送消息的源碼:

// 構造Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 初始化Producer,全部利用生命周期內,只需要初始化1次 producer.start(); // 構造Message Message msg = new Message("TopicTest1",// topic "TagA",// tag:給消息打標簽,用于辨別1類消息,可為null "OrderID188",// key:自定義Key,可以用于去重,可為null ("Hello MetaQ").getBytes());// body:消息內容 // 發送消息并返回結果 SendResult sendResult = producer.send(msg); // 清算資源,關閉網絡連接,注銷自己 producer.shutdown();

在全部利用生命周期內,生產者需要調用1次start方法來初始化,初始化主要完成的任務有:

1. 如果沒有指定namesrv地址,將會自動尋址
2. 啟動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清算已掛掉的broker、向所有broker發送心跳…
3. 啟動負載均衡的服務

初始化完成后,開始發送消息,發送消息的主要代碼以下:

private SendResult sendDefaultImpl(Message msg,......) { // 檢查Producer的狀態是不是是RUNNING this.makeSureStateOK(); // 檢查msg是不是合法:是不是為null、topic,body是不是為空、body是不是超長 Validators.checkMessage(msg, this.defaultMQProducer); // 獲得topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 從路由信息當選擇1個消息隊列 MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); // 將消息發送到該隊列上去 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); }

代碼中需要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲得路由信息并更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲得topic路由信息,如果沒有獲得到,則會自己去namesrv獲得路由信息。selectOneMessageQueue方法通過輪詢的方式,返回1個隊列,以到達負載均衡的目的。

如果Producer發送消息失敗,會自動重試,重試的策略:

  1. 重試次數 < retryTimesWhenSendFailed(可配置)
  2. 總的耗時(包括重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
  3. 同時滿足上面兩個條件后,Producer會選擇另外1個隊列發送消息

5、消息存儲

RocketMQ的消息存儲是由consume queuecommit log配合完成的。

1、Consume Queue

consume queue是消息的邏輯隊列,相當于字典的目錄,用來指定消息在物理文件commit log上的位置。

我們可以在配置中指定consumequeuecommitlog存儲的目錄
每一個topic下的每一個queue都有1個對應的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件組織,如圖所示:
這里寫圖片描述

  1. 根據topicqueueId來組織文件,圖中TopicA有兩個隊列0,1,那末TopicA和QueueId=0組成1個ConsumeQueue,TopicA和QueueId=1組成另外一個ConsumeQueue。
  2. 依照消費真個GroupName來分組重試隊列,如果消費端消費失敗,消息將被發往重試隊列中,比如圖中的%RETRY%ConsumerGroupA
    1. 依照消費真個GroupName來分組死信隊列,如果消費端消費失敗,并重試指定次數后,依然失敗,則發往死信隊列,比如圖中的%DLQ%ConsumerGroupA

死信隊列(Dead Letter Queue)1般用于寄存由于某種緣由沒法傳遞的消息,比如處理失敗或已過期的消息。

Consume Queue中存儲單元是1個20字節定長的2進制數據,順序寫順序讀,以下圖所示:

這里寫圖片描述

CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量
Size存儲中消息的大小
Message Tag HashCode存儲消息的Tag的哈希值:主要用于定閱時消息過濾(定閱時如果指定了Tag,會根據HashCode來快速查找到定閱的消息)

2、Commit Log

CommitLog:消息寄存的物理文件,每臺broker上的commitlog被本機所有的queue同享,不做任何辨別。
文件的默許位置以下,依然可通過配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構以下表所示,依照編號順序和編號對應的內容順次存儲。

這里寫圖片描述

3、消息存儲實現

消息存儲實現,比較復雜,也值得大家深入了解,后面會單獨成文來分析,這小節只以代碼說明1下具體的流程。

// Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting msg.setBodyCRC(UtilAll.crc32(msg.getBody())); StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); synchronized (this) { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); // Here settings are stored timestamp, in order to ensure an orderly global msg.setStoreTimestamp(beginLockTimestamp); // MapedFile:操作物理文件在內存中的映照和將內存數據持久化到物理文件中 MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(); // 將Message追加到文件commitlog result = mapedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK:break; case END_OF_FILE: // Create a new file, re-write the message mapedFile = this.mapedFileQueue.getLastMapedFile(); result = mapedFile.appendMessage(msg, this.appendMessageCallback); break; DispatchRequest dispatchRequest = new DispatchRequest( topic,// 1 queueId,// 2 result.getWroteOffset(),// 3 result.getWroteBytes(),// 4 tagsCode,// 5 msg.getStoreTimestamp(),// 6 result.getLogicsOffset(),// 7 msg.getKeys(),// 8 /** * Transaction */ msg.getSysFlag(),// 9 msg.getPreparedTransactionOffset());// 10 // 1.分發消息位置到ConsumeQueue // 2.分發到IndexService建立索引 this.defaultMessageStore.putDispatchRequest(dispatchRequest); }

4、消息的索引文件

如果1個消息包括key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:
這里寫圖片描述

索引文件主要用于根據key來查詢消息的,流程主要是:

1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是1個索引文件里面包括的最大槽的數目,例如圖中所示 slotNum=5000000)
2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后1項(倒序排列,slotValue 總是指向最新的1個索引項)
3. 遍歷索引項列表返回查詢時間范圍內的結果集(默許1次最大返回的 32 條記錄)

6、消息定閱

RocketMQ消息定閱有兩種模式,1種是Push模式,即MQServer主動向消費端推送;另外1種是Pull模式,即消費端在需要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是采取消費端主動拉取的方式。

首先看下消費真個負載均衡:
這里寫圖片描述

消費端會通過RebalanceService線程,10秒鐘做1次基于topic下的所有隊列負載:

1. 遍歷Consumer下的所有topic,然后根據topic定閱所有的消息
2. 獲得同1topic和Consumer Group下的所有Consumer
3. 然后根據具體的分配策略來分配消費隊列,分配的策略包括:平均分配、消費端配置等

猶如上圖所示:如果有 5 個隊列,2 個 consumer,那末第1個 Consumer 消費 3 個隊列,第2 consumer 消費 2 個隊列。這里采取的就是平均分配策略,它類似于我們的分頁,TOPIC下面的所有queue就是記錄,Consumer的個數就相當于總的頁數,那末每頁有多少條記錄,就類似于某個Consumer會消費哪些隊列。

通過這樣的策略來到達大體上的平均消費,這樣的設計也能夠很方面的水平擴大Consumer來提高消費能力。

消費真個Push模式是通太長輪詢的模式來實現的,就猶如下圖:

這里寫圖片描述

Consumer端每隔1段時間主動向broker發送拉消息要求,broker在收到Pull要求后,如果有消息就立即返回數據,Consumer端收到返回的消息后,再回調消費者設置的Listener方法。如果broker在收到Pull要求時,消息隊列里沒有數據,broker端會阻塞要求直到有數據傳遞或超時才返回。

固然,Consumer端是通過1個線程將阻塞隊列LinkedBlockingQueue中的PullRequest發送到broker拉取消息,以避免Consumer1致被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發現沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啟動時,會啟動1個線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數據返回。

7、RocketMQ的其他特性

前面的6個特性都是基本上都是點到為止,想要深入了解,還需要大家多多查看源碼,多多在實際中應用。固然除已提到的特性外,RocketMQ還支持:

1. 定時消息
2. 消息的刷盤策略
3. 主動同步策略:同步雙寫、異步復制
4. 海量消息堆積能力
5. 高效通訊 .
6. ……

其中觸及到的很多設計思路和解決方法都值得我們深入研究:

1. 消息的存儲設計:既要滿足海量消息的堆積能力,又要滿足極快的查詢效力,還要保證寫入的效力。
2. 高效的通訊組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通訊。
3. …….

RocketMQ最好實踐

1、Producer最好實踐

1、1個利用盡量用1個 Topic,消息子類型用 tags 來標識,tags 可以由利用自由設置。只有發送消息設置了tags,消費方在定閱消息時,才可以利用 tags 在 broker 做消息過濾。

2、每一個消息在業務層面的唯1標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。由因而哈希索引,請務必保證 key 盡量唯1,這樣可以免潛伏的哈希沖突。

3、消息發送成功或失敗,要打印消息日志,務必要打印 sendresult 和 key 字段。

4、對消息不可丟失利用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或人工觸發重發。

5、某些利用如果不關注消息是不是發送成功,請直接使用sendOneWay方法發送消息。

2、Consumer最好實踐

  1. 消費進程要做到冪等(即消費端去重)

  2. 盡可能使用批量方式消費方式,可以很大程度上提高消費吞吐量

  3. 優化每條消息消費進程

3、其他配置

線上應當關閉autoCreateTopicEnable,即在配置文件中將其設置為false。

RocketMQ在發送消息時,會首先獲得路由信息。如果是新的消息,由于MQServer上面還沒有創建對應的Topic,這個時候,如果上面的配置打開的話,會返回默許TOPIC的(RocketMQ會在每臺broker上面創建名為TBW102的TOPIC)路由信息,然后Producer會選擇1臺Broker發送消息,選中的broker在存儲消息時,發現消息的topic還沒有創建,就會自動創建topic。后果就是:以后所有該TOPIC的消息,都將發送到這臺broker上,達不到負載均衡的目的。

所以基于目前RocketMQ的設計,建議關閉自動創建TOPIC的功能,然后根據消息量的大小,手動創建TOPIC。

RocketMQ設計相干
RocketMQ的設計假定:

每臺PC機器都可能宕機不可服務

任意集群都有可能處理能力不足

最壞的情況1定會產生

內網環境需要低延遲來提供最好用戶體驗

RocketMQ的關鍵設計:

散布式集群化

強數據安全

海量數據堆積

毫秒級投遞延遲(推拉模式)

這是RocketMQ在設計時的假定條件和需要到達的效果。我想這些假定適用于所有的系統設計。隨著我們系統的服務的增多,每位開發者都要注意自己的程序是不是存在單點故障,如果掛了應當怎樣恢復、能不能很好的水平擴大、對外的接口是不是足夠高效、自己管理的數據是不是足夠安全…… 多多規范自己的設計,才能開發出高效硬朗的程序。

附錄:RocketMQ觸及到的幾個專業術語和整體架構介紹

1、RocketMQ中的專業術語

  • Topic

topic表示消息的第1級類型,比如1個電商系統的消息可以分為:交易消息、物流消息…… 1條消息必須有1個Topic。

  • Tag

Tag表示消息的第2級類型,比如交易消息又可以分為:交易創建消息,交易完成消息….. 1條消息可以沒有Tag。RocketMQ提供2級消息分類,方便大家靈活控制。

  • Queue

1個topic下,我們可以設置多個queue(消息隊列)。當我們發送消息時,需要要指定該消息的topic。RocketMQ會輪詢該topic下的所有隊列,將消息發送出去。

  • Producer 與 Producer Group

Producer表示消息隊列的生產者。消息隊列的本質就是實現了publish-subscribe模式,生產者生產消息,消費者消費消息。所以這里的Producer就是用來生產和發送消息的,1般指業務系統。

Producer Group是1類Producer的集合名稱,這類Producer通常發送1類消息,且發送邏輯1致。

  • Consumer 與 Consumer Group

消息消費者,1般由后臺系統異步消費消息。

  • Push Consumer

Consumer 的1種,利用通常向 Consumer 對象注冊1個 Listener 接口,1旦收到消息,Consumer 對象立刻回調 Listener 接口方法。

  • Pull Consumer

Consumer 的1種,利用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由利用控制。
Consumer Group是1類Consumer的集合名稱,這類Consumer通常消費1類消息,且消費邏輯1致。

  • Broker

消息的中轉者,負責存儲和轉發消息。可以理解為消息隊列服務器,提供了消息的接收、存儲、拉取和轉發服務。broker是RocketMQ的核心,它不不能掛的,所以需要保證broker的高可用。

  • 廣播消費

1條消息被多個Consumer消費,即便這些Consumer屬于同1個Consumer Group,消息也會被Consumer Group中的每一個Consumer都消費1次。在廣播消費中的Consumer Group概念可以認為在消息劃分方面無意義。

  • 集群消費

1個Consumer Group中的Consumer實例平均分攤消費消息。例如某個Topic有 9 條消息,其中1個Consumer Group有 3 個實例(多是 3 個進程,或 3 臺機器),那末每一個實例只消費其中的 3 條消息。

  • NameServer

NameServer即名稱服務,兩個功能:

接收broker的要求,注冊broker的路由信息
接口client的要求,根據某個topic獲得其到broker的路由信息
NameServer沒有狀態,可以橫向擴大。每一個broker在啟動的時候會到NameServer注冊;Producer在發送消息前會根據topic到NameServer獲得路由(到broker)信息;Consumer也會定時獲得topic路由信息。

2、RocketMQ Overview

這里寫圖片描述
Producer向1些隊列輪番發送消息,隊列集合稱為Topic,Consumer如果做廣播消費,則1個consumer實例消費這個Topic對應的所有隊列;如果做集群消費,則多個Consumer實例平均消費這個Topic對應的隊列集合。

再看下RocketMQ物理部署結構圖:
這里寫圖片描述

RocketMQ網絡部署特點:

Name Server 是1個幾近無狀態節點,可集群部署,節點之間無任何信息同步。
Broker部署相對復雜,Broker分為Master與Slave,1個Master可以對應多個Slave,但是1個Slave只能對應1個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId=0表示Master,非0表示Slave。Master也能夠部署多個。每一個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server。
Producer與Name Server集群中的其中1個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer 完全無狀態,可集群部署。
Consumer與Name Server集群中的其中1個節點(隨機選擇)建立長連接,定期從Name Server取Topic 路由信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master定閱消息,也能夠從Slave定閱消息,定閱規則由Broker配置決定。

3、其他參考資料

  1. RocketMQ用戶指南
  2. RocketMQ原理簡介
  3. RocketMQ最好實踐
  4. 阿里散布式開放消息服務(ONS)原理與實踐2
  5. 阿里散布式開放消息服務(ONS)原理與實踐3
  6. RocketMQ原理解析

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 能在线观看的一区二区三区 | 国产一区在线播放 | www.毛片.com| 成人久久免费视频 | 亚洲午夜久久久精品影院 | 日韩乱码中文字幕视频 | 国产一级做a爰片久久毛片99 | 亚洲一区影院 | 国产免费不卡v片在线观看 国产免费叼嘿视频 | 国产亚洲精品福利在线 | 欧美特级午夜一区二区三区 | 成人久久久久 | 免费福利在线观看 | 亚洲人人看| 国产一区二区三区精品视频 | 欧美另类精品xxxx人妖换性 | 亚洲a视频在线观看 | 国产亚洲欧美另类专区 | 国产美女福利视频福利 | 国产精品成人观看视频网站 | 婷婷夜夜躁天天躁人人躁 | 国产高清看片日韩欧美久久 | 国产三级视频在线 | 国产一区日韩二区欧美三区 | 免费看黄色网址 | 久久久www成人免费精品 | 99久久中文字幕伊人 | 欧美视频日韩专区午夜 | 日韩永久在线观看免费视频 | 亚洲欧美日韩中文字幕在线一区 | 午夜 在线播放 | www一级黄色片 | 日韩日韩日韩 | 国产在线日韩在线 | 午夜影院免费观看 | 亚洲免费黄色网址 | 免费午夜不卡毛片 | 亚洲最黄网站 | 黄色www.| a毛片免费播放全部完整 | 国产日产欧产精品精品推荐小说 |