多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內(nèi)最全IT社區(qū)平臺 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > php框架 > 框架設(shè)計 > RocketMQ(二)集群配置

RocketMQ(二)集群配置

來源:程序員人生   發(fā)布時間:2016-07-02 13:12:06 閱讀次數(shù):6332次

RocketMQ網(wǎng)絡(luò)部署圖

這里寫圖片描述

RocketMQ 網(wǎng)絡(luò)部署特點

  • Name Server 是1個幾近無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。

  • Broker 部署相對復(fù)雜,Broker 分為Master 與Slave,1個Master 可以對應(yīng)多個Slave,但是1個Slave > 只能對應(yīng)1個Master,Master 與Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定> 義,BrokerId為0 表示Master,非0 表示Slave。Master 也能夠部署多個。每一個Broker 與Name

  • Producer 與Name Server 集群中的其中1個節(jié)點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master 建立長連接,且定時向Master 發(fā)送心跳。Producer 完全無> 狀態(tài),可集群部署。

  • Consumer 與Name Server 集群中的其中1個節(jié)點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master、Slave 建立長連接,且定時向Master、Slave 發(fā)送心跳。Consumer既可以從Master 定閱消息,也能夠從Slave 定閱消息,定閱規(guī)則由Broker 配置決定。

Broker集群部署方式主要有以下幾種:(Slave 不可寫,但可讀)

單個Master

這類方式風(fēng)險較大,1旦Broker 重啟或宕機時,會致使全部服務(wù)不可用,不建議線上環(huán)境使用。


多Master模式

1個集群無 Slave,全是 Master,例如 2 個 Master 或 3 個 Master。

優(yōu)點:配置簡單,單個Master 宕機或重啟保護對利用無影響,在磁盤配置為 RAID10 時,即便機器宕機不可恢復(fù)情況下,由于RAID10 磁盤非常可靠,消息也不會丟(異步刷盤丟失少許消息,同步刷盤1條不丟)。性能最高。

缺點:單臺機器宕機期間,這臺機器上未被消費的消息在機器恢復(fù)之前不可定閱,消息實時性會遭到遭到影響。

先啟動 NameServer,例如機器 IP 為:192.168.1.101:9876

nohup sh mqnamesrv &
  • 在機器 A,啟動第1個 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
  • 在機器 B,啟動第2個 Master
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

多Master多Slave模式,異步復(fù)制

每一個 Master 配置1個 Slave,有多對Master-Slave,HA 采取異步復(fù)制方式,主備有短暫消息延遲,毫秒級。

優(yōu)點:即便磁盤破壞,消息丟失的非常少,且消息實時性不會受影響,由于 Master 宕機后,消費者依然可以從 Slave 消費,此進程對利用透明。不需要人工干預(yù)。性能同多 Master 模式幾近1樣。

缺點:Master宕機,磁盤破壞情況,會丟失少許消息。

先啟動兩臺服務(wù)器的NameServer,例如機器 IP 為:192.168.1.101:9876 和192.168.1.102:9876

nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log 2>$ROCKETMQ_HOME/log/ng-error.log &
  • 在機器 A,啟動第1個 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機器 B,啟動第2個 Master
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機器 C,啟動第1個 Slave
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機器 D,啟動第2個 Slave
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log &

多Master多Slave模式,同步雙寫

每一個 Master 配置1個 Slave,有多對Master-Slave,HA 采取同步雙寫方式,主備都寫成功,向利用返回成功。

優(yōu)點:數(shù)據(jù)與服務(wù)都無單點,Master宕機情況下,消息無延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高

缺點:性能比異步復(fù)制模式略低,大約低10%左右,發(fā)送單個消息的 RT 會略高。目前主宕機后,備機不能自動切換為主機,后續(xù)會支持自動切換功能。

先啟動兩臺服務(wù)器的NameServer,例如機器 IP 為:192.168.1.101:9876 和192.168.1.102:9876

nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log 2>$ROCKETMQ_HOME/log/ng-error.log &
  • 在機器 A,啟動第1個 Master
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機器 B,啟動第2個 Master
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機器 C,啟動第1個 Slave
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log &
  • 在機器 D,啟動第2個 Slave
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log &

以上 Broker 與 Slave 配對是通過指定相同的brokerName 參數(shù)來配對,Master 的 BrokerId 必須是
0,Slave 的BrokerId 必須是大與 0 的數(shù)。另外1個 Master 下面可以掛載多個 Slave,同1 Master 下的多個
Slave 通過指定不同的 BrokerId 來辨別。

除此以外,nameserver也需要集群。

下面以配置1主1備(同步),2個nameserver為例測試。

1、環(huán)境兩臺機器:

  • 192.168.36.101 為主
  • 192.168.36.102 為備

