多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 互聯網 > [置頂] pipeline-filter模式變體之尾循環

[置頂] pipeline-filter模式變體之尾循環

來源:程序員人生   發布時間:2014-11-09 09:39:28 閱讀次數:4234次

pipeline-filter作為1種處理數據的模式(見【POSA】卷4)可以將利用的任務劃分為幾個自我完備的數據處理步驟,并連接到1個數據管道。本文介紹1種不太常見的pipeline-filter的變體――尾循環的pipeline-filter,固然這也是在特定的需求場景下才會出現的。

首先,我們來看1個常見的pipeline-filter的模式圖:


模式的思路比較簡單明了,就是對數據的處理步驟進行拆分。然后以統1的編程接口加上遞歸的方式,將它們串在1起。

最近在寫的1個Message broker中處理消息通訊的時候也采取了這類模式來切分消息處理步驟。在發送消息的時候這類模式使用得非常的順暢,因此很自然得在接收消息的時候一樣采取了這類模式。我們可以先來簡單來看1個發送消息的時候全部pipeline就是像下面這樣:


關于名詞的說明:本文中談到的handler可以類比為filter,提到的handler-chain可以類比為pipeline,只是叫法不同。

如果不去多想,接收消息的pipeline也應當跟發送消息類似,只是produce-handler變成了consume-handler。但在代碼實現的時候,rabbitmq-java-client的實現方式使得這類模式的應用有些受阻。正常情況下,我們的理解是1個消息的處理或1個消息集合的處理,會穿過1個pipeline,但官方提供的java-client對接收消息的實現是socket-blocking以等待消息推送到client的(push)方式。由于官方client的這類實現方式,使得外部封裝的做法最好是將socket-blocking搬遷到1個獨立的EventLoop-Thread上,然后獲得到消息以后,解封送并以事件的方式對外提供消息,而客戶端在該事件中實現自己的處理邏輯便可,也就是說是1種異步接收的方式,仔細想一想也確切應當是這類push的方式。

因而可知在接收消息時的pipeline還是很不同于發送消息的。對接收消息而言,filter分成兩個部份,第1部份的多個filter只履行1次(主要是在真正開始接收消息之前,處理1些前置任務,比如權限檢查,參數驗證等);第2部份的多個filter卻要不斷得在另外1個EventLoop-Thread上循環履行(由于這些filter觸及到對接收到的message進行處理)。所以,在接收消息時的示意圖為:


其中,下面框起來的兩個handler是在EventLoop-Thread上循環履行的。

明顯,上面用于produce的那種pipeline-filter不能應對這類變化(既沒法跨線程也沒法在就其中的幾個進行循環)。而此時不可能獨立得為consume單獨實現1套新的pipeline-filter(由于在pipeline-filter的基礎設施上,我們已將produce,consume和request,response、publish,subscribe等都抽象為消息傳輸(carry))。因此,我們只能在同1套運行機制上,找到1種辦法來滿足所有的消息傳輸方式。

我們的做法是,實現1個“過渡handler”(此處的handler即為filter,只是取名不同而已),并實現handle方法。該handler用于將后續的邏輯過渡到1個獨立的EventLoop-Thread上并啟動EventLoop-Thread(把傳遞給當前handler的上下文和chain對象傳遞到事件處理線程上去),其后的所有handler都在該線程上循環履行。

其實現代碼段以下:

public void handle(@NotNull MessageContext context, @NotNull IHandlerChain chain) { if (!context.isSync()) { ReceiveEventLoop eventLoop = new ReceiveEventLoop(); eventLoop.setChain(chain); eventLoop.setContext(context); eventLoop.setChannelDestroyer(context.getDestroyer()); eventLoop.setCurrentConsumer((QueueingConsumer) context.getOtherParams().get("consumer")); context.setReceiveEventLoop(eventLoop); //repeat current handler ((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true); eventLoop.startEventLoop(); } else { chain.handle(context); } }

如上,ReceiveEventLoop即為1個獨立的EventLoop-Thread,啟動以后,對pipeline發起的線程而言,它啟動的本次調用鏈(handle方法的遞歸調用)已結束。因此主線程將會從該調用的觸發點向下繼續履行

而后續的filter在eventloop線程上獨立運行:

public void run() { try { while (true) { QueueingConsumer.Delivery delivery = this.currentConsumer.nextDelivery(); AMQP.BasicProperties properties = delivery.getProperties(); byte[] msgBody = delivery.getBody(); context.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false); //..... this.context.setConsumedMsg(msg); this.chain.handle(this.context); } } catch (InterruptedException e) { logger.info("[run] close the consumer's message handler!"); } catch (IOException e) { logger.error("[run] occurs a IOException : " + e.getMessage()); this.shutdown(); } catch (ConsumerCancelledException e) { logger.info("[run] the consumer has been canceled "); this.shutdown(); } catch (Exception e) { logger.error("[run] occurs a Exception : " + e.getMessage()); this.shutdown(); } logger.info("******** thread id " + this.getThreadID() + " quit from message receiver ********"); }

有1個問題:我們必須拿到對EventLoop-Thread的控制權(外部可以在任意時刻關閉該eventloop),而獲得其控制權的關鍵代碼就是上上段代碼中的:

context.setReceiveEventLoop(eventLoop);

我們將當前EventLoop的實例包裝成1個Thread類型的屬性,并為其開放了相應的開、關方法,將其控制權丟給外部:

public void startEventLoop() { this.currentThread.start(); } public void shutdown() { this.channelDestroyer.destroy(context.getChannel()); this.currentThread.interrupt(); }

然后在主線程發起接收消息的方法最后會返回1個IReceiverCloser接口的實例,在其接口方法close中調用shutdown,來對其進行關閉:

//launch pipeline carry(ctx); return new IReceiverCloser() { @Override public void close() { synchronized (this) { if (ctx.getReceiveEventLoop().isAlive()) { ctx.getReceiveEventLoop().shutdown(); } } } };

另外一個問題,handler-chain是如何知道從某個handler以后轉入eventloop線程會開始循環履行?是如何實現的?它來自于第1段代碼中的以下這句代碼:

((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);
這句代碼會在進入下1個handler之前設置1個“循環履行”的標志。下面看看,我們是如何來改造handlerchain來到達這個目的的,在MessageCarryHandlerChain(它實現了IHandlerChain接口)的實現中,有以下4個變量:

private List<AbstractHandler> handlerChain; private int pos = 0; private boolean enableRepeat = false; private int repeatPos = ⑴;

  • handlerChain:解析并順序存儲每一個handler
  • pos:用來記錄當前已履行到哪一個handler
  • enableRepeat:用來標識是不是啟用重復履行handler
  • repeatPos:用于記錄重復履行的handler的起始位置
在設置啟用enableRepeat屬性的時候,會記錄當前的位置狀態:
public void setEnableRepeatBeforeNextHandler(boolean enableRepeat) { this.enableRepeat = enableRepeat; if (this.enableRepeat) { this.repeatPos = this.pos; } else { this.repeatPos = Integer.MIN_VALUE; } }

MessageCarryChain的handle實現,這也是handler串接的核心:
public void handle(MessageContext context) { if (this.repeatPos != Integer.MIN_VALUE) { if (this.pos < handlerChain.size()) { AbstractHandler currentHandler = handlerChain.get(pos++); currentHandler.handle(context, this); } else if (this.enableRepeat) { this.pos = this.repeatPos; } } }
在處理第1條到來的消息時,對應到上面while(true)中的最后1句:
this.chain.handle(this.context);

調用會進入MessageCarryChain的handle方法,并履行:
if (this.pos < handlerChain.size()) { }

判斷分支,在其中會觸發下1個handler的handle方法,并1直履行下去直到判斷條件不成立以后會履行else邏輯,將之前保存的起始循環的handler的位置置為新的handler的位置。
因而,當下1次,事件循環在上面while(true)中收到新的消息時,會再次履行上面的if判斷分支(由于在接收上1條消息時,已將pos重置了,所以If判斷條件又重新成立)并以循環位置的handler作為起始,直到走到handlerChain中的最后1個以后,又將當前位置的pos重置為repeatPos保存的位置(注意repeatPos在第1次被設置以后不再改變),如此循環往復。也就在表面上構成了上圖所示的尾循環。
為何說只是表面上構成了呢。由于在表述中,我故意“疏忽”了這樣1個現實――pipeline-filter模式根本上還是利用了遞歸來實現的,遞歸會有個還原點,用于“保護現場”以后再“還原現場”。因此,上面MessageCarryChain中的handle代碼段中的:
currentHandler.handle(context, this);

對每一個被履行的handler都是還原點,當第1輪handler履行完成(調用完這句:this.pos = this.repeatPos;)都會在還原點層層回退(履行還原點以后的該方法內部的剩余代碼)。因此,在收到第2個消息時,實際上是觸發了新1輪的handler-chain履行流程,只是由于pos在之前被置為chain中的循環起始位置,而不是從0開始而已。但對后面尾循環的handler而言,它們遞歸調用的本質沒有改變,所以其實只是看起來在尾部“循環”而已。
說明:其實如果你回顧pipeline-filter模式的本質,它們是用來處理數據的。我這里不論是發送還是接收消息統籌處理了消息以外的1些邏輯。如果這里只處理消息,實際上是可以不用跨線程和尾循環的。我只是利用了這類模式,將消息通訊的各個環節進行拆分,組合,重用從而不可躲避得遇到了這個問題,如果回到純潔的pipeline-filter模式,是不需要這么做的。
如果我沒表述清楚的,請直接看代碼:Messagebus-Consume
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 一二三四视频在线观看免费高清 | 视频国产一区 | 性欧美成人免费观看视 | 亚洲一区 中文字幕 久久 | 国产福利片在线 | jlzzjlzz亚洲大全 | 国产国产成人精品久久 | 亚洲国产成人在线观看 | 亚洲欧美片 | 92精品国产自产在线观看48页 | 欧美一区二 | 亚洲天堂网站在线 | 美女免费观看一区二区三区 | 97av在线视频 | 国产一区精品视频 | 欧美日韩一区二区三区麻豆 | h视频免费 | 日韩欧美一区二区三区视频 | 国产亚洲综合精品一区二区三区 | 日本一道本中文字幕 | 国产精品永久免费视频观看 | 尤物tv | 日本在线不卡免费视频一区 | 黑人群性xxx | 成人欧美一区二区三区黑人 | 免费观看视频网站 | 色综合欧美综合天天综合 | 日韩日韩日韩日韩日韩 | 最近伦中文字幕高清字幕mv | 美女福利视频一区二区 | 国产成人亚洲精品久久 | 伊人中文| 日本高清不卡免费 | 欧美艳星性videose精品 | 国产成人久久精品激情91 | 欧美一区二区三区久久综 | 亚洲精品国产成人99久久 | 噜噜影院无毒不卡 | 免费在线观看黄色网址 | 国产精品久久毛片蜜月 | 欧美色成人tv在线播放 |