多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > 服務器 > storm 學習教程

storm 學習教程

來源:程序員人生   發布時間:2016-06-14 09:26:37 閱讀次數:3437次

翻譯太累了,不再想去翻譯了,真的太累了:

在這個教程中, 你將學到如何創建1個Storm topologies和怎樣把它部署到storm集群上。本教程中,Java將作為主要使用的語言,但在1小部份示例中將會使用Python來論述storm處理多語言的能力。

豫備工作

本教程使用的例子來自于 storm-starter 項目. 我們建議你拷貝該項目并跟隨這個例子來進行學習。 請瀏覽 Setting up a development environment 和 Creating a new Storm project 創建好相應的基礎環境。

Storm集群的組件

Storm集群在表面上與Hadoop集群相似。在Hadoop上運行"MapReduce jobs",而在Storm上運行的是"topologies"。 "Jobs" and "topologies" 它們本身非常的不同 -- 1個關鍵的不同的是MapReduce job終究會完成并結束,而topology的消息處理將無窮期進行下去(除非你kill它)。

在storm集群中,有兩類節點。Master節點運行守護進程稱為"Nimbus",它有點像 Hadoop 的 "JobTracker"。Nimbus負責集群的代碼分發,任務分配,故障監控。

每一個工作節點運行的守護進程稱為"Supervisor"。Supervisor 負責監聽分配到它自己機器的作業,根據需要啟動和停止相應的工作進程,固然這些工作進程也是Nimbus分派給它的。每一個工作進程履行topology的1個子集;1個運行的topology是由散布在多個機器的多個工作進程組成的。


Storm cluster

Nimbus 與 Supervisors 所有的調和工作是由 Zookeeper 集群完成的. 另外,Nimbus 守護進程 和 Supervisor 守護進程 是無狀態的,快速失敗的機制。 所有的狀態保存在Zookeeper上或本地磁盤中。這就是說,你用kill ⑼殺掉Nimbus 或Supervisors,它們重新啟動后就像甚么都沒有產生1樣,這樣的設計讓storm集群具有使人難以置信的穩定性。

Topologies

在Storm上進行實時計算,你需要創建名為 "topologies" 的這么個東西。1個topology是1個計算的圖,每一個在topology中的節點(以下部份也稱作“組件”)包括了處理邏輯,和多個節點間數據傳送的聯系和方式。運行1個topology很直接簡單的。第1,你把你的java code和它所有的依賴打成1個單獨的jar包。然后,你用以下的命令去運行就能夠了。

storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2

這個例子中運行的類是 org.apache.storm.MyTopology 且帶著兩個參數 arg1 和 arg2. 這個類的主要功能是定義topology,并被提交到Nimbus中。命令 storm jar 就是用來加載這個topology jar的。

由于topology的定義方式就是Thrift的結構,Nimbus也是1個Thrift服務,所以你可以用任何語言去創建topologies并提交。以上的例子是最簡單的方式去使用基于JVM的語言(比如java)創建的topology。請瀏覽 Running topologies on a production cluster 來取得更多的信息關于topology的啟動和停止。

Storm里的核心抽象就是 "流"。流 是1個無界的 元組序列。Storm提供原始地、散布式地、可靠地方式 把1個流轉變成1個新的流。舉例來講,

你可以把1個 tweet 流 轉換成1個 趨勢主題 的流。 

Storm中提供 流轉換 的最主要的原生方式是 "spouts" and "bolts"。Spouts 和 bolts 有相應的接口,你需要用你的利用的特定邏輯實現接口便可。

Spout是 流 的源頭。舉例來講,1個spout或許會從 Kestrel 隊列中讀取數據并以流的方式發射出來,亦或1個spout或許會連接Twitter的API,

并發出1個關于tweets的流。

1個bolt可以消費任意數量的輸入流,并做1些處理,也能夠由此發出1個新的流。復雜流的轉換,像從1個tweet流中計算出1個關于趨勢話題的流,它要求多個步驟,因此也需要多個bolt。Bolts 能做任何事情,運行方法,過濾元組,做流的聚合,流的連接,寫入數據庫等。 

Spouts 和 bolts 組成的網絡 打包到 "topology" 中,它是頂級的抽象,是你需要提交到storm集群履行的東西。1個topology是1個由spout和bolt組成的 做流轉換 的圖,其中圖中的每一個節點都可以是1個spout或1個bolt。圖中的邊表明了bolt定閱了哪些流,亦或是當1個spout或bolt發射元組到流中,它發出的元組數據到定閱這個流的所有bolt中去。

A Storm topology

topology中節點之間的聯系表明了元組數據是怎樣去傳送的。舉例來講,Spout A 和 Bolt B 相連(A 到 B),Spout A 和 Bolt C 相連(A 到 C),Bolt B 和 Bolt C相連(B 到 C)。每當Spout A發出元組數據時,它會同時發給Bolt B 和 Bolt C。再者,所有Bolt B的輸出元組,也會發給Bolt C。

