多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國(guó)內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁(yè) > 服務(wù)器 > Spark 入門實(shí)戰(zhàn)之最好的實(shí)例

Spark 入門實(shí)戰(zhàn)之最好的實(shí)例

來(lái)源:程序員人生   發(fā)布時(shí)間:2016-06-12 08:27:30 閱讀次數(shù):3313次

轉(zhuǎn)載:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/

搭建開(kāi)發(fā)環(huán)境

  1. 安裝 Scala IDE

    搭建 Scala 語(yǔ)言開(kāi)發(fā)環(huán)境很容易,Scala IDE 官網(wǎng) 下載適合的版本并解壓就能夠完成安裝,本文使用的版本是 4.1.0。

  2. 安裝 Scala 語(yǔ)言包

    如果下載的 Scala IDE 自帶的 Scala 語(yǔ)言包與 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不1致,那末就需要下載和本文所使用的 Spark 所匹配的版本,以確保實(shí)現(xiàn)的 Scala 程序不會(huì)由于版本問(wèn)題而運(yùn)行失敗。

    請(qǐng)下載并安裝 Scala 2.10.5 版本

  3. 安裝 JDK

    如果您的機(jī)器上沒(méi)有安裝 JDK,請(qǐng)下載并安裝 1.6 版本以上的 JDK。

  4. 創(chuàng)建并配置 Spark 工程

    打開(kāi) Scala IDE,創(chuàng)建1個(gè)名稱為 spark-exercise 的 Scala 工程。

圖 1. 創(chuàng)建 scala 工程
圖 1. 創(chuàng)建 scala 工程

在工程目錄下創(chuàng)建1個(gè) lib 文件夾,并且把您的 Spark 安裝包下的 spark-assembly jar 包拷貝到 lib 目錄下。

圖 2. Spark 開(kāi)發(fā) jar 包
圖 2. Spark 開(kāi)發(fā) jar 包

并且添加該 jar 包到工程的 classpath 并配置工程使用剛剛安裝的 Scala 2.10.5 版本.,工程目錄結(jié)構(gòu)以下。

圖 3. 添加 jar 包到 classpath
圖 3. 添加 jar 包到 classpath

運(yùn)行環(huán)境介紹

為了不讀者對(duì)本文案例運(yùn)行環(huán)境產(chǎn)生困惑,本節(jié)會(huì)對(duì)本文用到的集群環(huán)境的基本情況做個(gè)簡(jiǎn)單介紹。

  • 本文所有實(shí)例數(shù)據(jù)存儲(chǔ)的環(huán)境是1個(gè) 8 個(gè)機(jī)器的 Hadoop 集群,文件系統(tǒng)總?cè)萘渴?1.12T,NameNode 叫 hadoop036166, 服務(wù)端口是 9000。讀者可以不關(guān)心具體的節(jié)點(diǎn)散布,由于這個(gè)不會(huì)影響到您瀏覽后面的文章。
  • 本文運(yùn)行實(shí)例程序使用的 Spark 集群是1個(gè)包括4個(gè)節(jié)點(diǎn)的 Standalone 模式的集群, 其中包括1個(gè) Master 節(jié)點(diǎn) (監(jiān)聽(tīng)端口 7077) 和3個(gè) Worker 節(jié)點(diǎn),具體散布以下:
Server Name Role
hadoop036166 Master
hadoop036187 Worker
hadoop036188 Worker
hadoop036227 Worker
  • Spark 提供1個(gè) Web UI 去查看集群信息并且監(jiān)控履行結(jié)果,默許地址是:http://<spark_master_ip>:8080 ,對(duì)該實(shí)例提交后我們也能夠到 web 頁(yè)面上去查看履行結(jié)果,固然也能夠通過(guò)查看日志去找到履行結(jié)果。
圖 4. Spark 的 web console
圖 4. Spark 的 web console

案例分析與編程實(shí)現(xiàn)

案例1

a. 案例描寫

