[置頂] pipeline-filter模式變體之尾循環
來源:程序員人生 發布時間:2014-11-09 09:39:28 閱讀次數:4234次
pipeline-filter作為1種處理數據的模式(見【POSA】卷4)可以將利用的任務劃分為幾個自我完備的數據處理步驟,并連接到1個數據管道。本文介紹1種不太常見的pipeline-filter的變體――尾循環的pipeline-filter,固然這也是在特定的需求場景下才會出現的。
首先,我們來看1個常見的pipeline-filter的模式圖:

模式的思路比較簡單明了,就是對數據的處理步驟進行拆分。然后以統1的編程接口加上遞歸的方式,將它們串在1起。
最近在寫的1個Message broker中處理消息通訊的時候也采取了這類模式來切分消息處理步驟。在發送消息的時候這類模式使用得非常的順暢,因此很自然得在接收消息的時候一樣采取了這類模式。我們可以先來簡單來看1個發送消息的時候全部pipeline就是像下面這樣:

關于名詞的說明:本文中談到的handler可以類比為filter,提到的handler-chain可以類比為pipeline,只是叫法不同。
如果不去多想,接收消息的pipeline也應當跟發送消息類似,只是produce-handler變成了consume-handler。但在代碼實現的時候,rabbitmq-java-client的實現方式使得這類模式的應用有些受阻。正常情況下,我們的理解是1個消息的處理或1個消息集合的處理,會穿過1個pipeline,但官方提供的java-client對接收消息的實現是socket-blocking以等待消息推送到client的(push)方式。由于官方client的這類實現方式,使得外部封裝的做法最好是將socket-blocking搬遷到1個獨立的EventLoop-Thread上,然后獲得到消息以后,解封送并以事件的方式對外提供消息,而客戶端在該事件中實現自己的處理邏輯便可,也就是說是1種異步接收的方式,仔細想一想也確切應當是這類push的方式。
因而可知在接收消息時的pipeline還是很不同于發送消息的。對接收消息而言,filter分成兩個部份,第1部份的多個filter只履行1次(主要是在真正開始接收消息之前,處理1些前置任務,比如權限檢查,參數驗證等);第2部份的多個filter卻要不斷得在另外1個EventLoop-Thread上循環履行(由于這些filter觸及到對接收到的message進行處理)。所以,在接收消息時的示意圖為:

其中,下面框起來的兩個handler是在EventLoop-Thread上循環履行的。
明顯,上面用于produce的那種pipeline-filter不能應對這類變化(既沒法跨線程也沒法在就其中的幾個進行循環)。而此時不可能獨立得為consume單獨實現1套新的pipeline-filter(由于在pipeline-filter的基礎設施上,我們已將produce,consume和request,response、publish,subscribe等都抽象為消息傳輸(carry))。因此,我們只能在同1套運行機制上,找到1種辦法來滿足所有的消息傳輸方式。
我們的做法是,實現1個“過渡handler”(此處的handler即為filter,只是取名不同而已),并實現handle方法。該handler用于將后續的邏輯過渡到1個獨立的EventLoop-Thread上并啟動EventLoop-Thread(把傳遞給當前handler的上下文和chain對象傳遞到事件處理線程上去),其后的所有handler都在該線程上循環履行。
其實現代碼段以下:
public void handle(@NotNull MessageContext context,
@NotNull IHandlerChain chain) {
if (!context.isSync()) {
ReceiveEventLoop eventLoop = new ReceiveEventLoop();
eventLoop.setChain(chain);
eventLoop.setContext(context);
eventLoop.setChannelDestroyer(context.getDestroyer());
eventLoop.setCurrentConsumer((QueueingConsumer) context.getOtherParams().get("consumer"));
context.setReceiveEventLoop(eventLoop);
//repeat current handler
((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);
eventLoop.startEventLoop();
} else {
chain.handle(context);
}
}
如上,ReceiveEventLoop即為1個獨立的EventLoop-Thread,啟動以后,對pipeline發起的線程而言,它啟動的本次調用鏈(handle方法的遞歸調用)已結束。因此主線程將會從該調用的觸發點向下繼續履行
而后續的filter在eventloop線程上獨立運行:
public void run() {
try {
while (true) {
QueueingConsumer.Delivery delivery = this.currentConsumer.nextDelivery();
AMQP.BasicProperties properties = delivery.getProperties();
byte[] msgBody = delivery.getBody();
context.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//.....
this.context.setConsumedMsg(msg);
this.chain.handle(this.context);
}
} catch (InterruptedException e) {
logger.info("[run] close the consumer's message handler!");
} catch (IOException e) {
logger.error("[run] occurs a IOException : " + e.getMessage());
this.shutdown();
} catch (ConsumerCancelledException e) {
logger.info("[run] the consumer has been canceled ");
this.shutdown();
} catch (Exception e) {
logger.error("[run] occurs a Exception : " + e.getMessage());
this.shutdown();
}
logger.info("******** thread id " + this.getThreadID() + " quit from message receiver ********");
}
有1個問題:我們必須拿到對EventLoop-Thread的控制權(外部可以在任意時刻關閉該eventloop),而獲得其控制權的關鍵代碼就是上上段代碼中的:
context.setReceiveEventLoop(eventLoop);
我們將當前EventLoop的實例包裝成1個Thread類型的屬性,并為其開放了相應的開、關方法,將其控制權丟給外部:
public void startEventLoop() {
this.currentThread.start();
}
public void shutdown() {
this.channelDestroyer.destroy(context.getChannel());
this.currentThread.interrupt();
}
然后在主線程發起接收消息的方法最后會返回1個IReceiverCloser接口的實例,在其接口方法close中調用shutdown,來對其進行關閉:
//launch pipeline
carry(ctx);
return new IReceiverCloser() {
@Override
public void close() {
synchronized (this) {
if (ctx.getReceiveEventLoop().isAlive()) {
ctx.getReceiveEventLoop().shutdown();
}
}
}
};
另外一個問題,handler-chain是如何知道從某個handler以后轉入eventloop線程會開始循環履行?是如何實現的?它來自于第1段代碼中的以下這句代碼:
((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);
這句代碼會在進入下1個handler之前設置1個“循環履行”的標志。下面看看,我們是如何來改造handlerchain來到達這個目的的,在MessageCarryHandlerChain(它實現了IHandlerChain接口)的實現中,有以下4個變量:
private List<AbstractHandler> handlerChain;
private int pos = 0;
private boolean enableRepeat = false;
private int repeatPos = ⑴;
- handlerChain:解析并順序存儲每一個handler
- pos:用來記錄當前已履行到哪一個handler
- enableRepeat:用來標識是不是啟用重復履行handler
- repeatPos:用于記錄重復履行的handler的起始位置
在設置啟用enableRepeat屬性的時候,會記錄當前的位置狀態:
public void setEnableRepeatBeforeNextHandler(boolean enableRepeat) {
this.enableRepeat = enableRepeat;
if (this.enableRepeat) {
this.repeatPos = this.pos;
} else {
this.repeatPos = Integer.MIN_VALUE;
}
}
MessageCarryChain的handle實現,這也是handler串接的核心:
public void handle(MessageContext context) {
if (this.repeatPos != Integer.MIN_VALUE) {
if (this.pos < handlerChain.size()) {
AbstractHandler currentHandler = handlerChain.get(pos++);
currentHandler.handle(context, this);
} else if (this.enableRepeat) {
this.pos = this.repeatPos;
}
}
}
在處理第1條到來的消息時,對應到上面while(true)中的最后1句:
this.chain.handle(this.context);
調用會進入MessageCarryChain的handle方法,并履行:
if (this.pos < handlerChain.size()) {
}
判斷分支,在其中會觸發下1個handler的handle方法,并1直履行下去直到判斷條件不成立以后會履行else邏輯,將之前保存的起始循環的handler的位置置為新的handler的位置。
因而,當下1次,事件循環在上面while(true)中收到新的消息時,會再次履行上面的if判斷分支(由于在接收上1條消息時,已將pos重置了,所以If判斷條件又重新成立)并以循環位置的handler作為起始,直到走到handlerChain中的最后1個以后,又將當前位置的pos重置為repeatPos保存的位置(注意repeatPos在第1次被設置以后不再改變),如此循環往復。也就在表面上構成了上圖所示的尾循環。
為何說只是表面上構成了呢。由于在表述中,我故意“疏忽”了這樣1個現實――pipeline-filter模式根本上還是利用了遞歸來實現的,遞歸會有個還原點,用于“保護現場”以后再“還原現場”。因此,上面MessageCarryChain中的handle代碼段中的:
currentHandler.handle(context, this);
對每一個被履行的handler都是還原點,當第1輪handler履行完成(調用完這句:this.pos = this.repeatPos;)都會在還原點層層回退(履行還原點以后的該方法內部的剩余代碼)。因此,在收到第2個消息時,實際上是觸發了新1輪的handler-chain履行流程,只是由于pos在之前被置為chain中的循環起始位置,而不是從0開始而已。但對后面尾循環的handler而言,它們遞歸調用的本質沒有改變,所以其實只是看起來在尾部“循環”而已。
說明:其實如果你回顧pipeline-filter模式的本質,它們是用來處理數據的。我這里不論是發送還是接收消息統籌處理了消息以外的1些邏輯。如果這里只處理消息,實際上是可以不用跨線程和尾循環的。我只是利用了這類模式,將消息通訊的各個環節進行拆分,組合,重用從而不可躲避得遇到了這個問題,如果回到純潔的pipeline-filter模式,是不需要這么做的。
如果我沒表述清楚的,請直接看代碼:Messagebus-Consume
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