在topology中的每一個節點都是并行運行的。因此在你的topology中,你可以為每一個節點指定并行運行的數量,然后storm集群將會產生相應數量的線程來履行。

1個topology無窮運行,直到你殺掉它才會停止。Storm將自動地重新分配失敗過的任務。另外,Storm保證不會有數據丟失,即使是機器掛掉,消息被拋棄。

數據模型

Storm用元組作為它的數據模型。1個元組是1個命名的,有值的,1般由過個字段組成的序列,序列中的每一個字段可以是任何類型的對象。在沙箱以外,Storm提供所有的原始類型,字符串,byte數組作為元組的字段值。如果想用1個其他類型的對象,你需要實現a serializer 接口。

每一個topology節點必須聲明輸出元組的字段。舉例來講,這個bolt聲明它將輸出帶有"double" and "triple"兩個字段的元組:

public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } }

 declareOutputFields 方法聲明了該bolt的輸出的字段 ["double", "triple"] .這個bolt的剩余部份將在接下來進行說明。

1個簡單topology

讓我們去看1個簡單的 topology,去探索更多的概念,去看1下它的代碼到底長甚么樣。讓我們看1下 ExclamationTopology 的定義,來自storm-starter的項目:

TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1");

這個topology包括了1個spout和兩個bolt,這個spout發出words,然后每一個bolt都在自己的讀入字符串上加上"!!!"。

這些節點被安排成1條線:spout發給第1個bolt,第1個bolt發給第2個bolt。如果spout發出的元組數據是["bob"] 和 ["john"],通過第2個

bolt后,將發出["bob!!!!!!"] 和 ["john!!!!!!"]。

代碼中用 setSpout 和 setBolt 方法定義了節點。這些方法的輸入是 1個用戶指定的id,包括處理邏輯的對象,你希望該節點并行計算的數量。在這個例子中,這個 spout 的id是 "words" ,兩個bolt的id分別為 "exclaim1" 和 "exclaim2"。

包括處理邏輯的spout對象實現了 IRichSpout 接口,bolt對象實現了 IRichBolt 接口。

最后1個參數,是你希望該節點并行計算的數量是多少,這是可選的。它表明了會有多少線程會通過storm集群來履行這個組件(spout或bolt)。

如果你疏忽它,Storm集群會分配單線程給該節點。

setBolt 返回1個 InputDeclarer 對象,它用來定義bolt的輸入。在這里,bolt "exclaim1" 聲明了它希望通過shuffle分組的方式讀取 spout "words"中的所有元組。同理,Bolt "exclaim2" 聲明了它希望通過shuffle分組的方式讀取 bolt "exclaim1" 所發出的元組數據。

"shuffle 分組" 指元組數據 將會 隨機散布地 從輸入任務 到bolt任務中。在多個組件(spout或bolt)之間,這里有很多數據分組的方式。

這在接下來的章節中會說明。

如果你希望bolt "exclaim2" 從 spout "words" 和 bolt "exclaim1" 讀取所有的元組數據,你需要像下面這樣定義:

builder.setBolt("exclaim2", new ExclamationBolt(), 5) .shuffleGrouping("words") .shuffleGrouping("exclaim1");

如你所見,輸入的定義可以是鏈式的,bolt可以指定多個源。

讓我們深入了解1下spout和bolt在topology中的實現。Spout負責發送新的數據到topology中。 TestWordSpout 在topology中每隔 100ms 發送了1個隨機的單詞,單詞來自列表["nathan", "mike", "jackson", "golda", "bertels"]。在 TestWordSpout 中的 nextTuple() 的實現細節以下:

public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); }

如你所見,這個實現非常簡單明了。

ExclamationBolt 給輸入的字符串追加上 "!!!" 。 讓我們看1下 ExclamationBolt 的完全實現吧:

public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }

prepare 方法提供了1個 OutputCollector 對象,它用來發出元組數據給下游節點。元組數據可以在任意時間從bolt發出 -- 可以在 prepare,execute, 或 cleanup 方法,或 在另外一個線程,異步地發送。 這里的 prepare 實現很簡單,初始化并保存了 OutputCollector 的援用,該援用將在接下來的execute 方法中使用。

execute 方法從該bolt的輸入中接收1個元組數據, ExclamationBolt 對象提取元組中的第1個字段,并追加字符串 "!!!" 。如果你實現的bolt定閱了多個輸入源,你可以用 Tuple 中的 Tuple#getSourceComponent 方法來獲得你當前讀取的這個元組數據來自于哪一個源。

在 execute 方法中,還有1點東西需要說明。 即輸入的元組作為第1個參數 發出 ,然后在最后1行中發出確認消息。這是Storm保證可靠性的API的1部份,它保證數據不會丟失,這在以后的教程會說明。

當1個Bolt將要停止、關閉時,它需要關閉當前打開的資源,此時 cleanup 方法可以被調用。 需要注意的是,這其實不保證這個方法在storm集群中1定會被調用:舉例來講,如果機器上的任務爆發,這就不會調用這個方法。 cleanup 方法打算用于,當你在 local mode 上運行你的topology(摹擬storm集群的仿真模式), 你能夠啟動和停止很多topology且不會遭受任何資源泄漏的問題。

