本講內(nèi)容:
a. Receiver啟動(dòng)的方式假想
b. Receiver啟動(dòng)源碼完全分析
注:本講內(nèi)容基于Spark 1.6.1版本(在2016年5月來講是Spark最新版本)講授。
上節(jié)回顧
上1講中,我們給大家具體分析了RDD的物理生成和邏輯生成進(jìn)程,完全明白DStream和RDD之間的關(guān)系,及其內(nèi)部其他有關(guān)類的具體依賴等信息:
a. DStream是RDD的模板,其內(nèi)部generatedRDDs 保存了每一個(gè)BatchDuration時(shí)間生成的RDD對象實(shí)例。DStream的依賴構(gòu)成了RDD依賴關(guān)系,即從后往前計(jì)算時(shí),只要對最后1個(gè)DStream計(jì)算便可。
b. JobGenerator每隔BatchDuration調(diào)用DStreamGraph的generateJobs方法,調(diào)用了ForEachDStream的generateJob方法,其內(nèi)部先調(diào)用父DStream的getOrCompute方法來獲得RDD,然后在進(jìn)行計(jì)算,從后往前推,第1個(gè)DStream是ReceiverInputDStream,其comput方法中從receiverTracker中獲得對應(yīng)時(shí)間段的metadata信息,然后生成BlockRDD對象,并放入到generatedRDDs中
開講
由上幾節(jié)課中我們知道:
a. Spark Streaming的利用程序在處理數(shù)據(jù)時(shí),會(huì)在開始的階段做好接收數(shù)據(jù)的準(zhǔn)備
b. Spark Streaming的利用程序代碼定義DStream時(shí),會(huì)定義1個(gè)或多個(gè)InputDStream;而每一個(gè)InputDStream則分別對應(yīng)有1個(gè)Receiver
結(jié)合源碼的具體類和方法繪制Receiver啟動(dòng)全生命周期主流程圖:
(原圖信息來自http://blog.csdn.net/andyshar/article/details/51476113,感謝作者!)
我們就從本講的內(nèi)容開始,為大家解析:
那末Receiver啟動(dòng)的方式假想究竟是甚么呢?
Receiver啟動(dòng)的設(shè)計(jì)問題分析:
a. Spark Streaming通過Receiver延續(xù)不斷的從外部數(shù)據(jù)源接收數(shù)據(jù),并把數(shù)據(jù)匯報(bào)給Driver端,由此每一個(gè)Batch Durations就能夠根據(jù)匯報(bào)的數(shù)據(jù)生成不同的Job
b. 即有可能在同1個(gè)Executor當(dāng)中啟動(dòng)多個(gè)Receiver,這類情況之下致使負(fù)載不均勻
c. 由于Executor運(yùn)行本身的故障,task 有可能啟動(dòng)失敗,全部job啟動(dòng)就失敗,即receiver啟動(dòng)失敗
d. Receiver屬于Spark Streaming利用程序啟動(dòng)階段,它又是如何設(shè)計(jì),來到達(dá)Receiver始終會(huì)被啟動(dòng)
e. Receivers和InputDStreams又是如何逐一對應(yīng)的,默許情況下1般只有1個(gè)Receiver嗎?
來吧,走進(jìn)源碼1起看個(gè)究竟??!
Receiver啟動(dòng)源碼完全分析:
如何啟動(dòng)Receiver?
a. 從Spark Core的角度來看,Receiver的啟動(dòng)Spark Core其實(shí)不知道, Receiver是通過Job的方式啟動(dòng)的,運(yùn)行在Executor之上的,由task運(yùn)行
b. 1般情況下,只有1個(gè)Receiver,但是可以創(chuàng)建不同的數(shù)據(jù)來源的InputDStream
c. 啟動(dòng)Receiver的時(shí)候,實(shí)際上1個(gè)receiver就是1個(gè)partition,并由1個(gè)Job啟動(dòng),這個(gè)Job里面有RDD的transformations操作和action的操作,隨著定時(shí)器觸發(fā),不斷的產(chǎn)生有數(shù)據(jù)接收,每一個(gè)時(shí)間段中產(chǎn)生的接收數(shù)據(jù)實(shí)際上就是1個(gè)partition
如此,又回到了最初Receiver啟動(dòng)的方式假想中的問題:
a. 如果有多個(gè)InputDStream,那就要啟動(dòng)多個(gè)Receiver,每一個(gè)Receiver也就相當(dāng)于分片partition,那我啟動(dòng)Receiver的時(shí)候理想的情況下是在不同的機(jī)器上啟動(dòng)Receiver,但是Spark Core的角度來看就是利用程序,感覺不到Receiver的特殊性,所以就會(huì)依照正常的Job啟動(dòng)的方式來處理,極有可能在1個(gè)Executor上啟動(dòng)多個(gè)Receiver;這樣的話便可能致使負(fù)載不均衡
b. 有可能啟動(dòng)Receiver失敗,只要集群存在,Receiver就不應(yīng)當(dāng)啟動(dòng)失敗
c. 從運(yùn)行進(jìn)程中看,1個(gè)Reveiver就是1個(gè)partition的話,Reveiver的啟動(dòng)伴隨1個(gè)Task啟動(dòng),如果Task啟動(dòng)失敗,以Task啟動(dòng)的Receiver也會(huì)失敗
由此,可以得出,對Receiver失敗的話,后果是非常嚴(yán)重的,那末在Spark Streaming如何避免這些事的呢?
Spark Streaming源碼分析,在Spark Streaming當(dāng)中就指定以下信息:
a. Spark使用1個(gè)Job啟動(dòng)1個(gè)Receiver.最大程度的保證了負(fù)載均衡
b. Spark Streaming已指定每一個(gè)Receiver運(yùn)行在那些Executor上,在Receiver運(yùn)行之前就指定了運(yùn)行的地方
c. 如果Receiver啟動(dòng)失敗,此時(shí)其實(shí)不是Job失敗,在內(nèi)部會(huì)重新啟動(dòng)Receiver
進(jìn)入到StreamingContext源碼,開啟解密之旅吧!
在StreamingContext的start方法被調(diào)用的時(shí)候,JobScheduler的start方法會(huì)被調(diào)用
scheduler.start()://啟動(dòng)子線程,1方面為了本地初始化工作,另外1方面是不要阻塞主線程
而在JobScheduler的start方法中ReceiverTracker的start方法被調(diào)用,Receiver就啟動(dòng)了
ReceiverTracker的start方法啟動(dòng)RPC消息通訊體,為啥呢?由于receiverTracker會(huì)監(jiān)控全部集群中的Receiver,Receiver轉(zhuǎn)過來要向ReceiverTrackerEndpoint匯報(bào)自己的狀態(tài),接收的數(shù)據(jù),包括生命周期等信息
基于ReceiverInputDStream(是在Driver端)來取得具體的Receivers實(shí)例,然后再把他們散布到Worker節(jié)點(diǎn)上。1個(gè)ReceiverInputDStream只對應(yīng)1個(gè)Receiver
其中runDummySparkJob()為了確保所有節(jié)點(diǎn)活著,而且避免所有的receivers集中在1個(gè)節(jié)點(diǎn)上
再回去看ReceiverTracker.launchReceivers()中的getReceiver()
ReceiverInputDStream的getReceiver()方法返回Receiver對象。 該方法實(shí)際上要在ReceiverInputDStream的子類實(shí)現(xiàn)。
相應(yīng)的,ReceiverInputDStream的子類中必須要實(shí)現(xiàn)這個(gè)getReceiver()方法。ReceiverInputDStream的子類還必須定義自己對應(yīng)的Receiver子類,由于這個(gè)Receiver子類會(huì)在getReceiver()方法中用來創(chuàng)建這個(gè)Receiver子類的對象。
因此,我們需要查看以下ReceiverInputDStream的繼承關(guān)系
根據(jù)繼承關(guān)系,這里看1下ReceiverInputDStream的子類SocketInputDStream中的getReceiver方法
SocketInputDStream中還定義了相應(yīng)的Receiver子類SocketReceiver。SocketReceiver類中還必須定義onStart方法
onStart方法會(huì)啟動(dòng)后臺(tái)線程,調(diào)用receive方法
啟動(dòng)socket開始接收數(shù)據(jù)
再回到ReceiverTracker.launchReceivers()中,看最后的代碼 endpoint.send(StartAllReceivers(receivers))。這個(gè)代碼給ReceiverTrackerEndpoint對象發(fā)送了StartAllReceivers消息,ReceiverTrackerEndpoint對象接收后所做的處理在ReceiverTrackerEndpoint.receive中。
從注釋中可以看到,Spark Streaming指定receiver在哪些Executors上運(yùn)行,而不是基于Spark Core中的Task來指定
Spark使用submitJob的方式啟動(dòng)Receiver,而在利用程序履行的時(shí)候會(huì)有很多Receiver,這個(gè)時(shí)候是啟動(dòng)1個(gè)Receiver呢,還是把所有的Receiver通過這1個(gè)Job啟動(dòng)?
在ReceiverTracker的receive方法中startReceiver方法第1個(gè)參數(shù)就是receiver,從實(shí)現(xiàn)中可以看出for循環(huán)不斷取出receiver,然后調(diào)用startReceiver。由此就能夠得出1個(gè)Job只啟動(dòng)1個(gè)Receiver
如果Receiver啟動(dòng)失敗,此時(shí)其實(shí)不會(huì)認(rèn)為是作業(yè)失敗,會(huì)重新發(fā)消息給ReceiverTrackerEndpoint重新啟動(dòng)Receiver,這樣也就確保了Receivers1定會(huì)被啟動(dòng),這樣就不會(huì)像Task啟動(dòng)Receiver的話如果失敗受重試次數(shù)的影響。
ReceiverTracker.startReceiver:
當(dāng)Receiver啟動(dòng)失敗的話,就會(huì)觸發(fā)ReceiverTrackEndpoint重新啟動(dòng)1個(gè)Spark Job去啟動(dòng)Receiver
當(dāng)Receiver關(guān)閉的話,其實(shí)不需要重新啟動(dòng)Spark Job
回頭再看ReceiverTracker.startReceiver中的代碼supervisor.start()。在子類ReceiverSupervisorImpl中并沒有start方法,因此調(diào)用的是父類ReceiverSupervisor的start方法。
其具體實(shí)現(xiàn)是在子類的ReceiverSupervisorImpl的onStart方法:
其中的_.start()是BlockGenerator.start:
回過頭再看ReceiverSupervisor.start中的startReceiver()
仍以Receiver的子類SocketReceiver為例說明onStart方法
SocketReceiver.onStart:
這個(gè)onStart方法開啟了的線程,用于啟動(dòng)socket來接收數(shù)據(jù)。這個(gè)被運(yùn)行的receive()被定義在ReceiverInputDStream的子類SocketInputDStream中
備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
2、Spark大神級專家:王家林
3、新浪微博: http://www.weibo.com/ilovepains