提起 Word Count(詞頻數(shù)統(tǒng)計(jì)),相信大家都不陌生,就是統(tǒng)計(jì)1個(gè)或多個(gè)文件中單詞出現(xiàn)的次數(shù)。本文將此作為1個(gè)入門級(jí)案例,由淺入深的開(kāi)啟使用 Scala 編寫 Spark 大數(shù)據(jù)處理程序的大門。

b.案例分析

對(duì)詞頻數(shù)統(tǒng)計(jì),用 Spark 提供的算子來(lái)實(shí)現(xiàn),我們首先需要將文本文件中的每行轉(zhuǎn)化成1個(gè)個(gè)的單詞, 其次是對(duì)每個(gè)出現(xiàn)的單詞進(jìn)行記1次數(shù),最后就是把所有相同單詞的計(jì)數(shù)相加得到終究的結(jié)果。

對(duì)第1步我們自然的想到使用 flatMap 算子把1行文本 split 成多個(gè)單詞,然后對(duì)第2步我們需要使用 map 算子把單個(gè)的單詞轉(zhuǎn)化成1個(gè)有計(jì)數(shù)的 Key-Value 對(duì),即 word -> (word,1). 對(duì)最后1步統(tǒng)計(jì)相同單詞的出現(xiàn)次數(shù),我們需要使用 reduceByKey 算子把相同單詞的計(jì)數(shù)相加得到終究結(jié)果。
c. 編程實(shí)現(xiàn)

