RocketMQ 網(wǎng)絡(luò)部署特點
Name Server 是1個幾近無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。
Broker 部署相對復(fù)雜,Broker 分為Master 與Slave,1個Master 可以對應(yīng)多個Slave,但是1個Slave > 只能對應(yīng)1個Master,Master 與Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定> 義,BrokerId為0 表示Master,非0 表示Slave。Master 也能夠部署多個。每一個Broker 與Name
Producer 與Name Server 集群中的其中1個節(jié)點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master 建立長連接,且定時向Master 發(fā)送心跳。Producer 完全無> 狀態(tài),可集群部署。
Consumer 與Name Server 集群中的其中1個節(jié)點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master、Slave 建立長連接,且定時向Master、Slave 發(fā)送心跳。Consumer既可以從Master 定閱消息,也能夠從Slave 定閱消息,定閱規(guī)則由Broker 配置決定。
Broker集群部署方式主要有以下幾種:(Slave 不可寫,但可讀)
這類方式風(fēng)險較大,1旦Broker 重啟或宕機時,會致使全部服務(wù)不可用,不建議線上環(huán)境使用。
1個集群無 Slave,全是 Master,例如 2 個 Master 或 3 個 Master。
優(yōu)點:配置簡單,單個Master 宕機或重啟保護對利用無影響,在磁盤配置為 RAID10 時,即便機器宕機不可恢復(fù)情況下,由于RAID10 磁盤非常可靠,消息也不會丟(異步刷盤丟失少許消息,同步刷盤1條不丟)。性能最高。
缺點:單臺機器宕機期間,這臺機器上未被消費的消息在機器恢復(fù)之前不可定閱,消息實時性會遭到遭到影響。
先啟動 NameServer,例如機器 IP 為:192.168.1.101:9876
nohup sh mqnamesrv &
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
每一個 Master 配置1個 Slave,有多對Master-Slave,HA 采取異步復(fù)制方式,主備有短暫消息延遲,毫秒級。
優(yōu)點:即便磁盤破壞,消息丟失的非常少,且消息實時性不會受影響,由于 Master 宕機后,消費者依然可以從 Slave 消費,此進程對利用透明。不需要人工干預(yù)。性能同多 Master 模式幾近1樣。
缺點:Master宕機,磁盤破壞情況,會丟失少許消息。
先啟動兩臺服務(wù)器的NameServer,例如機器 IP 為:192.168.1.101:9876 和192.168.1.102:9876
nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log 2>$ROCKETMQ_HOME/log/ng-error.log &
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties >$ROCKETMQ_HOME/log/mq.log &
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties >$ROCKETMQ_HOME/log/mq.log &
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log &
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log &
每一個 Master 配置1個 Slave,有多對Master-Slave,HA 采取同步雙寫方式,主備都寫成功,向利用返回成功。
優(yōu)點:數(shù)據(jù)與服務(wù)都無單點,Master宕機情況下,消息無延遲,服務(wù)可用性與數(shù)據(jù)可用性都非常高
缺點:性能比異步復(fù)制模式略低,大約低10%左右,發(fā)送單個消息的 RT 會略高。目前主宕機后,備機不能自動切換為主機,后續(xù)會支持自動切換功能。
先啟動兩臺服務(wù)器的NameServer,例如機器 IP 為:192.168.1.101:9876 和192.168.1.102:9876
nohup sh mqnamesrv 1>$ROCKETMQ_HOME/log/ng.log 2>$ROCKETMQ_HOME/log/ng-error.log &
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties >$ROCKETMQ_HOME/log/mq.log &
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties >$ROCKETMQ_HOME/log/mq.log &
nohup sh mqbroker -n 192.168.1.101:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log &
nohup sh mqbroker -n 192.168.1.102:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log &
以上 Broker 與 Slave 配對是通過指定相同的brokerName 參數(shù)來配對,Master 的 BrokerId 必須是
0,Slave 的BrokerId 必須是大與 0 的數(shù)。另外1個 Master 下面可以掛載多個 Slave,同1 Master 下的多個
Slave 通過指定不同的 BrokerId 來辨別。
下面以配置1主1備(同步),2個nameserver為例測試。
同時在2臺機器個啟動1個nameserver。安裝RocketMq請參考:
http://blog.csdn.net/zhu_tianwei/article/details/40948447
mkdir /usr/local/alibaba-rocketmq/log #創(chuàng)建日志目錄
mkdir -p /usr/local/alibaba-rocketmq/data/store/commitlog #創(chuàng)建數(shù)據(jù)存儲目錄
更改日志目錄
cd /usr/local/alibaba-rocketmq/conf
sed -i 's#${user.home}#${user.home}/alibaba-rocketmq#g' *.xml
vi ./conf/2m-2s-sync/broker-a.properties
#Broker所屬哪一個集群,默許【DefaultCluster】
brokerClusterName=DefaultCluster
#本機主機名
brokerName=broker-a
#BrokerId,必須是大等于0的整數(shù),0表示Master,>0表示Slave,1個Master可以掛多個Slave,Master與Slave通過BrokerName來配對,默許【0】
brokerId=0
#刪除文件時間點,默許清晨4點
deleteWhen=04
#文件保存時間,默許48小時
fileReservedTime=48
#Broker的角色 - ASYNC_MASTER 異步復(fù)制Master - SYNC_MASTER 同步雙寫Master - SLAVE
brokerRole=SYNC_MASTER
#刷盤方式 - ASYNC_FLUSH 異步刷盤 - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#Name Server地址
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
#Broker對外服務(wù)的監(jiān)聽端口,默許【10911】
listenPort=10911
defaultTopicQueueNums=4
#是不是允許Broker自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉,默許【true】
autoCreateTopicEnable=true
#是不是允許Broker自動創(chuàng)建定閱組,建議線下開啟,線上關(guān)閉,默許【true】
autoCreateSubscriptionGroup=true
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/alibaba-rocketmq/data/store
storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
vi ./conf/2m-2s-sync/broker-a-s.properties
#Broker所屬哪一個集群,默許【DefaultCluster】
brokerClusterName=DefaultCluster
#本機主機名
brokerName=broker-a
#BrokerId,必須是大等于0的整數(shù),0表示Master,>0表示Slave,1個Master可以掛多個Slave,Master與Slave通過BrokerName來配對,默許【0】
brokerId=1
#刪除文件時間點,默許清晨4點
deleteWhen=04
#文件保存時間,默許48小時
fileReservedTime=48
#Broker的角色 - ASYNC_MASTER 異步復(fù)制Master - SYNC_MASTER 同步雙寫Master - SLAVE
brokerRole=SLAVE
#刷盤方式 - ASYNC_FLUSH 異步刷盤 - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#Name Server地址
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
#Broker對外服務(wù)的監(jiān)聽端口,默許【10911】
listenPort=10911
defaultTopicQueueNums=4
#是不是允許Broker自動創(chuàng)建Topic,建議線下開啟,線上關(guān)閉,默許【true】
autoCreateTopicEnable=true
#是不是允許Broker自動創(chuàng)建定閱組,建議線下開啟,線上關(guān)閉,默許【true】
autoCreateSubscriptionGroup=true
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=50000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/alibaba-rocketmq/data/store
storePathCommitLog=/usr/local/alibaba-rocketmq/data/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
package com.somnus.rocketmq;
import java.util.concurrent.TimeUnit;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* 1個利用創(chuàng)建1個Producer,由利用來保護此對象,可以設(shè)置為全局對象或單例<br>
* 注意:ProducerGroupName需要由利用來保證唯1,1類Producer集合的名稱,這類Producer通常發(fā)送1類消息,
* 且發(fā)送邏輯1致<br>
* ProducerGroup這個概念發(fā)送普通的消息時,作用不大,但是發(fā)送散布式事務(wù)消息時,比較關(guān)鍵,
* 由于服務(wù)器會回查這個Group下的任意1個Producer
*/
final TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName");
// nameserver服務(wù)
producer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
producer.setInstanceName("Producer");
/**
* Producer對象在使用之前必須要調(diào)用start初始化,初始化1次便可<br>
* 注意:切記不可以在每次發(fā)送消息時,都調(diào)用start方法
*/
producer.start();
// 服務(wù)器回調(diào)Producer,檢查本地事務(wù)分支成功還是失敗
producer.setTransactionCheckListener(new TransactionCheckListener() {
public LocalTransactionState checkLocalTransactionState(
MessageExt msg) {
System.out.println("checkLocalTransactionState --" + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
/**
* 下面這段代碼表明1個Producer對象可以發(fā)送多個topic,多個tag的消息。
* 注意:send方法是同步調(diào)用,只要不拋異常就標識成功。但是發(fā)送成功也可會有多種狀態(tài),<br>
* 例如消息寫入Master成功,但是Slave不成功,這類情況消息屬于成功,但是對個別利用如果對消息可靠性要求極高,<br>
* 需要對這類情況做處理。另外,消息可能會存在發(fā)送失敗的情況,失敗重試由利用來處理。
*/
for (int i = 0; i < 10; i++) {
try {
{
Message msg = new Message("TopicTest1", // topic
"TagA", // tag
"OrderID001", // key消息關(guān)鍵詞,多個Key用KEY_SEPARATOR隔開(查詢消息使用)
("Hello MetaQA").getBytes()); // body
SendResult sendResult = producer.sendMessageInTransaction(
msg, new LocalTransactionExecuter(){
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
System.out.println("executeLocalTransactionBranch--arg=" + arg);
return LocalTransactionState.COMMIT_MESSAGE;
}
},
"$$$");
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2", // topic
"TagB", // tag
"OrderID0034", // key 消息關(guān)鍵詞,多個Key用KEY_SEPARATOR隔開(查詢消息使用)
("Hello MetaQB").getBytes()); // body
SendResult sendResult = producer.sendMessageInTransaction(
msg, new LocalTransactionExecuter(){
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
System.out.println("executeLocalTransactionBranch--arg=" + arg);
return LocalTransactionState.COMMIT_MESSAGE;
}
},
"$$$");
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3", // topic
"TagC", // tag
"OrderID061", // key
("Hello MetaQC").getBytes()); // body
SendResult sendResult = producer.sendMessageInTransaction(
msg, new LocalTransactionExecuter(){
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
System.out.println("executeLocalTransactionBranch--arg=" + arg);
return LocalTransactionState.COMMIT_MESSAGE;
}
},
"$$$");
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(1000);
}
/**
* 利用退出時,要調(diào)用shutdown來清算資源,關(guān)閉網(wǎng)絡(luò)連接,從MetaQ服務(wù)器上注銷自己
* 注意:我們建議利用在JBOSS、Tomcat等容器的退出鉤子里調(diào)用shutdown方法
*/
// producer.shutdown();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
System.exit(0);
} // 履行本地事務(wù),由客戶端回調(diào)
}
package com.somnus.rocketmq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class Consumer {
// Java緩存
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
/**
* 主動拉取方式消費
*
* @throws MQClientException
*/
public static void main(String[] args) throws MQClientException {
/**
* 1個利用創(chuàng)建1個Consumer,由利用來保護此對象,可以設(shè)置為全局對象或單例<br>
* 注意:ConsumerGroupName需要由利用來保證唯1 ,最好使用服務(wù)的包名辨別同1服務(wù),1類Consumer集合的名稱,
* 這類Consumer通常消費1類消息,且消費邏輯1致
* PullConsumer:Consumer的1種,利用通常主動調(diào)用Consumer的拉取消息方法從Broker拉消息,主動權(quán)由利用控制
*/
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");
// //nameserver服務(wù)
consumer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
consumer.setInstanceName("Consumber");
consumer.start();
// 拉取定閱主題的隊列,默許隊列大小是4
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
List<MessageExt> list = pullResult.getMsgFoundList();
if (list != null && list.size() < 100) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null) {
System.out.println(offset);
return offset;
}
return 0;
}
}