多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內(nèi)最全IT社區(qū)平臺 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > 互聯(lián)網(wǎng) > 【Samza系列】實時計算Samza中文教程(四)―API概述

【Samza系列】實時計算Samza中文教程(四)―API概述

來源:程序員人生   發(fā)布時間:2014-11-20 09:03:22 閱讀次數(shù):3658次
    上1篇和大家1起宏觀上學(xué)習(xí)了Samza平臺的架構(gòu),重點講了1下數(shù)據(jù)緩沖層和資源管理層,剩下的1塊很重要的SamzaAPI層本節(jié)作為重點為大家展開介紹。
    當(dāng)你使用Samza來實現(xiàn)1個數(shù)據(jù)流處理邏輯時,你必須實現(xiàn)1個叫StreamTask的接口,以下所示:
public class MyTaskClass implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { // process message } }
    當(dāng)你運行你的job時,Samza將為你的class創(chuàng)建1些實例(可能在多臺機器上)。這些任務(wù)實例會處理輸入流里的消息。

    在你的job的配置中你能告知Samza你想消費哪條數(shù)據(jù)流。舉1個較為完全的例子(大家也能夠參看http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration.html
):
# This is the class above, which Samza will instantiate when the job is run task.class=com.example.samza.MyTaskClass # Define a system called "kafka" (you can give it any name, and you can define # multiple systems if you want to process messages from different sources) systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory # The job consumes a topic called "PageViewEvent" from the "kafka" system task.inputs=kafka.PageViewEvent # Define a serializer/deserializer called "json" which parses JSON messages serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory # Use the "json" serializer for messages in the "PageViewEvent" topic systems.kafka.streams.PageViewEvent.samza.msg.serde=json
    對Samza從任務(wù)的輸入流利接收的每條消息,處理邏輯都會被調(diào)用。它主要包括3個重要的信息:消息、關(guān)鍵詞key和消息來自的數(shù)據(jù)流:
/** Every message that is delivered to a StreamTask is wrapped * in an IncomingMessageEnvelope, which contains metadata about * the origin of the message. */ public class IncomingMessageEnvelope { /** A deserialized message. */ Object getMessage() { ... } /** A deserialized key. */ Object getKey() { ... } /** The stream and partition that this message came from. */ SystemStreamPartition getSystemStreamPartition() { ... } }
    注意鍵和值都要被聲明為對象,并且需要轉(zhuǎn)化為正確的類型。如果你不配置1個serializer/deserializer,它們就會成為典型的java字節(jié)數(shù)組。1個deserializer能夠轉(zhuǎn)化這些字節(jié)到其他任意類型,舉個例子來講j1個son deserializer能夠?qū)⒆止?jié)數(shù)組轉(zhuǎn)化為Map、List和字符串對象。
    SystemStreamPartition()這個方法會返回1個SystemStreamPartition對象,它會告知你消息是從哪里來的。它由以下3部份組成:
    1. The system:系統(tǒng)的名字來源于消息,就在你job的配置里定義。你可以有多個用于輸入和輸出的不同名字的系統(tǒng);
    2. The stream name: 在原系統(tǒng)里數(shù)據(jù)流(話題、隊列)的名字。一樣也是在job的配置里定義;
    3. The partition: 1條數(shù)據(jù)流通常會被劃分到多個分區(qū),并且每個分區(qū)會被Samza安排1個StreamTask實例;
    API看起來像是這樣的:
/** A triple of system name, stream name and partition. */ public class SystemStreamPartition extends SystemStream { /** The name of the system which provides this stream. It is defined in the Samza job's configuration. */ public String getSystem() { ... } /** The name of the stream/topic/queue within the system. */ public String getStream() { ... } /** The partition within the stream. */ public Partition getPartition() { ... } }
    在上面這個job的配置例子里可以看到,這個系統(tǒng)名字叫“Kafka”,數(shù)據(jù)流的名字叫“PageViewEvent”。(kafka這個名字不是特定的――你能給你的系統(tǒng)取任何你想要的名字)。如果你有1些輸入流向?qū)肽愕腟treamTask,你能夠使用SystemStreamPartition去決定你接遭到哪1類消息。

    如何發(fā)送消息呢?如果你看1下StreamTask里的process()方法,你將看到你有1個MessageCollector接口。
/** When a task wishes to send a message, it uses this interface. */ public interface MessageCollector { void send(OutgoingMessageEnvelope envelope); }
    為了發(fā)送1個消息, 你會創(chuàng)建1個OutgoingMessageEnvelop對象并且把它傳遞給消息搜集器。它最少會肯定你想要發(fā)送的消息、系統(tǒng)和數(shù)據(jù)流名字再發(fā)送出去。你也能夠肯定分區(qū)的key和另外一些參數(shù)。具體可以參考javadoc(http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)。

    注意事項:
    請只在process()方法里使用MessageCollector對象。如果你保持住1個MessageCollector實例并且以后再次使用它,你的消息可能會毛病地發(fā)送出去。舉1個例子,這兒有1個簡單的任務(wù),它把每一個輸入的消息拆成單詞,并且發(fā)送每個單詞作為1個消息:
public class SplitStringIntoWords implements StreamTask { // Send outgoing messages to a stream called "words" // in the "kafka" system. private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "words"); public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { String message = (String) envelope.getMessage(); for (String word : message.split(" ")) { // Use the word as the key, and 1 as the value. // A second task can add the 1's to get the word count. collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1)); } } }
    Samza的API的概要介紹就到這里吧,很多細(xì)節(jié)的API可以參看javadoc文檔,這也是官網(wǎng)下1節(jié)的內(nèi)容,由于篇幅有限,大家可以自己針對性的去深入了解了解就能夠了。下1篇會講1下之前在架構(gòu)篇里屢次提到的SamzaContainer。


    

生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 亚洲小说另类 | 俺去啦最新地址 | 欧美bbwxxxx| 婷婷激情五月 | 欧美夜色| 免费毛片网 | 亚洲大成色www永久网 | 欧美一级h | 在线不欧美 | 国产一区二区视频在线播放 | 国产福利一区二区三区视频在线 | 偷自视频区视频真实在线 | 精品国产精品久久一区免费式 | 一级毛片免费一级直接观看 | 亚洲最新在线 | 一级毛片在线观看视频 | 性做久久久久久网站 | 亚洲天堂.com | 亚洲精品色综合区 | 欧美日韩午夜 | 欧美日韩国产综合在线小说 | 91久久国产精品 | 欧美日韩另类国产 | 免费澳门一级毛片 | japan高清日本乱xxxx | 国产成人久久精品激情91 | 国产成人免费a在线视频色戒 | 久草在线免费色站 | 免费一级特黄欧美大片久久网 | 国产成人在线视频观看 | 在线欧美三级 | 国产精品2 | 日韩精品亚洲人成在线播放 | 免费视频网站一级人爱视频 | 97麻豆精品国产自产在线观看 | 午夜在线a亚洲v天堂网2019 | 图片区小说校园综合 | 亚洲看看| 亚洲精品 国产 日韩 | 琪琪午夜伦埋大全影院 | 日本xxxxxx |