同時在2臺機器個啟動1個nameserver。安裝RocketMq請參考:
http://blog.csdn.net/zhu_tianwei/article/details/40948447

2、修改配置

(1)創(chuàng)建目錄

mkdir /usr/local/alibaba-rocketmq/log #創(chuàng)建日志目錄 mkdir -p /usr/local/alibaba-rocketmq/data/store/commitlog #創(chuàng)建數(shù)據(jù)存儲目錄

更改日志目錄

cd /usr/local/alibaba-rocketmq/conf sed -i 's#${user.home}#${user.home}/alibaba-rocketmq#g' *.xml

(2)修改主配置

vi ./conf/2m-2s-sync/broker-a.properties
#Broker所屬哪一個集群,默許【DefaultCluster】 brokerClusterName=DefaultCluster #本機主機名 brokerName=broker-a #BrokerId,必須是大等于0的整數(shù),0表示Master,>0表示Slave,1個Master可以掛多個Slave,Master與Slave通過BrokerName來配對,默許【0】 brokerId=0 #刪除文件時間點,默許清晨4點 deleteWhen=04 #文件保存時間,默許48小時 fileReservedTime=48 #Broker的角色 - ASYNC_MASTER 異步復(fù)制Master - SYNC_MASTER 同步雙寫Master - SLAVE brokerRole=SYNC_MASTER #刷盤方式 - ASYNC_FLUSH 異步刷盤 - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #Name Server地址 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876 #Broker對外服務(wù)的監(jiān)聽端口,默許【10911】 listenPort=10911 defaultTopicQueueNums=4 #是不是允許Broker自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateTopicEnable=true #是不是允許Broker自動創(chuàng)建定閱組,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateSubscriptionGroup=true mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 diskMaxUsedSpaceRatio=88 storePathRootDir=/usr/local/alibaba-rocketmq/data/store storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128

(3)修改備配置

vi ./conf/2m-2s-sync/broker-a-s.properties
#Broker所屬哪一個集群,默許【DefaultCluster】 brokerClusterName=DefaultCluster #本機主機名 brokerName=broker-a #BrokerId,必須是大等于0的整數(shù),0表示Master,>0表示Slave,1個Master可以掛多個Slave,Master與Slave通過BrokerName來配對,默許【0】 brokerId=1 #刪除文件時間點,默許清晨4點 deleteWhen=04 #文件保存時間,默許48小時 fileReservedTime=48 #Broker的角色 - ASYNC_MASTER 異步復(fù)制Master - SYNC_MASTER 同步雙寫Master - SLAVE brokerRole=SLAVE #刷盤方式 - ASYNC_FLUSH 異步刷盤 - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #Name Server地址 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876 #Broker對外服務(wù)的監(jiān)聽端口,默許【10911】 listenPort=10911 defaultTopicQueueNums=4 #是不是允許Broker自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateTopicEnable=true #是不是允許Broker自動創(chuàng)建定閱組,建議線下開啟,線上關(guān)閉,默許【true】 autoCreateSubscriptionGroup=true mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 diskMaxUsedSpaceRatio=88 storePathRootDir=/usr/local/alibaba-rocketmq/data/store storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128

實例:

1.生產(chǎn)者Producer.java ,TransactionMQProducer使用

