多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內(nèi)最全IT社區(qū)平臺 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > php開源 > php教程 > ActiveMQ源碼解析(四):聊聊消息的可靠傳輸機制和事務(wù)控制

ActiveMQ源碼解析(四):聊聊消息的可靠傳輸機制和事務(wù)控制

來源:程序員人生   發(fā)布時間:2016-06-30 08:57:02 閱讀次數(shù):4562次

在消息傳遞的進程中,某些情況下比如網(wǎng)絡(luò)閃斷、丟包等會致使消息永久性丟失,這時候消費者是接收不到消息的,這樣就會造成數(shù)據(jù)不1致的問題。那末我們怎樣才能保證消息1定能發(fā)送給消費者呢?怎樣才能避免數(shù)據(jù)不1致呢?又比如我們發(fā)送多條消息,有時候我們期望都發(fā)送成功但實際上其中1部份發(fā)送成功,另外一部份發(fā)送失敗了,沒到達(dá)我們的預(yù)期效果,那末我們怎樣解決這個問題呢?

前1種問題我們通過消息確認(rèn)機制來解決,它分為幾種模式,需要在創(chuàng)建session時指定是不是開啟事務(wù)和確認(rèn)模式,像下面這樣:

<span style="font-size:12px;">ActiveMQSession session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);</span>
        然后我們來看在PubSubscribe模式下消息的全部從發(fā)送到消費確認(rèn)的流程來了解消息的確認(rèn)機制和事務(wù)。首先看看producer怎樣發(fā)送消息的:

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { //檢查session狀態(tài),如果session已關(guān)閉則拋出狀態(tài)異常 checkClosed(); //檢查destination類型,如果不符合要求就轉(zhuǎn)變成ActiveMQDestination if (destination == null) { if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); } if (transformer != null) { //把各種不同的message轉(zhuǎn)換成ActiveMQMessage Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } } if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } } //發(fā)送消息到broker中的topic this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); //消息計數(shù) stats.onMessage(); }

        我們以ActiveMQSession的send為例再來看看session是怎樣發(fā)送消息的:

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { //檢查session狀態(tài)如果closed拋出狀態(tài)異常 checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } //競爭鎖(互斥信號量),如果1個session的多個producer發(fā)送消息這里會保證有序性 synchronized (sendMutex) { // tell the Broker we are about to start a new transaction //告知broker開始1個新事務(wù),只有session的確認(rèn)模式是SESSION_TRANSACTED時事務(wù)上下網(wǎng)才會開啟事務(wù) doStartTransaction(); //從事務(wù)上下文中獲得事務(wù)id TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 //在jms協(xié)議頭中設(shè)置傳輸模式即消息是不是需要持久化 message.setJMSDeliveryMode(deliveryMode); long expiration = 0L; //檢查producer中的message是不是過期 if (!producer.getDisableMessageTimestamp()) { //message獲得時間戳 long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); //設(shè)置過期時間 if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } //設(shè)置消息過期時間 message.setJMSExpiration(expiration); //設(shè)置消息優(yōu)先級 message.setJMSPriority(priority); //設(shè)置消息是非重發(fā)的 message.setJMSRedelivered(false); // transform to our own message format here //將消息轉(zhuǎn)化成ActiveMQMessage,message針對不同的數(shù)據(jù)格式有很多種,比如map message,blob message ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); //設(shè)置目的地,這里是1個topic msg.setDestination(destination); //設(shè)置消息id msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. //如果消息是經(jīng)過轉(zhuǎn)換的,那末原消息更新新的id if (msg != message) { message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. //設(shè)置目的地 message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message //清除brokerpath msg.setBrokerPath(null); //設(shè)置事務(wù)id msg.setTransactionId(txid); // if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } //設(shè)置連接器 msg.setConnection(connection); //把消息的屬性和消息體都設(shè)置為只讀,避免被修改 msg.onSend(); //生產(chǎn)者id msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //如果onComplete沒設(shè)置,且發(fā)送超時時間小于0,且消息不需要反饋,且連接器不是同步發(fā)送模式,且(消息非持久化或連接器是異步發(fā)送模式或存在事務(wù)id的情況下)異步發(fā)送,否則同步發(fā)送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { //異步發(fā)送走transport的oneway通道 this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { //同步發(fā)送走transport的request和asyncrequest通道 this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } } } }

       這樣消息就被發(fā)送到broker的topic中了,接下來broker中會根據(jù)topic下的subscriber的id找出定閱者,并向這些消費者發(fā)送消息,消費者接收到消息后會消費消息,我們接下來看看消費者怎樣消費消息的。

      下面是ActiveMQConsumer和ActiveMQSession中的方法,session沒創(chuàng)建1個consumer便可能會重啟session線程,session線程的run中會調(diào)用message的listener中的onMessage方法       

public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); //停止session線程 if (wasRunning) { session.stop(); } this.messageListener.set(listener); //session重新分發(fā)未消費的message session.redispatch(this, unconsumedMessages); //開啟session線程 if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
public void run() { MessageDispatch messageDispatch; while ((messageDispatch = executor.dequeueNoWait()) != null) { final MessageDispatch md = messageDispatch; final ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); MessageAck earlyAck = null; //如果消息過期創(chuàng)建新的確認(rèn)消息 if (message.isExpired()) { earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); earlyAck.setFirstMessageId(message.getMessageId()); } else if (connection.isDuplicate(ActiveMQSession.this, message)) { LOG.debug("{} got duplicate: {}", this, message.getMessageId()); earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); earlyAck.setFirstMessageId(md.getMessage().getMessageId()); earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); } //如果消息已過期,或消息有沖突則發(fā)送確認(rèn)消息重新開始while循環(huán) if (earlyAck != null) { try { asyncSendPacket(earlyAck); } catch (Throwable t) { LOG.error("error dispatching ack: {} ", earlyAck, t); connection.onClientInternalException(t); } finally { continue; } } //如果是確認(rèn)模式是CLIENT_ACKNOWLEDGE或INDIVIDUAL_ACKONWLEDGE則設(shè)置空回調(diào)函數(shù),這樣consumer確認(rèn)消息后會履行回調(diào)函數(shù) if (isClientAcknowledge()||isIndividualAcknowledge()) { message.setAcknowledgeCallback(new Callback() { @Override public void execute() throws Exception { } }); } //在發(fā)送前調(diào)用途理函數(shù) if (deliveryListener != null) { deliveryListener.beforeDelivery(this, message); } //設(shè)置delivery id md.setDeliverySequenceId(getNextDeliveryId()); lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); final AtomicBoolean afterDeliveryError = new AtomicBoolean(false); /* * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched. * We dont want the after deliver being called after the redeliver as it may cause some weird stuff. * */ synchronized (redeliveryGuard) { try { ack.setFirstMessageId(md.getMessage().getMessageId()); //如果是事務(wù)模式則開啟事務(wù) doStartTransaction(); ack.setTransactionId(getTransactionContext().getTransactionId()); if (ack.getTransactionId() != null) { //事務(wù)狀態(tài)下添加1個匿名同步器,用于處理同步事務(wù)比如回滾 getTransactionContext().addSynchronization(new Synchronization() { final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); @Override public void beforeEnd() throws Exception { // validate our consumer so we don't push stale acks that get ignored if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); } LOG.trace("beforeEnd ack {}", ack); sendAck(ack); } @Override public void afterRollback() throws Exception { LOG.trace("rollback {}", ack, new Throwable("here")); // ensure we don't filter this as a duplicate connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect if (clearRequestsCounter.get() > clearRequestCount) { LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); return; } // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); return; } RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); int redeliveryCounter = md.getMessage().getRedeliveryCounter(); if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { // We need to NACK the messages so that they get // sent to the // DLQ. // Acknowledge the last message. MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); asyncSendPacket(ack); } else { MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); ack.setFirstMessageId(md.getMessage().getMessageId()); asyncSendPacket(ack); // Figure out how long we should wait to resend // this message. long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); for (int i = 0; i < redeliveryCounter; i++) { redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); } /* * If we are a non blocking delivery then we need to stop the executor to avoid more * messages being delivered, once the message is redelivered we can restart it. * */ if (!connection.isNonBlockingRedelivery()) { LOG.debug("Blocking session until re-delivery..."); executor.stop(); } connection.getScheduler().executeAfterDelay(new Runnable() { @Override public void run() { /* * wait for the first delivery to be complete, i.e. after delivery has been called. * */ synchronized (redeliveryGuard) { /* * If its non blocking then we can just dispatch in a new session. * */ if (connection.isNonBlockingRedelivery()) { ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); } else { /* * If there has been an error thrown during afterDelivery then the * endpoint will be marked as dead so redelivery will fail (and eventually * the session marked as stale), in this case we can only call dispatch * which will create a new session with a new endpoint. * */ if (afterDeliveryError.get()) { ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); } else { executor.executeFirst(md); executor.start(); } } } } }, redeliveryDelay); } md.getMessage().onMessageRolledBack(); } }); } LOG.trace("{} onMessage({})", this, message.getMessageId()); //觸發(fā)消息事件監(jiān)聽函數(shù) messageListener.onMessage(message); } catch (Throwable e) { LOG.error("error dispatching message: ", e); // A problem while invoking the MessageListener does not // in general indicate a problem with the connection to the broker, i.e. // it will usually be sufficient to let the afterDelivery() method either // commit or roll back in order to deal with the exception. // However, we notify any registered client internal exception listener // of the problem. connection.onClientInternalException(e); } finally { //發(fā)送確認(rèn)消息 if (ack.getTransactionId() == null) { try { asyncSendPacket(ack); } catch (Throwable e) { connection.onClientInternalException(e); } } } //觸發(fā)投遞事件監(jiān)聽函數(shù) if (deliveryListener != null) { try { deliveryListener.afterDelivery(this, message); } catch (Throwable t) { LOG.debug("Unable to call after delivery", t); afterDeliveryError.set(true); throw new RuntimeException(t); } } } /* * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway. * It also needs to be outside the redelivery guard. * */ try { executor.waitForQueueRestart(); } catch (InterruptedException ex) { connection.onClientInternalException(ex); } } }

總結(jié)

      消息確認(rèn)機制

      消費者和broker通訊終究實現(xiàn)消息確認(rèn),消息確認(rèn)機制1共有5種,4種jms的和1種activemq補充的,AUTO_ACKNOWLEDGE(自動確認(rèn))、CLIENT_ACKNOWLEDGE(客戶確認(rèn))、DUPS_OK_ACKNOWLEDGE(批量確認(rèn))、SESSION_TRANSACTED(事務(wù)確認(rèn))、INDIVIDUAL_ACKNOWLEDGE(單條確認(rèn)),consumer在不同的模式下會發(fā)不同的命令到broker,broker再根據(jù)不同的命令進行操作,如果consumer正常發(fā)送ack命令給broker,broker會從topic移除消息并燒毀,如果未從消費者接遭到確認(rèn)命令,broker會將消息轉(zhuǎn)移到dlq隊列(dead letter queue),并根據(jù)delivery mode進行重試或報異常。

       消息事務(wù)

       消息事務(wù)是在生產(chǎn)者producer到broker或broker到consumer進程中同1個session中產(chǎn)生的,保證幾條消息在發(fā)送進程中的原子性。可以在connection的createSession方法中指定1個布爾值開啟,如果消息確認(rèn)機制是事務(wù)確認(rèn),那末在發(fā)送message的進程中session就會開啟事務(wù)(實際上broker的),不用用戶顯示調(diào)用 beginTransaction,這時候所有通過session發(fā)送的消息都被緩存下來,用戶調(diào)用session.commit時會發(fā)送所有消息,當(dāng)發(fā)送出現(xiàn)異常時用戶可以調(diào)用rollback進行回滾操作,只有在開啟事務(wù)狀態(tài)下有效。  最后附上1張他人畫的activemq消息處理的流轉(zhuǎn)圖。
  


生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 手机在线观看视频 | 亚洲精品老司机在线观看 | 噜噜影院在线视频在线观看 | 亚洲第一中文字幕 | 中文字幕yellow在线资源 | 久久国产精品高清一区二区三区 | 曰韩一级 | 国产精品视频成人 | 波多野结衣在线一区二区 | 永久在线毛片免费观看 | 波多野结衣中文字幕一区二区三区 | 欧美午夜精品一区二区三区 | 午夜dj免费视频观看在线播放 | 激情视频在线观看 | 精品国产乱码一区二区三区 | 欧美午夜在线播放 | 色自拍| 精品国产亚洲一区二区三区 | 亚洲天堂一区二区三区 | 午夜欧美精品久久久久久久 | 久久亚洲美女久久久久 | 成人午夜影视全部免费看 | 亚洲精品二区 | 99久久精品毛片免费播放 | 久久久欧美综合久久久久 | 成人免费视频视频在线不卡 | www.91在线视频| 性鸥美 | 日韩亚洲欧美日本精品va | 国产图片区 | 欧美爱爱爽爽视频在线观看 | 最近手机中文字幕大全8 | 国产精品一区二区三区四区五区 | 亚洲精品在线播放 | 中文字幕在线观看网站 | 久久精品免看国产 | 狠狠操网址 | 欧美成人三级伦在线观看 | 久久久久久综合一区中文字幕 | 最近免费中文在线视频 | 成人午夜又粗又硬有大 |