declareOutputFields 方法聲明 ExclamationBolt 發出1個名稱為 "word" 的帶1個字段的元組。

getComponentConfiguration 方法允許你從很多方面配置這個組件怎樣去運行。更多高級的話題,深入的解釋,請參見 Configuration.

通常像 cleanup 和 getComponentConfiguration 方法,在bolt中其實不是必須去實現的。你可以用1個更加簡潔的方式,通過使用1個提供默許實現的基本類去定義bolt,這或許更加適合1些。 ExclamationBolt 可以通過繼承 BaseRichBolt,這會更簡單1點,就像這樣:

public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }

local mode上運行ExclamationTopology

讓我們來看1下如何在本地模式上運行 ExclamationTopology ,看到它工作起來。

Storm有兩種操作模式:本地模式和散布式模式。在本地模式中,Storm通過線程摹擬工作節點并在1個進程中完成履行。本地模式在用于開發和測試topology時是很有用途的。當你在本地模式中運行 storm-starter 項目中的 topology 時,你就可以看到每一個組件發送了甚么信息。你可以獲得更多關于在本地模式上運行topology的信息,請參見 Local mode。

在散布式模式中,Storm操作的是機器集群。當你提交1個topology給master,你也需要提交運行這個topology所必須的代碼。Master將會關注于分發你的代碼并分配worker去運行你的topology。如果worker掛掉,master將會重新分配這些代碼、topology到其他地方。你可以獲得更多關于在散布式模式上運行topology的信息,請參見 Running topologies on a production cluster。

這是在本地模式上運行 ExclamationTopology 的代碼:

Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();

首先,代碼中定義了單進程的偽集群,通過創建 LocalCluster 對象實現。提交topology到虛擬集群,和提交topology到真實的散布式集群是相同的。提交topology使用LocalCluster 的 submitTopology 方法。它需要的參數為 topology的名字,topology的配置,topology本身。

topology的名稱是為了標識topology,以便你以后可以停掉它。1個topology將無窮期運行,直到你停掉它。

topology的配置可以從多個方面調劑topology運行時的形態。這里給出了兩個最為常見的配置:

  1. TOPOLOGY_WORKERS (用 setNumWorkers方法來設置) 表明你希望在storm集群中分配多少進程來履行你的topology。每一個在topology中的組件(spout 或 bolt)將會被分配多個線程去履行。線程數的設置是通過組件的 setBolt 和 setSpout 方法。這些線程存在于worker進程中。 每一個worker進程包括了處理1些組件的1些線程,例如,你橫跨集群指定了300個線程處理所有的組件,且指定了50個worker進程。也就是說,每一個工作進程將履行6個線程, 其中的每個可能又屬于不同的組件。調劑topology的性能需要通過調劑每一個組件的并行線程數 和 工作進程中運行的線程數量。
  2. TOPOLOGY_DEBUG (用 setDebug方法來設置), 當設為true的時候,它將告知Storm打印組件發出的每條信息。這在本地模式測試topology的時候很有用途。但是當你的topology在集群中運行的時候,也許你應當關掉它。

這里有很多其他的topology的配置,更多細節請參見 the Javadoc for Config.

學習如何建立自己的開發環境,以便你能用本地模式運行你的topology(比如在eclipse里),請參見 Creating a new Storm project.

流的分組方式

流的分組方式告知1個topology,兩個組件是通過怎樣的方

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 欧美一区二区三区四区在线观看 | 91成人午夜精品福利院在线观看 | 欧美日韩亚洲第一页 | 国产亚洲一欧美一区二区三区 | 一级做a爱 一区 | 九九精品视频在线播放8 | 日本一区二区视频免费播放 | xxx性欧美在线观看 xxx性日本 | 欧美性视频在线播放 | 免费亚洲视频在线观看 | 爱爱综合网 | 欧美性区 | 色一情一伦一区二区三 | 一区二区三区鲁丝不卡麻豆 | 福利国产| 亚洲免费成人 | 欧美一级爱爱 | 免费观看欧美成人1314色 | 国产精品第一页在线 | 天天涩综合 | 久久精品网址 | 国产成人乱码一区二区三区在线 | 久久99精品一级毛片 | japαnese日本丰满护士 | 他添的我好湿好爽视频 | 丁香激情五月 | 亚洲一区影院 | 高清一级片 | 最近最新中文字幕大全手机在线 | 久久大伊人| 最近免费中文字幕完整4 | 欧美一区二区丝袜高跟鞋 | 日本噜噜影院 | www.国产精品视频 | 一区二区三区国模大胆 | 亚洲第一页在线 | 欧美一级淫片漂亮的老师 | 精品一区二区影院在线 | 一区二区不卡不卡一卡 | 欧美日本一道免费一区三区 | 九九精品视频一区二区三区 |