【Samza系列】實時計算Samza中文教程(四)―API概述
來源:程序員人生 發(fā)布時間:2014-11-20 09:03:22 閱讀次數(shù):3658次
上1篇和大家1起宏觀上學(xué)習(xí)了Samza平臺的架構(gòu),重點講了1下數(shù)據(jù)緩沖層和資源管理層,剩下的1塊很重要的SamzaAPI層本節(jié)作為重點為大家展開介紹。
當(dāng)你使用Samza來實現(xiàn)1個數(shù)據(jù)流處理邏輯時,你必須實現(xiàn)1個叫StreamTask的接口,以下所示:
public class MyTaskClass implements StreamTask {
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
// process message
}
}
當(dāng)你運行你的job時,Samza將為你的class創(chuàng)建1些實例(可能在多臺機器上)。這些任務(wù)實例會處理輸入流里的消息。
在你的job的配置中你能告知Samza你想消費哪條數(shù)據(jù)流。舉1個較為完全的例子(大家也能夠參看http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration.html
):
# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass
# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent
# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
對Samza從任務(wù)的輸入流利接收的每條消息,處理邏輯都會被調(diào)用。它主要包括3個重要的信息:消息、關(guān)鍵詞key和消息來自的數(shù)據(jù)流:
/** Every message that is delivered to a StreamTask is wrapped
* in an IncomingMessageEnvelope, which contains metadata about
* the origin of the message. */
public class IncomingMessageEnvelope {
/** A deserialized message. */
Object getMessage() { ... }
/** A deserialized key. */
Object getKey() { ... }
/** The stream and partition that this message came from. */
SystemStreamPartition getSystemStreamPartition() { ... }
}
注意鍵和值都要被聲明為對象,并且需要轉(zhuǎn)化為正確的類型。如果你不配置1個serializer/deserializer,它們就會成為典型的java字節(jié)數(shù)組。1個deserializer能夠轉(zhuǎn)化這些字節(jié)到其他任意類型,舉個例子來講j1個son deserializer能夠?qū)⒆止?jié)數(shù)組轉(zhuǎn)化為Map、List和字符串對象。
SystemStreamPartition()這個方法會返回1個SystemStreamPartition對象,它會告知你消息是從哪里來的。它由以下3部份組成:
1. The system:系統(tǒng)的名字來源于消息,就在你job的配置里定義。你可以有多個用于輸入和輸出的不同名字的系統(tǒng);
2. The stream name: 在原系統(tǒng)里數(shù)據(jù)流(話題、隊列)的名字。一樣也是在job的配置里定義;
3. The partition: 1條數(shù)據(jù)流通常會被劃分到多個分區(qū),并且每個分區(qū)會被Samza安排1個StreamTask實例;
API看起來像是這樣的:
/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {
/** The name of the system which provides this stream. It is
defined in the Samza job's configuration. */
public String getSystem() { ... }
/** The name of the stream/topic/queue within the system. */
public String getStream() { ... }
/** The partition within the stream. */
public Partition getPartition() { ... }
}
在上面這個job的配置例子里可以看到,這個系統(tǒng)名字叫“Kafka”,數(shù)據(jù)流的名字叫“PageViewEvent”。(kafka這個名字不是特定的――你能給你的系統(tǒng)取任何你想要的名字)。如果你有1些輸入流向?qū)肽愕腟treamTask,你能夠使用SystemStreamPartition去決定你接遭到哪1類消息。
如何發(fā)送消息呢?如果你看1下StreamTask里的process()方法,你將看到你有1個MessageCollector接口。
/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
void send(OutgoingMessageEnvelope envelope);
}
為了發(fā)送1個消息, 你會創(chuàng)建1個OutgoingMessageEnvelop對象并且把它傳遞給消息搜集器。它最少會肯定你想要發(fā)送的消息、系統(tǒng)和數(shù)據(jù)流名字再發(fā)送出去。你也能夠肯定分區(qū)的key和另外一些參數(shù)。具體可以參考javadoc(http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)。
注意事項:
請只在process()方法里使用MessageCollector對象。如果你保持住1個MessageCollector實例并且以后再次使用它,你的消息可能會毛病地發(fā)送出去。舉1個例子,這兒有1個簡單的任務(wù),它把每一個輸入的消息拆成單詞,并且發(fā)送每個單詞作為1個消息:
public class SplitStringIntoWords implements StreamTask {
// Send outgoing messages to a stream called "words"
// in the "kafka" system.
private final SystemStream OUTPUT_STREAM =
new SystemStream("kafka", "words");
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator) {
String message = (String) envelope.getMessage();
for (String word : message.split(" ")) {
// Use the word as the key, and 1 as the value.
// A second task can add the 1's to get the word count.
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
}
}
}
Samza的API的概要介紹就到這里吧,很多細(xì)節(jié)的API可以參看javadoc文檔,這也是官網(wǎng)下1節(jié)的內(nèi)容,由于篇幅有限,大家可以自己針對性的去深入了解了解就能夠了。下1篇會講1下之前在架構(gòu)篇里屢次提到的SamzaContainer。
生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機掃描二維碼進行捐贈