翻譯太累了,不再想去翻譯了,真的太累了:
在這個教程中, 你將學到如何創建1個Storm topologies和怎樣把它部署到storm集群上。本教程中,Java將作為主要使用的語言,但在1小部份示例中將會使用Python來論述storm處理多語言的能力。
本教程使用的例子來自于 storm-starter 項目. 我們建議你拷貝該項目并跟隨這個例子來進行學習。 請瀏覽 Setting up a development environment 和 Creating a new Storm project 創建好相應的基礎環境。
Storm集群在表面上與Hadoop集群相似。在Hadoop上運行"MapReduce jobs",而在Storm上運行的是"topologies"。 "Jobs" and "topologies" 它們本身非常的不同 -- 1個關鍵的不同的是MapReduce job終究會完成并結束,而topology的消息處理將無窮期進行下去(除非你kill它)。
在storm集群中,有兩類節點。Master節點運行守護進程稱為"Nimbus",它有點像 Hadoop 的 "JobTracker"。Nimbus負責集群的代碼分發,任務分配,故障監控。
每一個工作節點運行的守護進程稱為"Supervisor"。Supervisor 負責監聽分配到它自己機器的作業,根據需要啟動和停止相應的工作進程,固然這些工作進程也是Nimbus分派給它的。每一個工作進程履行topology的1個子集;1個運行的topology是由散布在多個機器的多個工作進程組成的。
Nimbus 與 Supervisors 所有的調和工作是由 Zookeeper 集群完成的. 另外,Nimbus 守護進程 和 Supervisor 守護進程 是無狀態的,快速失敗的機制。 所有的狀態保存在Zookeeper上或本地磁盤中。這就是說,你用kill ⑼殺掉Nimbus 或Supervisors,它們重新啟動后就像甚么都沒有產生1樣,這樣的設計讓storm集群具有使人難以置信的穩定性。
在Storm上進行實時計算,你需要創建名為 "topologies" 的這么個東西。1個topology是1個計算的圖,每一個在topology中的節點(以下部份也稱作“組件”)包括了處理邏輯,和多個節點間數據傳送的聯系和方式。運行1個topology很直接簡單的。第1,你把你的java code和它所有的依賴打成1個單獨的jar包。然后,你用以下的命令去運行就能夠了。
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
這個例子中運行的類是 org.apache.storm.MyTopology
且帶著兩個參數 arg1
和 arg2
.
這個類的主要功能是定義topology,并被提交到Nimbus中。命令 storm jar
就是用來加載這個topology jar的。
由于topology的定義方式就是Thrift的結構,Nimbus也是1個Thrift服務,所以你可以用任何語言去創建topologies并提交。以上的例子是最簡單的方式去使用基于JVM的語言(比如java)創建的topology。請瀏覽 Running topologies on a production cluster 來取得更多的信息關于topology的啟動和停止。
Storm里的核心抽象就是 "流"。流 是1個無界的 元組序列。Storm提供原始地、散布式地、可靠地方式 把1個流轉變成1個新的流。舉例來講,
你可以把1個 tweet 流 轉換成1個 趨勢主題 的流。
Storm中提供 流轉換 的最主要的原生方式是 "spouts" and "bolts"。Spouts 和 bolts 有相應的接口,你需要用你的利用的特定邏輯實現接口便可。
Spout是 流 的源頭。舉例來講,1個spout或許會從 Kestrel 隊列中讀取數據并以流的方式發射出來,亦或1個spout或許會連接Twitter的API,
并發出1個關于tweets的流。
1個bolt可以消費任意數量的輸入流,并做1些處理,也能夠由此發出1個新的流。復雜流的轉換,像從1個tweet流中計算出1個關于趨勢話題的流,它要求多個步驟,因此也需要多個bolt。Bolts 能做任何事情,運行方法,過濾元組,做流的聚合,流的連接,寫入數據庫等。
Spouts 和 bolts 組成的網絡 打包到 "topology" 中,它是頂級的抽象,是你需要提交到storm集群履行的東西。1個topology是1個由spout和bolt組成的 做流轉換 的圖,其中圖中的每一個節點都可以是1個spout或1個bolt。圖中的邊表明了bolt定閱了哪些流,亦或是當1個spout或bolt發射元組到流中,它發出的元組數據到定閱這個流的所有bolt中去。
topology中節點之間的聯系表明了元組數據是怎樣去傳送的。舉例來講,Spout A 和 Bolt B 相連(A 到 B),Spout A 和 Bolt C 相連(A 到 C),Bolt B 和 Bolt C相連(B 到 C)。每當Spout A發出元組數據時,它會同時發給Bolt B 和 Bolt C。再者,所有Bolt B的輸出元組,也會發給Bolt C。
在topology中的每一個節點都是并行運行的。因此在你的topology中,你可以為每一個節點指定并行運行的數量,然后storm集群將會產生相應數量的線程來履行。
1個topology無窮運行,直到你殺掉它才會停止。Storm將自動地重新分配失敗過的任務。另外,Storm保證不會有數據丟失,即使是機器掛掉,消息被拋棄。
Storm用元組作為它的數據模型。1個元組是1個命名的,有值的,1般由過個字段組成的序列,序列中的每一個字段可以是任何類型的對象。在沙箱以外,Storm提供所有的原始類型,字符串,byte數組作為元組的字段值。如果想用1個其他類型的對象,你需要實現a serializer 接口。
每一個topology節點必須聲明輸出元組的字段。舉例來講,這個bolt聲明它將輸出帶有"double" and "triple"兩個字段的元組:
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
declareOutputFields
方法聲明了該bolt的輸出的字段 ["double",
"triple"]
.這個bolt的剩余部份將在接下來進行說明。
讓我們去看1個簡單的 topology,去探索更多的概念,去看1下它的代碼到底長甚么樣。讓我們看1下 ExclamationTopology
的定義,來自storm-starter的項目:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
這個topology包括了1個spout和兩個bolt,這個spout發出words,然后每一個bolt都在自己的讀入字符串上加上"!!!"。
這些節點被安排成1條線:spout發給第1個bolt,第1個bolt發給第2個bolt。如果spout發出的元組數據是["bob"] 和 ["john"],通過第2個
bolt后,將發出["bob!!!!!!"] 和 ["john!!!!!!"]。
代碼中用 setSpout
和 setBolt
方法定義了節點。這些方法的輸入是
1個用戶指定的id,包括處理邏輯的對象,你希望該節點并行計算的數量。在這個例子中,這個 spout 的id是 "words" ,兩個bolt的id分別為 "exclaim1" 和 "exclaim2"。
包括處理邏輯的spout對象實現了 IRichSpout 接口,bolt對象實現了 IRichBolt 接口。
最后1個參數,是你希望該節點并行計算的數量是多少,這是可選的。它表明了會有多少線程會通過storm集群來履行這個組件(spout或bolt)。
如果你疏忽它,Storm集群會分配單線程給該節點。
setBolt
返回1個 InputDeclarer 對象,它用來定義bolt的輸入。在這里,bolt
"exclaim1" 聲明了它希望通過shuffle分組的方式讀取 spout "words"中的所有元組。同理,Bolt "exclaim2"
聲明了它希望通過shuffle分組的方式讀取 bolt "exclaim1" 所發出的元組數據。
"shuffle 分組" 指元組數據 將會 隨機散布地 從輸入任務 到bolt任務中。在多個組件(spout或bolt)之間,這里有很多數據分組的方式。
這在接下來的章節中會說明。
如果你希望bolt "exclaim2" 從 spout "words" 和 bolt "exclaim1" 讀取所有的元組數據,你需要像下面這樣定義:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
如你所見,輸入的定義可以是鏈式的,bolt可以指定多個源。
讓我們深入了解1下spout和bolt在topology中的實現。Spout負責發送新的數據到topology中。 TestWordSpout
在topology中每隔 100ms
發送了1個隨機的單詞,單詞來自列表["nathan", "mike", "jackson", "golda", "bertels"]。在 TestWordSpout 中的 nextTuple()
的實現細節以下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
如你所見,這個實現非常簡單明了。
ExclamationBolt
給輸入的字符串追加上 "!!!" 。 讓我們看1下 ExclamationBolt
的完全實現吧:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
prepare
方法提供了1個 OutputCollector
對象,它用來發出元組數據給下游節點。元組數據可以在任意時間從bolt發出
-- 可以在 prepare
,execute
,
或 cleanup
方法,或
在另外一個線程,異步地發送。 這里的 prepare
實現很簡單,初始化并保存了 OutputCollector
的援用,該援用將在接下來的execute
方法中使用。
execute
方法從該bolt的輸入中接收1個元組數據, ExclamationBolt
對象提取元組中的第1個字段,并追加字符串
"!!!" 。如果你實現的bolt定閱了多個輸入源,你可以用 Tuple 中的 Tuple#getSourceComponent
方法來獲得你當前讀取的這個元組數據來自于哪一個源。
在 execute
方法中,還有1點東西需要說明。
即輸入的元組作為第1個參數 發出 ,然后在最后1行中發出確認消息。這是Storm保證可靠性的API的1部份,它保證數據不會丟失,這在以后的教程會說明。
當1個Bolt將要停止、關閉時,它需要關閉當前打開的資源,此時 cleanup
方法可以被調用。
需要注意的是,這其實不保證這個方法在storm集群中1定會被調用:舉例來講,如果機器上的任務爆發,這就不會調用這個方法。 cleanup
方法打算用于,當你在 local
mode 上運行你的topology(摹擬storm集群的仿真模式), 你能夠啟動和停止很多topology且不會遭受任何資源泄漏的問題。
declareOutputFields
方法聲明 ExclamationBolt
發出1個名稱為 "word"
的帶1個字段的元組。
getComponentConfiguration
方法允許你從很多方面配置這個組件怎樣去運行。更多高級的話題,深入的解釋,請參見
Configuration.
通常像 cleanup
和 getComponentConfiguration
方法,在bolt中其實不是必須去實現的。你可以用1個更加簡潔的方式,通過使用1個提供默許實現的基本類去定義bolt,這或許更加適合1些。 ExclamationBolt
可以通過繼承 BaseRichBolt
,這會更簡單1點,就像這樣:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
讓我們來看1下如何在本地模式上運行
ExclamationTopology
,看到它工作起來。
Storm有兩種操作模式:本地模式和散布式模式。在本地模式中,Storm通過線程摹擬工作節點并在1個進程中完成履行。本地模式在用于開發和測試topology時是很有用途的。當你在本地模式中運行 storm-starter 項目中的 topology 時,你就可以看到每一個組件發送了甚么信息。你可以獲得更多關于在本地模式上運行topology的信息,請參見 Local mode。
在散布式模式中,Storm操作的是機器集群。當你提交1個topology給master,你也需要提交運行這個topology所必須的代碼。Master將會關注于分發你的代碼并分配worker去運行你的topology。如果worker掛掉,master將會重新分配這些代碼、topology到其他地方。你可以獲得更多關于在散布式模式上運行topology的信息,請參見 Running topologies on a production cluster。
這是在本地模式上運行 ExclamationTopology
的代碼:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先,代碼中定義了單進程的偽集群,通過創建 LocalCluster
對象實現。提交topology到虛擬集群,和提交topology到真實的散布式集群是相同的。提交topology使用LocalCluster
的 submitTopology
方法。它需要的參數為
topology的名字,topology的配置,topology本身。
topology的名稱是為了標識topology,以便你以后可以停掉它。1個topology將無窮期運行,直到你停掉它。
topology的配置可以從多個方面調劑topology運行時的形態。這里給出了兩個最為常見的配置:
setNumWorkers
方法來設置)
表明你希望在storm集群中分配多少進程來履行你的topology。每一個在topology中的組件(spout 或 bolt)將會被分配多個線程去履行。線程數的設置是通過組件的 setBolt
和 setSpout
方法。這些線程存在于worker進程中。
每一個worker進程包括了處理1些組件的1些線程,例如,你橫跨集群指定了300個線程處理所有的組件,且指定了50個worker進程。也就是說,每一個工作進程將履行6個線程, 其中的每個可能又屬于不同的組件。調劑topology的性能需要通過調劑每一個組件的并行線程數 和 工作進程中運行的線程數量。setDebug
方法來設置), 當設為true的時候,它將告知Storm打印組件發出的每條信息。這在本地模式測試topology的時候很有用途。但是當你的topology在集群中運行的時候,也許你應當關掉它。這里有很多其他的topology的配置,更多細節請參見 the Javadoc for Config.
學習如何建立自己的開發環境,以便你能用本地模式運行你的topology(比如在eclipse里),請參見 Creating a new Storm project.
流的分組方式告知1個topology,兩個組件是通過怎樣的方