清單 1.SparkWordCount 類源碼
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ object SparkWordCount { def FILE_NAME:String = "word_count_results_"; def main(args:Array[String]) { if (args.length < 1) { println("Usage:SparkWordCount FileName"); System.exit(1); } val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program"); val sc = new SparkContext(conf); val textFile = sc.textFile(args(0)); val wordCounts = textFile.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey((a, b) => a + b) //print the results,for debug use. //println("Word Count program running results:"); //wordCounts.collect().foreach(e => { //val (k,v) = e //println(k+"="+v) //}); wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis()); println("Word Count program running results are successfully saved."); } }

d. 提交到集群履行

本實(shí)例中, 我們將統(tǒng)計(jì) HDFS 文件系統(tǒng)中/user/fams 目錄下所有 txt 文件中詞頻數(shù)。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,這個(gè) jar 包履行時(shí)會(huì)被上傳到目標(biāo)服務(wù)器的/home/fams 目錄下。運(yùn)行此實(shí)例的具體命令以下:

清單 2.SparkWordCount 類履行命令
./spark-submit \ --class com.ibm.spark.exercise.basic.SparkWordCount \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/*.txt

e. 監(jiān)控履行狀態(tài)

該實(shí)例把終究的結(jié)果存儲(chǔ)在了 HDFS 上,那末如果程序運(yùn)行正常我們可以在 HDFS 上找到生成的文件信息

圖 5. 案例1輸出結(jié)果
圖 5. 案例一輸出結(jié)果

打開(kāi) Spark 集群的 Web UI, 可以看到剛才提交的 job 的履行結(jié)果。

圖 6. 案例1完成狀態(tài)
圖 6. 案例一完成狀態(tài)

如果程序還沒(méi)運(yùn)行完成,那末我們可以在 Running Applications 列表里找到它。

案例2

a. 案例描寫

該案例中,我們將假定我們需要統(tǒng)計(jì)1個(gè) 1000 萬(wàn)人口的所有人的平均年齡,固然如果您想測(cè)試 Spark 對(duì)大數(shù)據(jù)的處理能力,您可以把人口數(shù)放的更大,比如 1 億人口,固然這個(gè)取決于測(cè)試所用集群的存儲(chǔ)容量。假定這些年齡信息都存儲(chǔ)在1個(gè)文件里,并且該文件的格式以下,第1列是 ID,第2列是年齡。

圖 7. 案例2測(cè)試數(shù)據(jù)格式預(yù)覽
圖 7. 案例二測(cè)試數(shù)據(jù)格式預(yù)覽

現(xiàn)在我們需要用 Scala 寫1個(gè)生成 1000 萬(wàn)人口年齡數(shù)據(jù)的文件,源程序以下:

清單 3. 年齡信息文件生成類源碼
import java.io.FileWriter import java.io.File import scala.util.Random object SampleDataFileGenerator { def main(args:Array[String]) { val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false) val rand = new Random() for ( i <- 1 to 10000000) { writer.write( i + " " + rand.nextInt(100)) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() } }

b. 案例分析

要計(jì)算平均年齡,那末首先需要對(duì)源文件對(duì)應(yīng)的 RDD 進(jìn)行處理,也就是將它轉(zhuǎn)化成1個(gè)只包括年齡信息的 RDD,其次是計(jì)算元素個(gè)數(shù)即為總?cè)藬?shù),然后是把所有年齡數(shù)加起來(lái),最后平均年齡=總年齡/人數(shù)。

對(duì)第1步我們需要使用 map 算子把源文件對(duì)應(yīng)的 RDD 映照成1個(gè)新的只包括年齡數(shù)據(jù)的 RDD,很明顯需要對(duì)在 map 算子的傳入函數(shù)中使用 split 方法,得到數(shù)組后只取第2個(gè)元素即為年齡信息;第2步計(jì)算數(shù)據(jù)元素總數(shù)需要對(duì)第1步映照的結(jié)果 RDD 使用 count 算子;第3步則是使用 reduce 算子對(duì)只包括年齡信息的 RDD 的所有元素用加法求和;最后使用除法計(jì)算平均年齡便可。

由于本例輸出結(jié)果很簡(jiǎn)單,所以只打印在控制臺(tái)便可。

c. 編程實(shí)現(xiàn)

清單 4.AvgAgeCalculator 類源碼
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object AvgAgeCalculator { def main(args:Array[String]) { if (args.length < 1){ println("Usage:AvgAgeCalculator datafile") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator") val sc = new SparkContext(conf) val dataFile = sc.textFile(args(0), 5); val count = dataFile.count() val ageData = dataFile.map(line => line.split(" ")(1)) val totalAge = ageData.map(age => Integer.parseInt( String.valueOf(age))).collect().reduce((a,b) => a+b) println("Total Age:" + totalAge + ";Number of People:" + count ) val avgAge : Double = totalAge.toDouble / count.toDouble println("Average Age is " + avgAge) } }

d. 提交到集群履行

要履行本實(shí)例的程序,需要將剛剛生成的年齡信息文件上傳到 HDFS 上,假定您剛才已在目標(biāo)機(jī)器上履行生成年齡信息文件的 Scala 類,并且文件被生成到了/home/fams 目錄下。

那末您需要運(yùn)行1下 HDFS 命令把文件拷貝到 HDFS 的/user/fams 目錄。

清單 5. 年齡信息文件拷貝到 HDFS 目錄的命令
hdfs dfs –copyFromLocal /home/fams /user/fams
清單 6.AvgAgeCalculator 類的履行命令
./spark-submit \ --class com.ibm.spark.exercise.basic.AvgAgeCalculator \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt

e. 監(jiān)控履行狀態(tài)

在控制臺(tái)您可以看到以下所示信息:

圖 8. 案例2輸出結(jié)果
圖 8. 案例二輸出結(jié)果

我們也能夠到 Spark Web Console 去查看 Job 的履行狀態(tài)

圖 9. 案例2完成狀態(tài)
圖 9. 案例二完成狀態(tài)

案例3

a. 案例描寫

本案例假定我們需要對(duì)某個(gè)省的人口 (1 億) 性別還有身高進(jìn)行統(tǒng)計(jì),需要計(jì)算出男女人數(shù),男性中的最高和最低身高,和女性中的最高和最低身高。本案例中用到的源文件有以下格式, 3列分別是 ID,性別,身高 (cm)。

圖 10. 案例3測(cè)試數(shù)據(jù)格式預(yù)覽
圖 10. 案例三測(cè)試數(shù)據(jù)格式預(yù)覽

我們將用以下 Scala 程序生成這個(gè)文件,源碼以下:

清單 7. 人口信息生成類源碼
import java.io.FileWriter import java.io.File import scala.util.Random object PeopleInfoFileGenerator { def main(args:Array[String]) { val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false) val rand = new Random() for ( i <- 1 to 100000000) { var height = rand.nextInt(220) if (height < 50) { height = height + 50 } var gender = getRandomGender if (height < 100 && gender == "M") height = height + 100 if (height < 100 && gender == "F") height = height + 50 writer.write( i + " " + getRandomGender + " " + height) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close() println("People Information File generated successfully.") } def getRandomGender() :String = { val rand = new Random() val randNum = rand.nextInt(2) + 1 if (randNum % 2 == 0) { "M" } else { "F" } } }

b. 案例分析

對(duì)這個(gè)案例,我們要分別統(tǒng)計(jì)男女的信息,那末很自然的想到首先需要對(duì)男女信息從源文件的對(duì)應(yīng)的 RDD 中進(jìn)行分離,這樣會(huì)產(chǎn)生兩個(gè)新的 RDD,分別包括男女信息;其次是分別對(duì)男女信息對(duì)應(yīng)的 RDD 的數(shù)據(jù)進(jìn)行進(jìn)1步映照,使其只包括身高數(shù)據(jù),這樣我們又得到兩個(gè) RDD,分別對(duì)應(yīng)男性身高和女性身高;最后需要對(duì)這兩個(gè) RDD 進(jìn)行排序,進(jìn)而得到最高和最低的男性或女性身高。

對(duì)第1步,也就是分離男女信息,我們需要使用 filter 算子,過(guò)濾條件就是包括”M” 的行是男性,包括”F”的行是女性;第2步我們需要使用 map 算子把男女各自的身高數(shù)據(jù)從 RDD 中分離出來(lái);第3步我們需要使用 sortBy 算子對(duì)男女身高數(shù)據(jù)進(jìn)行排序。

c. 編程實(shí)現(xiàn)

在實(shí)現(xiàn)上,有1個(gè)需要注意的點(diǎn)是在 RDD 轉(zhuǎn)化的進(jìn)程中需要把身高數(shù)據(jù)轉(zhuǎn)換成整數(shù),否則 sortBy 算子會(huì)把它視為字符串,那末排序結(jié)果就會(huì)遭到影響,例如 身高數(shù)據(jù)如果是:123,110,84,72,100,那末升序排序結(jié)果將會(huì)是 100,110,123,72,84,明顯這是不對(duì)的。

清單 8.PeopleInfoCalculator 類源碼
object PeopleInfoCalculator { def main(args:Array[String]) { if (args.length < 1){ println("Usage:PeopleInfoCalculator datafile") System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator") val sc = new SparkContext(conf) val dataFile = sc.textFile(args(0), 5); val maleData = dataFile.filter(line => line.contains("M")).map( line => (line.split(" ")(1) + " " + line.split(" ")(2))) val femaleData = dataFile.filter(line => line.contains("F")).map( line => (line.split(" ")(1) + " " + line.split(" ")(2))) //for debug use //maleData.collect().foreach { x => println(x)} //femaleData.collect().foreach { x => println(x)} val maleHeightData = maleData.map(line => line.split(" ")(1).toInt) val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt) //for debug use //maleHeightData.collect().foreach { x => println(x)} //femaleHeightData.collect().foreach { x => println(x)} val lowestMale = maleHeightData.sortBy(x => x,true).first() val lowestFemale = femaleHeightData.sortBy(x => x,true).first() //for debug use //maleHeightData.collect().sortBy(x => x).foreach { x => println(x)} //femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)} val highestMale = maleHeightData.sortBy(x => x, false).first() val highestFemale = femaleHeightData.sortBy(x => x, false).first() println("Number of Male Peole:" + maleData.count()) println("Number of Female Peole:" + femaleData.count()) println("Lowest Male:" + lowestMale) println("Lowest Female:" + lowestFemale) println("Highest Male:" + highestMale) println("Highest Female:" + highestFemale) } }

d. 提交到集群履行

在提交該程序到集群履行之前,我們需要將剛才生成的人口信息數(shù)據(jù)文件上傳到 HDFS 集群,具體命令可以參照上文。

清單 9.PeopleInfoCalculator 類的履行命令
./spark-submit \ --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 3g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt

e. 監(jiān)控履行狀態(tài)

對(duì)該實(shí)例,如程序中打印的1樣,會(huì)在控制臺(tái)顯示以下信息:

圖 11. 案例3輸出結(jié)果
圖 11. 案例三輸出結(jié)果

在 Spark Web Console 里可以看到具體的履行狀態(tài)信息

圖 12. 案例3完成狀態(tài)
圖 12. 案例三完成狀態(tài)

案例4

a. 案例描寫

該案例中我們假定某搜索引擎公司要統(tǒng)計(jì)過(guò)去1年搜索頻率最高的 K 個(gè)科技關(guān)鍵詞或詞組,為了簡(jiǎn)化問(wèn)題,我們假定關(guān)鍵詞組已被整理到1個(gè)或多個(gè)文本文件中,并且文檔具有以下格式。

圖 13. 案例4測(cè)試數(shù)據(jù)格式預(yù)覽
圖 13. 案例四測(cè)試數(shù)據(jù)格式預(yù)覽

我們可以看到1個(gè)關(guān)鍵詞或詞組可能出現(xiàn)屢次,并且大小寫格式可能不1致。

b. 案例分析

要解決這個(gè)問(wèn)題,首先我們需要對(duì)每一個(gè)關(guān)鍵詞出現(xiàn)的次數(shù)進(jìn)行計(jì)算,在這個(gè)進(jìn)程中需要辨認(rèn)不同大小寫的相同單詞或詞組,如”Spark”和“spark” 需要被認(rèn)定為1個(gè)單詞。對(duì)出現(xiàn)次數(shù)統(tǒng)計(jì)的進(jìn)程和 word count 案例類似;其次我們需要對(duì)關(guān)鍵詞或詞組依照出現(xiàn)的次數(shù)進(jìn)行降序排序,在排序前需要把 RDD 數(shù)據(jù)元素從 (k,v) 轉(zhuǎn)化成 (v,k);最后取排在最前面的 K 個(gè)單詞或詞組。

對(duì)第1步,我們需要使用 map 算子對(duì)源數(shù)據(jù)對(duì)應(yīng)的 RDD 數(shù)據(jù)進(jìn)行全小寫轉(zhuǎn)化并且給詞組記1次數(shù),然后調(diào)用 reduceByKey 算子計(jì)算相同詞組的出現(xiàn)次數(shù);第2步我們需要對(duì)第1步產(chǎn)生的 RDD 的數(shù)據(jù)元素用 sortByKey 算子進(jìn)行降序排序;第3步再對(duì)排好序的 RDD 數(shù)據(jù)使用 take 算子獲得前 K 個(gè)數(shù)據(jù)元素。

c. 編程實(shí)現(xiàn)

清單 10.TopKSearchKeyWords 類源碼
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object TopKSearchKeyWords { def main(args:Array[String]){ if (args.length < 2) { println("Usage:TopKSearchKeyWords KeyWordsFile K"); System.exit(1) } val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words") val sc = new SparkContext(conf) val srcData = sc.textFile(args(0)) val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b) //for debug use //countedData.foreach(x => println(x)) val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false) val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) } topKData.foreach(println) } }

d. 提交到集群履行

清單 11.TopKSearchKeyWords 類的履行命令
./spark-submit \ --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \ --master spark://hadoop036166:7077 \ --num-executors 3 \ --driver-memory 6g \ --executor-memory 2g \ --executor-cores 2 \ /home/fams/sparkexercise.jar \ hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt

e. 監(jiān)控履行狀態(tài)

如果程序成功履行,我們將在控制臺(tái)看到以下信息。固然讀者也能夠仿照案例2和案例3那樣,自己嘗試使用 Scala 寫1段小程序生成此案例需要的源數(shù)據(jù)文件,可以根據(jù)您的 HDFS 集群的容量,生成盡量大的文件,用來(lái)測(cè)試本案例提供的程序。

圖 14. 案例4輸出結(jié)果
圖 14. 案例四輸出結(jié)果
圖 15. 案例4完成狀態(tài)
圖 15. 案例四完成狀態(tài)

Spark job 的履行流程簡(jiǎn)介

我們可以發(fā)現(xiàn),Spark 利用程序在提交履行后,控制臺(tái)會(huì)打印很多日志信息,這些信息看起來(lái)是雜亂無(wú)章的,但是卻在1定程度上體現(xiàn)了1個(gè)被提交的 Spark job 在集群中是如何被調(diào)度履行的,那末在這1節(jié),將會(huì)向大家介紹1個(gè)典型的 Spark job 是如何被調(diào)度履行的。

我們先來(lái)了解以下幾個(gè)概念:

DAG: 即 Directed Acyclic Graph,有向無(wú)環(huán)圖,這是1個(gè)圖論中的概念。如果1個(gè)有向圖沒(méi)法從某個(gè)頂點(diǎn)動(dòng)身經(jīng)過(guò)若干條邊回到該點(diǎn),則這個(gè)圖是1個(gè)有向無(wú)環(huán)圖。

Job:我們知道,Spark 的計(jì)算操作是 lazy 履行的,只有當(dāng)碰到1個(gè)動(dòng)作 (Action) 算子時(shí)才會(huì)觸發(fā)真實(shí)的計(jì)算。1個(gè) Job 就是由動(dòng)作算子而產(chǎn)生包括1個(gè)或多個(gè) Stage 的計(jì)算作業(yè)。

Stage:Job 被肯定后,Spark 的調(diào)度器 (DAGScheduler) 會(huì)根據(jù)該計(jì)算作業(yè)的計(jì)算步驟把作業(yè)劃分成1個(gè)或多個(gè) Stage。Stage 又分為 ShuffleMapStage 和 ResultStage,前者以 shuffle 為輸出邊界,后者會(huì)直接輸出結(jié)果,其邊界可以是獲得外部數(shù)據(jù),也能夠是以1個(gè) ShuffleMapStage 的輸出為邊界。每個(gè) Stage 將包括1個(gè) TaskSet。

TaskSet: 代表1組相干聯(lián)的沒(méi)有 shuffle 依賴關(guān)系的任務(wù)組成任務(wù)集。1組任務(wù)會(huì)被1起提交到更加底層的 TaskScheduler。

Task:代表單個(gè)數(shù)據(jù)分區(qū)上的最小處理單元。分為 ShuffleMapTask 和 ResultTask。ShuffleMapTask 履行任務(wù)并把任務(wù)的輸出劃分到 (基于 task 的對(duì)應(yīng)的數(shù)據(jù)分區(qū)) 多個(gè) bucket(ArrayBuffer) 中,ResultTask 履行任務(wù)并把任務(wù)的輸動(dòng)身送給驅(qū)動(dòng)程序。

Spark 的作業(yè)任務(wù)調(diào)度是復(fù)雜的,需要結(jié)合源碼來(lái)進(jìn)行較為詳實(shí)的分析,但是這已超過(guò)本文的范圍,所以這1節(jié)我們只是對(duì)大致的流程進(jìn)行分析。

Spark 利用程序被提交后,當(dāng)某個(gè)動(dòng)作算子觸發(fā)了計(jì)算操作時(shí),SparkContext 會(huì)向 DAGScheduler 提交1個(gè)作業(yè),接著 DAGScheduler 會(huì)根據(jù) RDD 生成的依賴關(guān)系劃分 Stage,并決定各個(gè) Stage 之間的依賴關(guān)系,Stage 之間的依賴關(guān)系就構(gòu)成了 DAG。Stage 的劃分是以 ShuffleDependency 為根據(jù)的,也就是說(shuō)當(dāng)某個(gè) RDD 的運(yùn)算需要將數(shù)據(jù)進(jìn)行 Shuffle 時(shí),這個(gè)包括了 Shuffle 依賴關(guān)系的 RDD 將被用來(lái)作為輸入信息,進(jìn)而構(gòu)建1個(gè)新的 Stage。我們可以看到用這樣的方式劃分 Stage,能夠保證有依賴關(guān)系的數(shù)據(jù)可以以正確的順序履行。根據(jù)每一個(gè) Stage 所依賴的 RDD 數(shù)據(jù)的 partition 的散布,會(huì)產(chǎn)生出與 partition 數(shù)量相等的 Task,這些 Task 根據(jù) partition 的位置進(jìn)行散布。其次對(duì) finalStage 或是 mapStage 會(huì)產(chǎn)生不同的 Task,最后所有的 Task 會(huì)封裝到 TaskSet 內(nèi)提交到 TaskScheduler 去履行。有興趣的讀者可以通過(guò)瀏覽 DAGScheduler 和 TaskScheduler 的源碼獲得更詳細(xì)的履行流程。

結(jié)束語(yǔ)

通過(guò)本文,相信讀者對(duì)如何使用 Scala 編寫 Spark 利用程序處理大數(shù)據(jù)已有了較為深入的了解。固然在處理實(shí)際問(wèn)題時(shí),情況可能比本文舉得例子復(fù)雜很多,但是解決問(wèn)題的基本思想是1致的。在碰到實(shí)際問(wèn)題的時(shí)候,首先要對(duì)源數(shù)據(jù)結(jié)構(gòu)格式等進(jìn)行分析,然后肯定如何去使用 Spark 提供的算子對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)化,終究根據(jù)實(shí)際需求選擇適合的算子操作數(shù)據(jù)并計(jì)算結(jié)果。本文并未介紹其它 Spark 模塊的知識(shí),明顯這不是1篇文章所能完成的,希望以后會(huì)有機(jī)會(huì)總結(jié)更多的 Spark 利用程序開(kāi)發(fā)和性能調(diào)優(yōu)方面的知識(shí),寫成文章與更多的 Spark 技術(shù)愛(ài)好者分享,1起進(jìn)步。由于時(shí)間倉(cāng)促并且本人知識(shí)水平有限,文章難免有未斟酌周全的地方乃至是毛病,希望各位朋友不吝賜教。有任何問(wèn)題,都可以在文末留下您的評(píng)論,我會(huì)及時(shí)回復(fù)。

生活不易,碼農(nóng)辛苦
如果您覺(jué)得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 亚洲日本在线观看网址 | 在线碰碰视频在线观看 | 欧美孕妇乱大交xxxx | 午夜理伦三级理论三级60 | 樱花aⅴ一区二区三区四区 影视精品网站入口 | 欧美中日韩在线 | 免费jizz在在线播放国产 | 国产精品久久亚洲一区二区 | 成人香蕉xxxxxxx | 国产精品亚欧美一区二区三区 | 老司机午夜免费 | 国产女人在线视频 | 日韩精品一区二区三区中文在线 | 午夜视频网 | 国产精品亚洲综合网站 | 成人午夜精品久久久久久久小说 | 动漫精品一级毛片动漫 | 久久99精品一级毛片 | 香蕉大成网人站在线 | 欧美成人毛片一级在线 | 国产女人伦码一区二区三区不卡 | 欧美在线一区二区三区不卡 | 伊人网视频在线 | 成人国产精品毛片 | 中文字幕一区视频 | 视频二区中文字幕 | 亚洲综合欧美日韩 | 国产精品嫩草影院在线看 | 国产高清自拍 | 无限国产资源 | 亚洲精品嫩草研究院久久 | 成人欧美一区二区三区视频xxx | 国产日韩一区二区三区在线观看 | 最近最新中文字幕大全手机在线 | 国产第1页| 好吊日在线观看 | 最近中文字幕免费完整国语 | 日韩欧美中文字幕一区二区三区 | 国产乱码精品一区二区三上 | 国内精品福利 | 欧美亚洲国产成人精品 |