package com.somnus.rocketmq; import java.util.concurrent.TimeUnit; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 1個利用創(chuàng)建1個Producer,由利用來保護此對象,可以設(shè)置為全局對象或單例<br> * 注意:ProducerGroupName需要由利用來保證唯1,1類Producer集合的名稱,這類Producer通常發(fā)送1類消息, * 且發(fā)送邏輯1致<br> * ProducerGroup這個概念發(fā)送普通的消息時,作用不大,但是發(fā)送散布式事務(wù)消息時,比較關(guān)鍵, * 由于服務(wù)器會回查這個Group下的任意1個Producer */ final TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName"); // nameserver服務(wù) producer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876"); producer.setInstanceName("Producer"); /** * Producer對象在使用之前必須要調(diào)用start初始化,初始化1次便可<br> * 注意:切記不可以在每次發(fā)送消息時,都調(diào)用start方法 */ producer.start(); // 服務(wù)器回調(diào)Producer,檢查本地事務(wù)分支成功還是失敗 producer.setTransactionCheckListener(new TransactionCheckListener() { public LocalTransactionState checkLocalTransactionState( MessageExt msg) { System.out.println("checkLocalTransactionState --" + new String(msg.getBody())); return LocalTransactionState.COMMIT_MESSAGE; } }); /** * 下面這段代碼表明1個Producer對象可以發(fā)送多個topic,多個tag的消息。 * 注意:send方法是同步調(diào)用,只要不拋異常就標識成功。但是發(fā)送成功也可會有多種狀態(tài),<br> * 例如消息寫入Master成功,但是Slave不成功,這類情況消息屬于成功,但是對個別利用如果對消息可靠性要求極高,<br> * 需要對這類情況做處理。另外,消息可能會存在發(fā)送失敗的情況,失敗重試由利用來處理。 */ for (int i = 0; i < 10; i++) { try { { Message msg = new Message("TopicTest1", // topic "TagA", // tag "OrderID001", // key消息關(guān)鍵詞,多個Key用KEY_SEPARATOR隔開(查詢消息使用) ("Hello MetaQA").getBytes()); // body SendResult sendResult = producer.sendMessageInTransaction( msg, new LocalTransactionExecuter(){ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody())); System.out.println("executeLocalTransactionBranch--arg=" + arg); return LocalTransactionState.COMMIT_MESSAGE; } }, "$$$"); System.out.println(sendResult); } { Message msg = new Message("TopicTest2", // topic "TagB", // tag "OrderID0034", // key 消息關(guān)鍵詞,多個Key用KEY_SEPARATOR隔開(查詢消息使用) ("Hello MetaQB").getBytes()); // body SendResult sendResult = producer.sendMessageInTransaction( msg, new LocalTransactionExecuter(){ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody())); System.out.println("executeLocalTransactionBranch--arg=" + arg); return LocalTransactionState.COMMIT_MESSAGE; } }, "$$$"); System.out.println(sendResult); } { Message msg = new Message("TopicTest3", // topic "TagC", // tag "OrderID061", // key ("Hello MetaQC").getBytes()); // body SendResult sendResult = producer.sendMessageInTransaction( msg, new LocalTransactionExecuter(){ public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody())); System.out.println("executeLocalTransactionBranch--arg=" + arg); return LocalTransactionState.COMMIT_MESSAGE; } }, "$$$"); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 利用退出時,要調(diào)用shutdown來清算資源,關(guān)閉網(wǎng)絡(luò)連接,從MetaQ服務(wù)器上注銷自己 * 注意:我們建議利用在JBOSS、Tomcat等容器的退出鉤子里調(diào)用shutdown方法 */ // producer.shutdown(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } // 履行本地事務(wù),由客戶端回調(diào) }

2、消費者Consumer.java ,采取主動拉取方式消費。

package com.somnus.rocketmq; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; public class Consumer { // Java緩存 private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); /** * 主動拉取方式消費 * * @throws MQClientException */ public static void main(String[] args) throws MQClientException { /** * 1個利用創(chuàng)建1個Consumer,由利用來保護此對象,可以設(shè)置為全局對象或單例<br> * 注意:ConsumerGroupName需要由利用來保證唯1 ,最好使用服務(wù)的包名辨別同1服務(wù),1類Consumer集合的名稱, * 這類Consumer通常消費1類消息,且消費邏輯1致 * PullConsumer:Consumer的1種,利用通常主動調(diào)用Consumer的拉取消息方法從Broker拉消息,主動權(quán)由利用控制 */ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName"); // //nameserver服務(wù) consumer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876"); consumer.setInstanceName("Consumber"); consumer.start(); // 拉取定閱主題的隊列,默許隊列大小是4 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); List<MessageExt> list = pullResult.getMsgFoundList(); if (list != null && list.size() < 100) { for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } } System.out.println(pullResult.getNextBeginOffset()); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) { System.out.println(offset); return offset; } return 0; } }
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 羞羞视频网站免费入口 | 图片区小说区校园 | 国产不卡的一区二区三区四区 | 亚洲欧美韩国日本 | 欧美色碰碰碰免费观看长视频 | 欧美肥老太肥50 60 70 | 天堂最新版www在线观看 | 午夜国产精品福利在线观看 | 国产美女激情 | 国产 | 久而欧洲野花视频欧洲1 | 亚洲久久网站 | 日本高清www午夜视频 | 亚洲精品综合一区二区三区在线 | 欧美一区二区视频 | 国产精品嫩草影院在线看 | 成人性a激情免费视频 | 精品一区二区三区五区六区 | 岛国性视频播放免费视频 | 国产欧美精品一区二区三区四区 | yellow日本| 小毛片网站 | 英国一级毛片 | 亚洲欧美视频一级 | 毛片最新网址 | 久久久久久久一精品 | 中文无码日韩欧免费视频 | 中文字幕在线观看一区二区三区 | 三级全黄在线观看www桃花 | free性丰满hd性欧美厨房 | 国产亚洲一区二区在线观看 | 国产亚洲精品久久久久久无 | a免费国产一级特黄aa大 | 日韩特黄特色大片免费视频 | 看亚洲人配人配人种jizz | 在线观看中文字幕亚洲 | 日一区二区 | 一区二区三区四区在线播放 | 国产精品久久99 | 午夜久久久久久久 | 性欧美18xx| 精品福利|