==========================
(接上文《架構設計:系統間通訊(36)——Apache Camel快速入門(上)》)
(補上文:Endpoint重要的漏講內容)
Endpoint Direct用于在兩個編排好的路由間實現Exchange消息的連接,上1個路由中由最后1個元素處理完的Exchange對象,將被發送至由Direct連接的下1個路由起始位置(http://camel.apache.org/direct.html)。注意,兩個被連接的路由1定要是可用的,并且存在于同1個Camel服務中。以下的例子說明了Endpoint Direct的簡單使用方式。
package com.yinwenjie.test.cameltest.helloworld;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.ModelCamelContext;
/**
* 測試兩個路由的連接
* @author yinwenjie
*/
public class DirectCamel {
public static void main(String[] args) throws Exception {
// 這是camel上下文對象,全部路由的驅動全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 啟動route
camelContext.start();
// 首先將兩個完全有效的路由注冊到Camel服務中
camelContext.addRoutes((new DirectCamel()).new DirectRouteA());
camelContext.addRoutes((new DirectCamel()).new DirectRouteB());
// 通用沒有具體業務意義的代碼,只是為了保證主線程不退出
synchronized (DirectCamel.class) {
DirectCamel.class.wait();
}
}
/**
* DirectRouteA 其中使用direct 連接到 DirectRouteB
* @author yinwenjie
*/
public class DirectRouteA extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
from("jetty:http://0.0.0.0:8282/directCamel")
// 連接路由:DirectRouteB
.to("direct:directRouteB")
.to("log:DirectRouteA?showExchangeId=true");
}
}
/**
* @author yinwenjie
*/
public class DirectRouteB extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
from("direct:directRouteB")
.to("log:DirectRouteB?showExchangeId=true");
}
}
}
以上代碼片斷中,我們編排了兩個可用的路由(雖然兩個路由都很簡單,但確切是兩個獨立的路由)命名為DirectRouteA和DirectRouteB。其中DirectRouteA實例在最后1個Endpoint控制端點(direct:directRouteB)中使用Endpoint Direct將Exchange消息發送到DirectRouteB實例的開始位置。以下是控制臺輸出的內容:
[2016-06-26 09:54:38] INFO qtp231573738-21 Exchange[Id: ID-yinwenjie-240-54473-1466906074572-0-1, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 09:54:38] INFO qtp231573738-21 Exchange[Id: ID-yinwenjie-240-54473-1466906074572-0-1, ExchangePattern: InOut, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
從以上履行效果我們可以看到,被連接的兩個路由使用的Exchange對象是同1個,也就是說在DirectRouteB路由中如果Exchange對象中的內容產生了變化就會在隨后繼續履行的DirectRouteA路由中產生影響。Endpoint Direct元素在我們實際使用Camel進行路由編排時,利用頻度非常高。由于它可以把多個已編排好的路由依照業務要求連接起來,構成1個新的路由,保持原有路由的良好重用。
========================================(增補完)
Camel中另外一個重要的元素是Processor處理器,它用于接收從控制端點、路由選擇條件又或另外一個處理器的Exchange中傳來的消息信息,并進行處理。Camel核心包和各個Plugin組件都提供了很多Processor的實現,開發人員也能夠通過實現org.apache.camel.Processor接口自定義處理器(后者是通常做法)。
既然是做編碼,那末我們自然可以在自定義的Processor處理器中做很多事情。這些事情可能包括處理業務邏輯、建立數據庫連接去做業務數據存儲、建立和某個第3方業務系統的RPC連接,但是我們1般不會那樣做——那是Endpoint的工作。Processor處理器中最主要的工作是進行業務數據格式的轉換和中間數據的臨時存儲。這樣做是由于Processor處理器是Camel編排的路由中,主要進行Exchange輸入輸出消息交換的地方。
不過開發人員固然可以在Processor處理器中連接數據庫。例如開發人員需要根據上1個Endpoint中攜帶的“定單編號前綴”信息,在Processor處理器中連接到1個獨立的數據庫中(或緩存服務中)查找其對應的路由信息,以便動態決定下1個路由路徑。由于Camel支持和JAVA語言的Spring框架無縫集成,所以要在Processor處理器中操作數據庫只需要進行非常簡單的配置。
以下代碼片斷是自定義的Processor處理器實現,其中的process(Exchange exchange)方法是必須進行實現的:
// 1個自定義處理器的實現
// 就是我們上文看到過的處理器實現了
public class OtherProcessor implements Processor {
......
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
//===============
// 您可以在這里進行數據格式轉換
// 并且將結果存儲到out message中
//===============
// 存入到exchange的out區域
if(exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || other out");
}
}
......
}
注意,處理器Processor是和控制端點平級的概念。要看1個URI對應的實現是不是是1個控制端點,最根本的就是看這個實現類是不是實現了org.apache.camel.Endpoint接口;而要看1個路由中的元素是不是是Processor處理器,最根本的就是看這個類是不是實現了org.apache.camel.Processor接口。
在控制端點和處理器之間、處理器和處理器之間,Camel允許開發人員進行路由條件設置。例如開發人員可以具有當Exchange In Message的內容為A的情況下將消息送入下1個處理器A,當Exchange In Message的內容為B時將消息送入下1個處理器B的處理能力。又例如,不管編排的路由中上1個元素的處理消息如何,都將攜帶消息的Exchange對象復制 多份,分別送入下1處理器X、Y、Z。開發人員乃至還可以通過路由規則完成Exchange到多個Endpoint的負載傳輸。
Camel中支持的路由規則非常豐富,包括:Message Filter、Based Router、Dynamic Router、Splitter、Aggregator、Resequencer等等。在Camel的官方文檔中使用了非常形象化的圖形來表示這些路由功能(http://camel.apache.org/enterprise-integration-patterns.html):
實際上EIP規則中所描寫的大部份業務集成模式,在以上頁面都能找到對應的圖形化表達。但由于篇幅和本專題的中心思想限制,恕筆者不能對Camel中的路由規則逐1講授。這里我們選取兩個比較有代表性的路由規則進行講授:Content Based Router和Recipient List。希望對各位讀者理解Camel中的路由規則有所幫助:
把Content Based Router譯為基于內容的路由,筆者覺得更加貼切(其實不能譯作基本路由,實際上你沒法定義甚么是基本路由)。它其實不是1種單1的路由方式,而是多種基于條件和判斷表達式的路由方式。其中可能包括choice語句/方法、when語句/方法、otherwise語句/方法。請看以下示例:
package com.yinwenjie.test.cameltest.helloworld;
......
/**
* 使用條件選擇進行路由編排
* @author yinwenjie
*/
public class ChoiceCamel extends RouteBuilder {
public static void main(String[] args) throws Exception {
// 這是camel上下文對象,全部路由的驅動全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 啟動route
camelContext.start();
// 將我們編排的1個完全消息路由進程,加入到上下文中
camelContext.addRoutes(new ChoiceCamel());
// 通用沒有具體業務意義的代碼,只是為了保證主線程不退出
synchronized (ChoiceCamel.class) {
ChoiceCamel.class.wait();
}
}
@Override
public void configure() throws Exception {
// 這是1個JsonPath表達式,用于從http攜帶的json信息中,提取orgId屬性的值
JsonPathExpression jsonPathExpression = new JsonPathExpression("$.data.orgId");
jsonPathExpression.setResultType(String.class);
// 通用使用http協議接受消息
from("jetty:http://0.0.0.0:8282/choiceCamel")
// 首先送入HttpProcessor,
// 負責將exchange in Message Body當中的stream轉成字符串
// 固然,不轉的話,下面主要的choice操作也能夠運行
// HttpProcessor中的實現和上文代碼片斷中的1致,這里就不再重復貼出
.process(new HttpProcessor())
// 將orgId屬性的值存儲 exchange in Message的header中,以便后續進行判斷
.setHeader("orgId", jsonPathExpression)
.choice()
// 當orgId == yuanbao,履行OtherProcessor
// 當orgId == yinwenjie,履行OtherProcessor2
// 其它情況履行OtherProcessor3
.when(header("orgId").isEqualTo("yuanbao"))
.process(new OtherProcessor())
.when(header("orgId").isEqualTo("yinwenjie"))
.process(new OtherProcessor2())
.otherwise()
.process(new OtherProcessor3())
// 結束
.endChoice();
}
/**
* 這個處理器用來完成輸入的json格式的轉換
* 和上1篇文章出現的HttpProcessor 內容基本1致。就不再貼出了
* @author yinwenjie
*/
public class HttpProcessor implements Processor {
......
}
/**
* 另外一個處理器OtherProcessor
* @author yinwenjie
*/
public class OtherProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
String body = message.getBody().toString();
// 存入到exchange的out區域
if(exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || 被OtherProcessor處理");
}
}
}
/**
* 很簡單的處理器OtherProcessor2
* 和OtherProcessor基本相同,就不再重復貼出
* @author yinwenjie
*/
public class OtherProcessor2 implements Processor {
......
outMessage.setBody(body + " || 被OtherProcessor2處理");
}
/**
* 很簡單的處理器OtherProcessor3
* 和OtherProcessor基本相同,就不再重復貼出
* @author yinwenjie
*/
public class OtherProcessor3 implements Processor {
......
outMessage.setBody(body + " || 被OtherProcessor3處理");
}
}
以上代碼片斷中,開發人員首先使用JsonPath表達式,從Http中攜帶的json信息中尋覓到orgId這個屬性的值,然后將這個值存儲在Exchange的header區域(這樣做只是為了后續方便判斷,您也能夠將值存儲在Exchange的properties區域,還可以直接使用JsonPath表達式進行判斷) 。接下來,通過判斷存儲在header區域的值,讓消息路由進入不同的Processor處理器。由于我們設置的from-jetty-endpoint中默許的Exchange Pattern值為InOut,所以在各個Processor處理器中完成處理后 Out Message的Body內容會以Http響應結果的情勢返回到from-jetty-endPoint中。最后我們將在測試頁面上看到Processor處理器中的消息值。
Camel中支持絕大多數被開發人員承認和使用的表達式:正則式、XPath、JsonPath等。如果各位讀者對JsonPath的語法還不熟習的話,可以參考Google提供的說明文檔(https://code.google.com/p/json-path/)。為了測試以上代碼片斷的工作效果,我們使用Postman工具向指定的地址發送1段json信息,并視察全部路由的履行效果。以下圖所示:
當orgId的值為yuanbao時的履行效果
當orgId的值為yinwenjie時的履行效果
關于路由判斷,Camel中提供了豐富的條件判斷手段。除我們在本小節中使用的isEqualTo方式還包括:isGreaterThan、isGreaterThanOrEqualTo、isLessThan、isLessThanOrEqualTo、isNotEqualTo、in(多個值)、contains、regex等等,它們的共同點是這些方法都返回某個實現了org.apache.camel.Predicate接口的類。
在本小節上部份的介紹中,我們說明了怎樣使用條件判斷向若干可能的路由路徑中的某1條路徑傳送消息。那末如何做到根據判斷條件,向若干可能的路徑中的其中多條路徑傳送同1條消息呢?又或向若干條可能的路徑全部傳輸同1條消息呢?
在Camel中可能被選擇的消息路由路徑稱為接收者,Camel提供了多種方式向路由中可能成為下1處理元素的多個接收者發送消息:靜態接收者列表(Static Recipient List)、動態接收者列表(Dynamic Recipient List)和 循環動態路由(Dynamic Router)。下面我們對這幾種接收者列表情勢進行逐1講授。
使用multicast方式時,Camel將會把上1處理元素輸出的Exchange復制多份發送給這個列表中的所有接收者,并且按順序逐1履行(可設置為并行處理)這些接收者。這些接收者多是通過Direct連接的另外一個路由,也多是Processor或某個單1的Endpoint。需要注意的是,Excahnge是在Endpoint控制端點和Processor處理器間或兩個Processor處理器間唯1能夠有效攜帶Message的元素,所以將1條消息復制多份并且讓其履行不相互遭到影響,那末必定就會對Exchange對象進行復制(是復制,是復制,雖然主要屬性內容相同,但是這些Exchange使用的內存區域都是不1樣的,ExchangeId也不1樣)。以下是multicast使用的簡單示例代碼:
package com.yinwenjie.test.cameltest.helloworld;
......
/**
* 測試組播路由
* @author yinwenjie
*/
public class MulticastCamel extends RouteBuilder {
public static void main(String[] args) throws Exception {
// 這是camel上下文對象,全部路由的驅動全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 啟動route
camelContext.start();
// 將我們編排的1個完全消息路由進程,加入到上下文中
camelContext.addRoutes(new MulticastCamel());
// 通用沒有具體業務意義的代碼,只是為了保證主線程不退出
synchronized (MulticastCamel.class) {
MulticastCamel.class.wait();
}
}
@Override
public void configure() throws Exception {
// 這個線程池用來進行multicast中各個路由線路的并發履行
ExecutorService executorService = Executors.newFixedThreadPool(10);
MulticastDefinition multicastDefinition = from("jetty:http://0.0.0.0:8282/multicastCamel").multicast();
// multicast 中的消息路由可以順序履行也能夠并發履行
// 這里我們演示并發履行
multicastDefinition.setParallelProcessing(true);
// 為并發履行設置1個獨立的線程池
multicastDefinition.setExecutorService(executorService);
// 注意,multicast中各路由路徑的Excahnge都是基于上1路由元素的excahnge復制而來
// 不管前者Excahnge中的Pattern如何設置,其處理結果都不會反應在最初的Excahnge對象中
multicastDefinition.to(
"log:helloworld1?showExchangeId=true"
,"log:helloworld2?showExchangeId=true")
// 1定要使用end,否則OtherProcessor會被做為multicast中的1個分支路由
.end()
// 所以您在OtherProcessor中看到的Excahnge中的Body、Header等屬性內容
// 不會有“復制的Exchange”設置的任何值的痕跡
.process(new OtherProcessor());
}
/**
* 另外一個處理器
* @author yinwenjie
*/
public class OtherProcessor implements Processor {
/* (non-Javadoc)
* @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
*/
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
LOGGER.info("OtherProcessor中的exchange" + exchange);
String body = message.getBody().toString();
// 存入到exchange的out區域
if(exchange.getPattern() == ExchangePattern.InOut) {
Message outMessage = exchange.getOut();
outMessage.setBody(body + " || 被OtherProcessor處理");
}
}
}
}
以上代碼片斷中,我們使用multicast將原始的Exchange復制了多份,分外傳送給multicast中的兩個接收者,并且為了保證兩個接收者的處理進程是并行的,我們還專門為multicast設置了1個線程池(不設置的話Camel將自行設置)。在以上的代碼片斷中,在multicast路由定義以后我們還設置了1個OtherProcessor處理器,它主要作用就是查看OtherProcessor中的Exchange對象的狀態。下面的截圖展現了以上代碼片斷的履行效果:
[2016-06-24 16:07:43] INFO pool⑴-thread⑺ Exchange[Id: ID-yinwenjie-240-54110-1466755310568-0-20, ExchangePattern: InOut, BodyType: String, Body: {"data":{"orgId":"yinwenjie"},"token":"d9c33c8f-ae59⑷edf-b37f⑵90ff208de2e","desc":""}] (MarkerIgnoringBase.java:96)
[2016-06-24 16:07:43] INFO pool⑴-thread⑻ Exchange[Id: ID-yinwenjie-240-54110-1466755310568-0-19, ExchangePattern: InOut, BodyType: String, Body: {"data":{"orgId":"yinwenjie"},"token":"d9c33c8f-ae59⑷edf-b37f⑵90ff208de2e","desc":""}] (MarkerIgnoringBase.java:96)
[2016-06-24 16:07:43] INFO qtp1060925979⑴8 OtherProcessor中的exchange [id:ID-yinwenjie-240-54110-1466755310568-0-16]Exchange[Message: {"data":{"orgId":"yinwenjie"},"token":"d9c33c8f-ae59⑷edf-b37f⑵90ff208de2e","desc":""}] (MulticastCamel.java:74)
通過履行結果可以看到,在multicast中的兩個接收者(兩個路由分支的設定)分別在我們設置的線程池中運行,線程ID分別是【pool⑴-thread⑺】和【pool⑴-thread⑻】。在multicast中的所有路由分支都運行完成后,OtherProcessor處理器的實例在【qtp1060925979⑴8】線程中繼續運行(jetty:http-endpint對本次要求的處理本來就在這個線程上運行)。
請各位讀者特別注意以上3句日志所輸出的ExchangeId,它們是完全不同的3個Exchange實例!其中在multicast的兩個路由分支中承載Message的Excahnge對象,它們的Exchange-ID號分別為【ID-yinwenjie⑵40⑸4110⑴466755310568-0⑵0】和【ID-yinwenjie⑵40⑸4110⑴466755310568-0⑴9】,來源則是multicast對原始Exchange對象的復制,原始Exchagne對象的Exchange-ID為【ID-yinwenjie⑵40⑸4110⑴466755310568-0⑴6】。
在編排路由,很多情況下開發人員不能肯定有哪些接收者會成為下1個處理元素:由于它們需要由Exchange中所攜帶的消息內容來動態決定下1個處理元素。這類情況下,開發人員就需要用到recipient方法對下1路由目標進行動態判斷。以下代碼示例中,我們將3個已編排好的路由注冊到Camel服務中,并通過打印在控制臺上的結果視察其履行:
public class DirectRouteA extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
from("jetty:http://0.0.0.0:8282/dynamicCamel")
.setExchangePattern(ExchangePattern.InOnly)
.recipientList().jsonpath("$.data.routeName").delimiter(",")
.end()
.process(new OtherProcessor());
}
}
/**
* @author yinwenjie
*/
public class DirectRouteB extends RouteBuilder {
/* (non-Javadoc)
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
// 第2個路由和第3個路由的代碼都類似
// 唯1不同的是類型
from("direct:directRouteB")
.to("log:DirectRouteB?showExchangeId=true");
}
}
......
public static void main(String[] args) throws Exception {
// 這是camel上下文對象,全部路由的驅動全靠它了。
ModelCamelContext camelContext = new DefaultCamelContext();
// 啟動route
camelContext.start();
// 將我們編排的1個完全消息路由進程,加入到上下文中
camelContext.addRoutes((new DynamicCamel()).new DirectRouteA());
camelContext.addRoutes((new DynamicCamel()).new DirectRouteB());
camelContext.addRoutes((new DynamicCamel()).new DirectRouteC());
// 通用沒有具體業務意義的代碼,只是為了保證主線程不退出
synchronized (DynamicCamel.class) {
DynamicCamel.class.wait();
}
}
......
DirectRouteB路由和DirectRouteC路由中的代碼非常簡單,就是從通過direct連接到本路由的上1個路由實例中獲得并打印Exchange對象的信息。所以各位讀者可以看到以上代碼片斷只羅列了DirectRouteB的代碼信息。DirectRouteA路由中“ExchangePattern.InOnly”的作用在上文中已講過,這里就不再進行贅述了。需要重點說明的是recipientList方法,這個方法可以像multicast方法那樣進行并發履行或運行線程池的設置,但是在DirectRouteA的代碼中我們并沒有那樣做,這是為了讓讀者看清除recipientList或multicast方法的順序履行履行效果。以下是我們啟動Camel服務后,從Postman(或其他測試工具)傳入的JSON格式的信息:
{"data":{"routeName":"direct:directRouteB,direct:directRouteC"},"token":"d9c33c8f-ae59⑷edf-b37f⑵90ff208de2e","desc":""}
recipientList方法將以 .data.routeName 中指定的路由信息動態決定1下個或多個消息接收者,以上JSON片斷中我們指定了兩個“direct:directRouteB,direct:directRouteC”。那末recipientList會使用delimiter方法中設置的“,”作為分隔符來分別肯定這兩個接收者。
[2016-06-26 10:31:53] INFO qtp1896561093-16 Exchange[Id: ID-yinwenjie-240-55214-1466908306101-0-3, ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 10:31:53] INFO qtp1896561093-16 Exchange[Id: ID-yinwenjie-240-55214-1466908306101-0-4, ExchangePattern: InOnly, BodyType: org.apache.camel.converter.stream.InputStreamCache, Body: [Body is instance of org.apache.camel.StreamCache]] (MarkerIgnoringBase.java:96)
[2016-06-26 10:31:53] INFO qtp1896561093-16 OtherProcessor中的exchange [id:ID-yinwenjie-240-55214-1466908306101-0-1]Exchange[Message: [Body is instance of org.apache.camel.StreamCache]] (DynamicCamel.java:100)
靜態路由和動態路由在履行效果上有很多類似的地方。例如在兩種路徑選擇方式中,路由分支上的接收者中使用的Exchange對象的來源都是對上1履行元素所輸出的Exchange對象的復制,這些Exchange對象除其中攜帶的業務內容相同外,ExchangeID是不1樣,也就是說每一個路由分支的Exchange對象都不相同。所以各路由分支的消息都不受彼此影響。另外動態路由和靜態路由都支持對路由分支的順序履行和并發履行,都可以為并發履行設置獨立的線程池。
從以上履行效果中我們可以看到,由于我們沒有設置動態路由是并發履行,所以各個需要履行的路由分支都是由名為【qtp1896561093⑴6】的Camel服務線程順次履行,并且每一個路由分支的Exchange對象都不受彼此影響。另外,請注意以上履行結果的最后1條日志信息,它是在路由分支之外對OtherProcessor處理器的履行。因而可知不管路由分支如何履行,都不會影響路由分支之外的元素履行時所使用的Exchange對象。
===================================
(接下文)