多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > 服務(wù)器 > Spark 定制版:009~Spark Streaming源碼解讀之Receiver在Driver的精妙實(shí)現(xiàn)全生命周期徹底研究和思考

Spark 定制版:009~Spark Streaming源碼解讀之Receiver在Driver的精妙實(shí)現(xiàn)全生命周期徹底研究和思考

來源:程序員人生   發(fā)布時(shí)間:2016-06-04 16:13:30 閱讀次數(shù):3773次

本講內(nèi)容:

a. Receiver啟動(dòng)的方式假想
b. Receiver啟動(dòng)源碼完全分析

注:本講內(nèi)容基于Spark 1.6.1版本(在2016年5月來講是Spark最新版本)講授。

上節(jié)回顧

上1講中,我們給大家具體分析了RDD的物理生成和邏輯生成進(jìn)程,完全明白DStream和RDD之間的關(guān)系,及其內(nèi)部其他有關(guān)類的具體依賴等信息:

a. DStream是RDD的模板,其內(nèi)部generatedRDDs 保存了每一個(gè)BatchDuration時(shí)間生成的RDD對象實(shí)例。DStream的依賴構(gòu)成了RDD依賴關(guān)系,即從后往前計(jì)算時(shí),只要對最后1個(gè)DStream計(jì)算便可。

b. JobGenerator每隔BatchDuration調(diào)用DStreamGraph的generateJobs方法,調(diào)用了ForEachDStream的generateJob方法,其內(nèi)部先調(diào)用父DStream的getOrCompute方法來獲得RDD,然后在進(jìn)行計(jì)算,從后往前推,第1個(gè)DStream是ReceiverInputDStream,其comput方法中從receiverTracker中獲得對應(yīng)時(shí)間段的metadata信息,然后生成BlockRDD對象,并放入到generatedRDDs中

開講

由上幾節(jié)課中我們知道:

a. Spark Streaming的利用程序在處理數(shù)據(jù)時(shí),會(huì)在開始的階段做好接收數(shù)據(jù)的準(zhǔn)備

b. Spark Streaming的利用程序代碼定義DStream時(shí),會(huì)定義1個(gè)或多個(gè)InputDStream;而每一個(gè)InputDStream則分別對應(yīng)有1個(gè)Receiver

結(jié)合源碼的具體類和方法繪制Receiver啟動(dòng)全生命周期主流程圖:

(原圖信息來自http://blog.csdn.net/andyshar/article/details/51476113,感謝作者!)
這里寫圖片描述

我們就從本講的內(nèi)容開始,為大家解析:

那末Receiver啟動(dòng)的方式假想究竟是甚么呢?

Receiver啟動(dòng)的設(shè)計(jì)問題分析:
 
a. Spark Streaming通過Receiver延續(xù)不斷的從外部數(shù)據(jù)源接收數(shù)據(jù),并把數(shù)據(jù)匯報(bào)給Driver端,由此每一個(gè)Batch Durations就能夠根據(jù)匯報(bào)的數(shù)據(jù)生成不同的Job

b. 即有可能在同1個(gè)Executor當(dāng)中啟動(dòng)多個(gè)Receiver,這類情況之下致使負(fù)載不均勻

c. 由于Executor運(yùn)行本身的故障,task 有可能啟動(dòng)失敗,全部job啟動(dòng)就失敗,即receiver啟動(dòng)失敗

d. Receiver屬于Spark Streaming利用程序啟動(dòng)階段,它又是如何設(shè)計(jì),來到達(dá)Receiver始終會(huì)被啟動(dòng)

e. Receivers和InputDStreams又是如何逐一對應(yīng)的,默許情況下1般只有1個(gè)Receiver嗎?

來吧,走進(jìn)源碼1起看個(gè)究竟??!

Receiver啟動(dòng)源碼完全分析:

如何啟動(dòng)Receiver?

a. 從Spark Core的角度來看,Receiver的啟動(dòng)Spark Core其實(shí)不知道, Receiver是通過Job的方式啟動(dòng)的,運(yùn)行在Executor之上的,由task運(yùn)行

b. 1般情況下,只有1個(gè)Receiver,但是可以創(chuàng)建不同的數(shù)據(jù)來源的InputDStream

c. 啟動(dòng)Receiver的時(shí)候,實(shí)際上1個(gè)receiver就是1個(gè)partition,并由1個(gè)Job啟動(dòng),這個(gè)Job里面有RDD的transformations操作和action的操作,隨著定時(shí)器觸發(fā),不斷的產(chǎn)生有數(shù)據(jù)接收,每一個(gè)時(shí)間段中產(chǎn)生的接收數(shù)據(jù)實(shí)際上就是1個(gè)partition

如此,又回到了最初Receiver啟動(dòng)的方式假想中的問題:

a. 如果有多個(gè)InputDStream,那就要啟動(dòng)多個(gè)Receiver,每一個(gè)Receiver也就相當(dāng)于分片partition,那我啟動(dòng)Receiver的時(shí)候理想的情況下是在不同的機(jī)器上啟動(dòng)Receiver,但是Spark Core的角度來看就是利用程序,感覺不到Receiver的特殊性,所以就會(huì)依照正常的Job啟動(dòng)的方式來處理,極有可能在1個(gè)Executor上啟動(dòng)多個(gè)Receiver;這樣的話便可能致使負(fù)載不均衡

b. 有可能啟動(dòng)Receiver失敗,只要集群存在,Receiver就不應(yīng)當(dāng)啟動(dòng)失敗

c. 從運(yùn)行進(jìn)程中看,1個(gè)Reveiver就是1個(gè)partition的話,Reveiver的啟動(dòng)伴隨1個(gè)Task啟動(dòng),如果Task啟動(dòng)失敗,以Task啟動(dòng)的Receiver也會(huì)失敗

由此,可以得出,對Receiver失敗的話,后果是非常嚴(yán)重的,那末在Spark Streaming如何避免這些事的呢?

Spark Streaming源碼分析,在Spark Streaming當(dāng)中就指定以下信息:

a. Spark使用1個(gè)Job啟動(dòng)1個(gè)Receiver.最大程度的保證了負(fù)載均衡

b. Spark Streaming已指定每一個(gè)Receiver運(yùn)行在那些Executor上,在Receiver運(yùn)行之前就指定了運(yùn)行的地方

c. 如果Receiver啟動(dòng)失敗,此時(shí)其實(shí)不是Job失敗,在內(nèi)部會(huì)重新啟動(dòng)Receiver

進(jìn)入到StreamingContext源碼,開啟解密之旅吧!

在StreamingContext的start方法被調(diào)用的時(shí)候,JobScheduler的start方法會(huì)被調(diào)用

scheduler.start()://啟動(dòng)子線程,1方面為了本地初始化工作,另外1方面是不要阻塞主線程

這里寫圖片描述

而在JobScheduler的start方法中ReceiverTracker的start方法被調(diào)用,Receiver就啟動(dòng)了

這里寫圖片描述

ReceiverTracker的start方法啟動(dòng)RPC消息通訊體,為啥呢?由于receiverTracker會(huì)監(jiān)控全部集群中的Receiver,Receiver轉(zhuǎn)過來要向ReceiverTrackerEndpoint匯報(bào)自己的狀態(tài),接收的數(shù)據(jù),包括生命周期等信息

這里寫圖片描述

基于ReceiverInputDStream(是在Driver端)來取得具體的Receivers實(shí)例,然后再把他們散布到Worker節(jié)點(diǎn)上。1個(gè)ReceiverInputDStream只對應(yīng)1個(gè)Receiver

這里寫圖片描述

其中runDummySparkJob()為了確保所有節(jié)點(diǎn)活著,而且避免所有的receivers集中在1個(gè)節(jié)點(diǎn)上

這里寫圖片描述

再回去看ReceiverTracker.launchReceivers()中的getReceiver()

這里寫圖片描述

ReceiverInputDStream的getReceiver()方法返回Receiver對象。 該方法實(shí)際上要在ReceiverInputDStream的子類實(shí)現(xiàn)。

相應(yīng)的,ReceiverInputDStream的子類中必須要實(shí)現(xiàn)這個(gè)getReceiver()方法。ReceiverInputDStream的子類還必須定義自己對應(yīng)的Receiver子類,由于這個(gè)Receiver子類會(huì)在getReceiver()方法中用來創(chuàng)建這個(gè)Receiver子類的對象。

因此,我們需要查看以下ReceiverInputDStream的繼承關(guān)系

這里寫圖片描述

根據(jù)繼承關(guān)系,這里看1下ReceiverInputDStream的子類SocketInputDStream中的getReceiver方法

這里寫圖片描述

SocketInputDStream中還定義了相應(yīng)的Receiver子類SocketReceiver。SocketReceiver類中還必須定義onStart方法

onStart方法會(huì)啟動(dòng)后臺(tái)線程,調(diào)用receive方法

這里寫圖片描述

啟動(dòng)socket開始接收數(shù)據(jù)

這里寫圖片描述

再回到ReceiverTracker.launchReceivers()中,看最后的代碼 endpoint.send(StartAllReceivers(receivers))。這個(gè)代碼給ReceiverTrackerEndpoint對象發(fā)送了StartAllReceivers消息,ReceiverTrackerEndpoint對象接收后所做的處理在ReceiverTrackerEndpoint.receive中。

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

從注釋中可以看到,Spark Streaming指定receiver在哪些Executors上運(yùn)行,而不是基于Spark Core中的Task來指定

Spark使用submitJob的方式啟動(dòng)Receiver,而在利用程序履行的時(shí)候會(huì)有很多Receiver,這個(gè)時(shí)候是啟動(dòng)1個(gè)Receiver呢,還是把所有的Receiver通過這1個(gè)Job啟動(dòng)?

在ReceiverTracker的receive方法中startReceiver方法第1個(gè)參數(shù)就是receiver,從實(shí)現(xiàn)中可以看出for循環(huán)不斷取出receiver,然后調(diào)用startReceiver。由此就能夠得出1個(gè)Job只啟動(dòng)1個(gè)Receiver
  
如果Receiver啟動(dòng)失敗,此時(shí)其實(shí)不會(huì)認(rèn)為是作業(yè)失敗,會(huì)重新發(fā)消息給ReceiverTrackerEndpoint重新啟動(dòng)Receiver,這樣也就確保了Receivers1定會(huì)被啟動(dòng),這樣就不會(huì)像Task啟動(dòng)Receiver的話如果失敗受重試次數(shù)的影響。

ReceiverTracker.startReceiver:

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

當(dāng)Receiver啟動(dòng)失敗的話,就會(huì)觸發(fā)ReceiverTrackEndpoint重新啟動(dòng)1個(gè)Spark Job去啟動(dòng)Receiver

這里寫圖片描述

當(dāng)Receiver關(guān)閉的話,其實(shí)不需要重新啟動(dòng)Spark Job

這里寫圖片描述

回頭再看ReceiverTracker.startReceiver中的代碼supervisor.start()。在子類ReceiverSupervisorImpl中并沒有start方法,因此調(diào)用的是父類ReceiverSupervisor的start方法。

這里寫圖片描述

這里寫圖片描述

這里寫圖片描述

其具體實(shí)現(xiàn)是在子類的ReceiverSupervisorImpl的onStart方法:

這里寫圖片描述

其中的_.start()是BlockGenerator.start:

這里寫圖片描述

回過頭再看ReceiverSupervisor.start中的startReceiver()

這里寫圖片描述

這里寫圖片描述

仍以Receiver的子類SocketReceiver為例說明onStart方法

SocketReceiver.onStart:

這里寫圖片描述

這個(gè)onStart方法開啟了的線程,用于啟動(dòng)socket來接收數(shù)據(jù)。這個(gè)被運(yùn)行的receive()被定義在ReceiverInputDStream的子類SocketInputDStream中

這里寫圖片描述

這里寫圖片描述

備注:
1、DT大數(shù)據(jù)夢工廠微信公眾號DT_Spark
2、Spark大神級專家:王家林
3、新浪微博: http://www.weibo.com/ilovepains

生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 性生活国产 | 国产高清免费视频 | 日本wwww视频| 五月婷婷综合在线 | xxfree性人妖hd | 亚洲国产图片 | 羞羞人成午夜爽爽影院 | 精品欧美 | 久久精品无码一区二区三区 | 国产成人啪精品午夜在线播放 | 乱码亚洲一区二区三区 | babes性欧美高清 | 日本综合在线观看 | 亚洲色大成网站www久久九九 | 亚洲国产一区二区三区精品 | 天天天狠天天透天天制色 | 午夜欧美在线 | 日韩18| 久久www免费人成看片入口 | 国内精品久久影院 | 国产成人乱码一区二区三区在线 | 国产精品v欧美精品v日韩 | 欧美亚洲尤物久久精品 | 中文字幕精品一区二区2021年 | 久久久久久久国产精品视频 | 亚洲码欧美码一区二区三区 | jizz日本免费 | 亚洲欧美在线观看首页 | h毛片| 亚洲福利片 | 黑人一区二区三区中文字幕 | 亚洲视频中文字幕在线 | 天堂福利视频在线观看 | 97麻豆精品国产自产在线观看 | 在线看片一区 | 日韩欧美亚洲国产高清在线 | 欧美九九视频 | 日本在线一区二区 | 亚洲成av人影片在线观看 | 久久久一区二区三区 | 青青草原国产在线视频 |