ActiveMQ源碼解析(四):聊聊消息的可靠傳輸機制和事務(wù)控制
來源:程序員人生 發(fā)布時間:2016-06-30 08:57:02 閱讀次數(shù):4562次
在消息傳遞的進程中,某些情況下比如網(wǎng)絡(luò)閃斷、丟包等會致使消息永久性丟失,這時候消費者是接收不到消息的,這樣就會造成數(shù)據(jù)不1致的問題。那末我們怎樣才能保證消息1定能發(fā)送給消費者呢?怎樣才能避免數(shù)據(jù)不1致呢?又比如我們發(fā)送多條消息,有時候我們期望都發(fā)送成功但實際上其中1部份發(fā)送成功,另外一部份發(fā)送失敗了,沒到達(dá)我們的預(yù)期效果,那末我們怎樣解決這個問題呢?
前1種問題我們通過消息確認(rèn)機制來解決,它分為幾種模式,需要在創(chuàng)建session時指定是不是開啟事務(wù)和確認(rèn)模式,像下面這樣:
<span style="font-size:12px;">ActiveMQSession session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);</span>
然后我們來看在PubSubscribe模式下消息的全部從發(fā)送到消費確認(rèn)的流程來了解消息的確認(rèn)機制和事務(wù)。首先看看producer怎樣發(fā)送消息的:
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
//檢查session狀態(tài),如果session已關(guān)閉則拋出狀態(tài)異常
checkClosed();
//檢查destination類型,如果不符合要求就轉(zhuǎn)變成ActiveMQDestination
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
//把各種不同的message轉(zhuǎn)換成ActiveMQMessage
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//發(fā)送消息到broker中的topic
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
//消息計數(shù)
stats.onMessage();
}
我們以ActiveMQSession的send為例再來看看session是怎樣發(fā)送消息的:
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
//檢查session狀態(tài)如果closed拋出狀態(tài)異常
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
//競爭鎖(互斥信號量),如果1個session的多個producer發(fā)送消息這里會保證有序性
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
//告知broker開始1個新事務(wù),只有session的確認(rèn)模式是SESSION_TRANSACTED時事務(wù)上下網(wǎng)才會開啟事務(wù)
doStartTransaction();
//從事務(wù)上下文中獲得事務(wù)id
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
//在jms協(xié)議頭中設(shè)置傳輸模式即消息是不是需要持久化
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
//檢查producer中的message是不是過期
if (!producer.getDisableMessageTimestamp()) {
//message獲得時間戳
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
//設(shè)置過期時間
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
//設(shè)置消息過期時間
message.setJMSExpiration(expiration);
//設(shè)置消息優(yōu)先級
message.setJMSPriority(priority);
//設(shè)置消息是非重發(fā)的
message.setJMSRedelivered(false);
// transform to our own message format here
//將消息轉(zhuǎn)化成ActiveMQMessage,message針對不同的數(shù)據(jù)格式有很多種,比如map message,blob message
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
//設(shè)置目的地,這里是1個topic
msg.setDestination(destination);
//設(shè)置消息id
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
//如果消息是經(jīng)過轉(zhuǎn)換的,那末原消息更新新的id
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
//設(shè)置目的地
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
//清除brokerpath
msg.setBrokerPath(null);
//設(shè)置事務(wù)id
msg.setTransactionId(txid);
//
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
//設(shè)置連接器
msg.setConnection(connection);
//把消息的屬性和消息體都設(shè)置為只讀,避免被修改
msg.onSend();
//生產(chǎn)者id
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
//如果onComplete沒設(shè)置,且發(fā)送超時時間小于0,且消息不需要反饋,且連接器不是同步發(fā)送模式,且(消息非持久化或連接器是異步發(fā)送模式或存在事務(wù)id的情況下)異步發(fā)送,否則同步發(fā)送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
//異步發(fā)送走transport的oneway通道
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
//同步發(fā)送走transport的request和asyncrequest通道
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
這樣消息就被發(fā)送到broker的topic中了,接下來broker中會根據(jù)topic下的subscriber的id找出定閱者,并向這些消費者發(fā)送消息,消費者接收到消息后會消費消息,我們接下來看看消費者怎樣消費消息的。
下面是ActiveMQConsumer和ActiveMQSession中的方法,session沒創(chuàng)建1個consumer便可能會重啟session線程,session線程的run中會調(diào)用message的listener中的onMessage方法
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
//停止session線程
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
//session重新分發(fā)未消費的message
session.redispatch(this, unconsumedMessages);
//開啟session線程
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
public void run() {
MessageDispatch messageDispatch;
while ((messageDispatch = executor.dequeueNoWait()) != null) {
final MessageDispatch md = messageDispatch;
final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
MessageAck earlyAck = null;
//如果消息過期創(chuàng)建新的確認(rèn)消息
if (message.isExpired()) {
earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
}
//如果消息已過期,或消息有沖突則發(fā)送確認(rèn)消息重新開始while循環(huán)
if (earlyAck != null) {
try {
asyncSendPacket(earlyAck);
} catch (Throwable t) {
LOG.error("error dispatching ack: {} ", earlyAck, t);
connection.onClientInternalException(t);
} finally {
continue;
}
}
//如果是確認(rèn)模式是CLIENT_ACKNOWLEDGE或INDIVIDUAL_ACKONWLEDGE則設(shè)置空回調(diào)函數(shù),這樣consumer確認(rèn)消息后會履行回調(diào)函數(shù)
if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
}
});
}
//在發(fā)送前調(diào)用途理函數(shù)
if (deliveryListener != null) {
deliveryListener.beforeDelivery(this, message);
}
//設(shè)置delivery id
md.setDeliverySequenceId(getNextDeliveryId());
lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
/*
* The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
* We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
* */
synchronized (redeliveryGuard) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
//如果是事務(wù)模式則開啟事務(wù)
doStartTransaction();
ack.setTransactionId(getTransactionContext().getTransactionId());
if (ack.getTransactionId() != null) {
//事務(wù)狀態(tài)下添加1個匿名同步器,用于處理同步事務(wù)比如回滾
getTransactionContext().addSynchronization(new Synchronization() {
final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
@Override
public void beforeEnd() throws Exception {
// validate our consumer so we don't push stale acks that get ignored
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
}
LOG.trace("beforeEnd ack {}", ack);
sendAck(ack);
}
@Override
public void afterRollback() throws Exception {
LOG.trace("rollback {}", ack, new Throwable("here"));
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
// don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
if (clearRequestsCounter.get() > clearRequestCount) {
LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
// validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
return;
}
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get
// sent to the
// DLQ.
// Acknowledge the last message.
MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
asyncSendPacket(ack);
} else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend
// this message.
long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
/*
* If we are a non blocking delivery then we need to stop the executor to avoid more
* messages being delivered, once the message is redelivered we can restart it.
* */
if (!connection.isNonBlockingRedelivery()) {
LOG.debug("Blocking session until re-delivery...");
executor.stop();
}
connection.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
/*
* wait for the first delivery to be complete, i.e. after delivery has been called.
* */
synchronized (redeliveryGuard) {
/*
* If its non blocking then we can just dispatch in a new session.
* */
if (connection.isNonBlockingRedelivery()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
/*
* If there has been an error thrown during afterDelivery then the
* endpoint will be marked as dead so redelivery will fail (and eventually
* the session marked as stale), in this case we can only call dispatch
* which will create a new session with a new endpoint.
* */
if (afterDeliveryError.get()) {
((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
} else {
executor.executeFirst(md);
executor.start();
}
}
}
}
}, redeliveryDelay);
}
md.getMessage().onMessageRolledBack();
}
});
}
LOG.trace("{} onMessage({})", this, message.getMessageId());
//觸發(fā)消息事件監(jiān)聽函數(shù)
messageListener.onMessage(message);
} catch (Throwable e) {
LOG.error("error dispatching message: ", e);
// A problem while invoking the MessageListener does not
// in general indicate a problem with the connection to the broker, i.e.
// it will usually be sufficient to let the afterDelivery() method either
// commit or roll back in order to deal with the exception.
// However, we notify any registered client internal exception listener
// of the problem.
connection.onClientInternalException(e);
} finally {
//發(fā)送確認(rèn)消息
if (ack.getTransactionId() == null) {
try {
asyncSendPacket(ack);
} catch (Throwable e) {
connection.onClientInternalException(e);
}
}
}
//觸發(fā)投遞事件監(jiān)聽函數(shù)
if (deliveryListener != null) {
try {
deliveryListener.afterDelivery(this, message);
} catch (Throwable t) {
LOG.debug("Unable to call after delivery", t);
afterDeliveryError.set(true);
throw new RuntimeException(t);
}
}
}
/*
* this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
* It also needs to be outside the redelivery guard.
* */
try {
executor.waitForQueueRestart();
} catch (InterruptedException ex) {
connection.onClientInternalException(ex);
}
}
}
總結(jié)
消息確認(rèn)機制
消費者和broker通訊終究實現(xiàn)消息確認(rèn),消息確認(rèn)機制1共有5種,4種jms的和1種activemq補充的,AUTO_ACKNOWLEDGE(自動確認(rèn))、CLIENT_ACKNOWLEDGE(客戶確認(rèn))、DUPS_OK_ACKNOWLEDGE(批量確認(rèn))、SESSION_TRANSACTED(事務(wù)確認(rèn))、INDIVIDUAL_ACKNOWLEDGE(單條確認(rèn)),consumer在不同的模式下會發(fā)不同的命令到broker,broker再根據(jù)不同的命令進行操作,如果consumer正常發(fā)送ack命令給broker,broker會從topic移除消息并燒毀,如果未從消費者接遭到確認(rèn)命令,broker會將消息轉(zhuǎn)移到dlq隊列(dead
letter queue),并根據(jù)delivery mode進行重試或報異常。
消息事務(wù)
消息事務(wù)是在生產(chǎn)者producer到broker或broker到consumer進程中同1個session中產(chǎn)生的,保證幾條消息在發(fā)送進程中的原子性。可以在connection的createSession方法中指定1個布爾值開啟,如果消息確認(rèn)機制是事務(wù)確認(rèn),那末在發(fā)送message的進程中session就會開啟事務(wù)(實際上broker的),不用用戶顯示調(diào)用 beginTransaction,這時候所有通過session發(fā)送的消息都被緩存下來,用戶調(diào)用session.commit時會發(fā)送所有消息,當(dāng)發(fā)送出現(xiàn)異常時用戶可以調(diào)用rollback進行回滾操作,只有在開啟事務(wù)狀態(tài)下有效。
最后附上1張他人畫的activemq消息處理的流轉(zhuǎn)圖。
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機掃描二維碼進行捐贈