Spark 入門實(shí)戰(zhàn)之最好的實(shí)例
來(lái)源:程序員人生 發(fā)布時(shí)間:2016-06-12 08:27:30 閱讀次數(shù):3313次
轉(zhuǎn)載:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/
搭建開(kāi)發(fā)環(huán)境
-
安裝 Scala IDE
搭建 Scala 語(yǔ)言開(kāi)發(fā)環(huán)境很容易,Scala IDE 官網(wǎng) 下載適合的版本并解壓就能夠完成安裝,本文使用的版本是
4.1.0。
-
安裝 Scala 語(yǔ)言包
如果下載的 Scala IDE 自帶的 Scala 語(yǔ)言包與 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不1致,那末就需要下載和本文所使用的 Spark 所匹配的版本,以確保實(shí)現(xiàn)的 Scala 程序不會(huì)由于版本問(wèn)題而運(yùn)行失敗。
請(qǐng)下載并安裝 Scala 2.10.5 版本
-
安裝 JDK
如果您的機(jī)器上沒(méi)有安裝 JDK,請(qǐng)下載并安裝 1.6 版本以上的 JDK。
-
創(chuàng)建并配置 Spark 工程
打開(kāi) Scala IDE,創(chuàng)建1個(gè)名稱為 spark-exercise 的 Scala 工程。
圖 1. 創(chuàng)建 scala 工程
在工程目錄下創(chuàng)建1個(gè) lib 文件夾,并且把您的 Spark 安裝包下的 spark-assembly jar 包拷貝到 lib 目錄下。
圖 2. Spark 開(kāi)發(fā) jar 包
并且添加該 jar 包到工程的 classpath 并配置工程使用剛剛安裝的 Scala 2.10.5 版本.,工程目錄結(jié)構(gòu)以下。
圖 3. 添加 jar 包到 classpath
回頁(yè)首
運(yùn)行環(huán)境介紹
為了不讀者對(duì)本文案例運(yùn)行環(huán)境產(chǎn)生困惑,本節(jié)會(huì)對(duì)本文用到的集群環(huán)境的基本情況做個(gè)簡(jiǎn)單介紹。
-
本文所有實(shí)例數(shù)據(jù)存儲(chǔ)的環(huán)境是1個(gè) 8 個(gè)機(jī)器的 Hadoop 集群,文件系統(tǒng)總?cè)萘渴?1.12T,NameNode 叫 hadoop036166, 服務(wù)端口是 9000。讀者可以不關(guān)心具體的節(jié)點(diǎn)散布,由于這個(gè)不會(huì)影響到您瀏覽后面的文章。
-
本文運(yùn)行實(shí)例程序使用的 Spark 集群是1個(gè)包括4個(gè)節(jié)點(diǎn)的 Standalone 模式的集群, 其中包括1個(gè) Master 節(jié)點(diǎn) (監(jiān)聽(tīng)端口 7077) 和3個(gè) Worker 節(jié)點(diǎn),具體散布以下:
Server Name |
Role |
hadoop036166 |
Master |
hadoop036187 |
Worker |
hadoop036188 |
Worker |
hadoop036227 |
Worker |
-
Spark 提供1個(gè) Web UI 去查看集群信息并且監(jiān)控履行結(jié)果,默許地址是:http://<spark_master_ip>:8080 ,對(duì)該實(shí)例提交后我們也能夠到 web 頁(yè)面上去查看履行結(jié)果,固然也能夠通過(guò)查看日志去找到履行結(jié)果。
圖 4. Spark 的 web console
回頁(yè)首
案例分析與編程實(shí)現(xiàn)
案例1
a. 案例描寫
提起 Word Count(詞頻數(shù)統(tǒng)計(jì)),相信大家都不陌生,就是統(tǒng)計(jì)1個(gè)或多個(gè)文件中單詞出現(xiàn)的次數(shù)。本文將此作為1個(gè)入門級(jí)案例,由淺入深的開(kāi)啟使用 Scala 編寫 Spark 大數(shù)據(jù)處理程序的大門。
b.案例分析
對(duì)詞頻數(shù)統(tǒng)計(jì),用 Spark 提供的算子來(lái)實(shí)現(xiàn),我們首先需要將文本文件中的每行轉(zhuǎn)化成1個(gè)個(gè)的單詞, 其次是對(duì)每個(gè)出現(xiàn)的單詞進(jìn)行記1次數(shù),最后就是把所有相同單詞的計(jì)數(shù)相加得到終究的結(jié)果。
對(duì)第1步我們自然的想到使用 flatMap 算子把1行文本 split 成多個(gè)單詞,然后對(duì)第2步我們需要使用 map 算子把單個(gè)的單詞轉(zhuǎn)化成1個(gè)有計(jì)數(shù)的 Key-Value 對(duì),即 word -> (word,1). 對(duì)最后1步統(tǒng)計(jì)相同單詞的出現(xiàn)次數(shù),我們需要使用 reduceByKey 算子把相同單詞的計(jì)數(shù)相加得到終究結(jié)果。
c. 編程實(shí)現(xiàn)
清單 1.SparkWordCount 類源碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCount {
def FILE_NAME:String = "word_count_results_";
def main(args:Array[String]) {
if (args.length < 1) {
println("Usage:SparkWordCount FileName");
System.exit(1);
}
val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
val sc = new SparkContext(conf);
val textFile = sc.textFile(args(0));
val wordCounts = textFile.flatMap(line => line.split(" ")).map(
word => (word, 1)).reduceByKey((a, b) => a + b)
//print the results,for debug use.
//println("Word Count program running results:");
//wordCounts.collect().foreach(e => {
//val (k,v) = e
//println(k+"="+v)
//});
wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
println("Word Count program running results are successfully saved.");
}
}
d. 提交到集群履行
本實(shí)例中, 我們將統(tǒng)計(jì) HDFS 文件系統(tǒng)中/user/fams 目錄下所有 txt 文件中詞頻數(shù)。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,這個(gè) jar 包履行時(shí)會(huì)被上傳到目標(biāo)服務(wù)器的/home/fams 目錄下。運(yùn)行此實(shí)例的具體命令以下:
清單 2.SparkWordCount 類履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt
e. 監(jiān)控履行狀態(tài)
該實(shí)例把終究的結(jié)果存儲(chǔ)在了 HDFS 上,那末如果程序運(yùn)行正常我們可以在 HDFS 上找到生成的文件信息
圖 5. 案例1輸出結(jié)果
打開(kāi) Spark 集群的 Web UI, 可以看到剛才提交的 job 的履行結(jié)果。
圖 6. 案例1完成狀態(tài)
如果程序還沒(méi)運(yùn)行完成,那末我們可以在 Running Applications 列表里找到它。
案例2
a. 案例描寫
該案例中,我們將假定我們需要統(tǒng)計(jì)1個(gè) 1000 萬(wàn)人口的所有人的平均年齡,固然如果您想測(cè)試 Spark 對(duì)大數(shù)據(jù)的處理能力,您可以把人口數(shù)放的更大,比如 1 億人口,固然這個(gè)取決于測(cè)試所用集群的存儲(chǔ)容量。假定這些年齡信息都存儲(chǔ)在1個(gè)文件里,并且該文件的格式以下,第1列是 ID,第2列是年齡。
圖 7. 案例2測(cè)試數(shù)據(jù)格式預(yù)覽
現(xiàn)在我們需要用 Scala 寫1個(gè)生成 1000 萬(wàn)人口年齡數(shù)據(jù)的文件,源程序以下:
清單 3. 年齡信息文件生成類源碼
import java.io.FileWriter
import java.io.File
import scala.util.Random
object SampleDataFileGenerator {
def main(args:Array[String]) {
val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false)
val rand = new Random()
for ( i <- 1 to 10000000) {
writer.write( i + " " + rand.nextInt(100))
writer.write(System.getProperty("line.separator"))
}
writer.flush()
writer.close()
}
}
b. 案例分析
要計(jì)算平均年齡,那末首先需要對(duì)源文件對(duì)應(yīng)的 RDD 進(jìn)行處理,也就是將它轉(zhuǎn)化成1個(gè)只包括年齡信息的 RDD,其次是計(jì)算元素個(gè)數(shù)即為總?cè)藬?shù),然后是把所有年齡數(shù)加起來(lái),最后平均年齡=總年齡/人數(shù)。
對(duì)第1步我們需要使用 map 算子把源文件對(duì)應(yīng)的 RDD 映照成1個(gè)新的只包括年齡數(shù)據(jù)的 RDD,很明顯需要對(duì)在 map 算子的傳入函數(shù)中使用 split 方法,得到數(shù)組后只取第2個(gè)元素即為年齡信息;第2步計(jì)算數(shù)據(jù)元素總數(shù)需要對(duì)第1步映照的結(jié)果 RDD 使用 count 算子;第3步則是使用 reduce 算子對(duì)只包括年齡信息的 RDD 的所有元素用加法求和;最后使用除法計(jì)算平均年齡便可。
由于本例輸出結(jié)果很簡(jiǎn)單,所以只打印在控制臺(tái)便可。
c. 編程實(shí)現(xiàn)
清單 4.AvgAgeCalculator 類源碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
def main(args:Array[String]) {
if (args.length < 1){
println("Usage:AvgAgeCalculator datafile")
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
val sc = new SparkContext(conf)
val dataFile = sc.textFile(args(0), 5);
val count = dataFile.count()
val ageData = dataFile.map(line => line.split(" ")(1))
val totalAge = ageData.map(age => Integer.parseInt(
String.valueOf(age))).collect().reduce((a,b) => a+b)
println("Total Age:" + totalAge + ";Number of People:" + count )
val avgAge : Double = totalAge.toDouble / count.toDouble
println("Average Age is " + avgAge)
}
}
d. 提交到集群履行
要履行本實(shí)例的程序,需要將剛剛生成的年齡信息文件上傳到 HDFS 上,假定您剛才已在目標(biāo)機(jī)器上履行生成年齡信息文件的 Scala 類,并且文件被生成到了/home/fams 目錄下。
那末您需要運(yùn)行1下 HDFS 命令把文件拷貝到 HDFS 的/user/fams 目錄。
清單 5. 年齡信息文件拷貝到 HDFS 目錄的命令
hdfs dfs –copyFromLocal /home/fams /user/fams
清單 6.AvgAgeCalculator 類的履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.AvgAgeCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt
e. 監(jiān)控履行狀態(tài)
在控制臺(tái)您可以看到以下所示信息:
圖 8. 案例2輸出結(jié)果
我們也能夠到 Spark Web Console 去查看 Job 的履行狀態(tài)
圖 9. 案例2完成狀態(tài)
案例3
a. 案例描寫
本案例假定我們需要對(duì)某個(gè)省的人口 (1 億) 性別還有身高進(jìn)行統(tǒng)計(jì),需要計(jì)算出男女人數(shù),男性中的最高和最低身高,和女性中的最高和最低身高。本案例中用到的源文件有以下格式, 3列分別是 ID,性別,身高 (cm)。
圖 10. 案例3測(cè)試數(shù)據(jù)格式預(yù)覽
我們將用以下 Scala 程序生成這個(gè)文件,源碼以下:
清單 7. 人口信息生成類源碼
import java.io.FileWriter
import java.io.File
import scala.util.Random
object PeopleInfoFileGenerator {
def main(args:Array[String]) {
val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false)
val rand = new Random()
for ( i <- 1 to 100000000) {
var height = rand.nextInt(220)
if (height < 50) {
height = height + 50
}
var gender = getRandomGender
if (height < 100 && gender == "M")
height = height + 100
if (height < 100 && gender == "F")
height = height + 50
writer.write( i + " " + getRandomGender + " " + height)
writer.write(System.getProperty("line.separator"))
}
writer.flush()
writer.close()
println("People Information File generated successfully.")
}
def getRandomGender() :String = {
val rand = new Random()
val randNum = rand.nextInt(2) + 1
if (randNum % 2 == 0) {
"M"
} else {
"F"
}
}
}
b. 案例分析
對(duì)這個(gè)案例,我們要分別統(tǒng)計(jì)男女的信息,那末很自然的想到首先需要對(duì)男女信息從源文件的對(duì)應(yīng)的 RDD 中進(jìn)行分離,這樣會(huì)產(chǎn)生兩個(gè)新的 RDD,分別包括男女信息;其次是分別對(duì)男女信息對(duì)應(yīng)的 RDD 的數(shù)據(jù)進(jìn)行進(jìn)1步映照,使其只包括身高數(shù)據(jù),這樣我們又得到兩個(gè) RDD,分別對(duì)應(yīng)男性身高和女性身高;最后需要對(duì)這兩個(gè) RDD 進(jìn)行排序,進(jìn)而得到最高和最低的男性或女性身高。
對(duì)第1步,也就是分離男女信息,我們需要使用 filter 算子,過(guò)濾條件就是包括”M” 的行是男性,包括”F”的行是女性;第2步我們需要使用 map 算子把男女各自的身高數(shù)據(jù)從 RDD 中分離出來(lái);第3步我們需要使用 sortBy 算子對(duì)男女身高數(shù)據(jù)進(jìn)行排序。
c. 編程實(shí)現(xiàn)
在實(shí)現(xiàn)上,有1個(gè)需要注意的點(diǎn)是在 RDD 轉(zhuǎn)化的進(jìn)程中需要把身高數(shù)據(jù)轉(zhuǎn)換成整數(shù),否則 sortBy 算子會(huì)把它視為字符串,那末排序結(jié)果就會(huì)遭到影響,例如 身高數(shù)據(jù)如果是:123,110,84,72,100,那末升序排序結(jié)果將會(huì)是 100,110,123,72,84,明顯這是不對(duì)的。
清單 8.PeopleInfoCalculator 類源碼
object PeopleInfoCalculator {
def main(args:Array[String]) {
if (args.length < 1){
println("Usage:PeopleInfoCalculator datafile")
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
val sc = new SparkContext(conf)
val dataFile = sc.textFile(args(0), 5);
val maleData = dataFile.filter(line => line.contains("M")).map(
line => (line.split(" ")(1) + " " + line.split(" ")(2)))
val femaleData = dataFile.filter(line => line.contains("F")).map(
line => (line.split(" ")(1) + " " + line.split(" ")(2)))
//for debug use
//maleData.collect().foreach { x => println(x)}
//femaleData.collect().foreach { x => println(x)}
val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
//for debug use
//maleHeightData.collect().foreach { x => println(x)}
//femaleHeightData.collect().foreach { x => println(x)}
val lowestMale = maleHeightData.sortBy(x => x,true).first()
val lowestFemale = femaleHeightData.sortBy(x => x,true).first()
//for debug use
//maleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
//femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
val highestMale = maleHeightData.sortBy(x => x, false).first()
val highestFemale = femaleHeightData.sortBy(x => x, false).first()
println("Number of Male Peole:" + maleData.count())
println("Number of Female Peole:" + femaleData.count())
println("Lowest Male:" + lowestMale)
println("Lowest Female:" + lowestFemale)
println("Highest Male:" + highestMale)
println("Highest Female:" + highestFemale)
}
}
d. 提交到集群履行
在提交該程序到集群履行之前,我們需要將剛才生成的人口信息數(shù)據(jù)文件上傳到 HDFS 集群,具體命令可以參照上文。
清單 9.PeopleInfoCalculator 類的履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 3g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt
e. 監(jiān)控履行狀態(tài)
對(duì)該實(shí)例,如程序中打印的1樣,會(huì)在控制臺(tái)顯示以下信息:
圖 11. 案例3輸出結(jié)果
在 Spark Web Console 里可以看到具體的履行狀態(tài)信息
圖 12. 案例3完成狀態(tài)
案例4
a. 案例描寫
該案例中我們假定某搜索引擎公司要統(tǒng)計(jì)過(guò)去1年搜索頻率最高的 K 個(gè)科技關(guān)鍵詞或詞組,為了簡(jiǎn)化問(wèn)題,我們假定關(guān)鍵詞組已被整理到1個(gè)或多個(gè)文本文件中,并且文檔具有以下格式。
圖 13. 案例4測(cè)試數(shù)據(jù)格式預(yù)覽
我們可以看到1個(gè)關(guān)鍵詞或詞組可能出現(xiàn)屢次,并且大小寫格式可能不1致。
b. 案例分析
要解決這個(gè)問(wèn)題,首先我們需要對(duì)每一個(gè)關(guān)鍵詞出現(xiàn)的次數(shù)進(jìn)行計(jì)算,在這個(gè)進(jìn)程中需要辨認(rèn)不同大小寫的相同單詞或詞組,如”Spark”和“spark” 需要被認(rèn)定為1個(gè)單詞。對(duì)出現(xiàn)次數(shù)統(tǒng)計(jì)的進(jìn)程和 word count 案例類似;其次我們需要對(duì)關(guān)鍵詞或詞組依照出現(xiàn)的次數(shù)進(jìn)行降序排序,在排序前需要把 RDD 數(shù)據(jù)元素從 (k,v) 轉(zhuǎn)化成 (v,k);最后取排在最前面的 K 個(gè)單詞或詞組。
對(duì)第1步,我們需要使用 map 算子對(duì)源數(shù)據(jù)對(duì)應(yīng)的 RDD 數(shù)據(jù)進(jìn)行全小寫轉(zhuǎn)化并且給詞組記1次數(shù),然后調(diào)用 reduceByKey 算子計(jì)算相同詞組的出現(xiàn)次數(shù);第2步我們需要對(duì)第1步產(chǎn)生的 RDD 的數(shù)據(jù)元素用 sortByKey 算子進(jìn)行降序排序;第3步再對(duì)排好序的 RDD 數(shù)據(jù)使用 take 算子獲得前 K 個(gè)數(shù)據(jù)元素。
c. 編程實(shí)現(xiàn)
清單 10.TopKSearchKeyWords 類源碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TopKSearchKeyWords {
def main(args:Array[String]){
if (args.length < 2) {
println("Usage:TopKSearchKeyWords KeyWordsFile K");
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
val sc = new SparkContext(conf)
val srcData = sc.textFile(args(0))
val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)
//for debug use
//countedData.foreach(x => println(x))
val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)
val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }
topKData.foreach(println)
}
}
d. 提交到集群履行
清單 11.TopKSearchKeyWords 類的履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt
e. 監(jiān)控履行狀態(tài)
如果程序成功履行,我們將在控制臺(tái)看到以下信息。固然讀者也能夠仿照案例2和案例3那樣,自己嘗試使用 Scala 寫1段小程序生成此案例需要的源數(shù)據(jù)文件,可以根據(jù)您的 HDFS 集群的容量,生成盡量大的文件,用來(lái)測(cè)試本案例提供的程序。
圖 14. 案例4輸出結(jié)果
圖 15. 案例4完成狀態(tài)
回頁(yè)首
Spark job 的履行流程簡(jiǎn)介
我們可以發(fā)現(xiàn),Spark 利用程序在提交履行后,控制臺(tái)會(huì)打印很多日志信息,這些信息看起來(lái)是雜亂無(wú)章的,但是卻在1定程度上體現(xiàn)了1個(gè)被提交的 Spark job 在集群中是如何被調(diào)度履行的,那末在這1節(jié),將會(huì)向大家介紹1個(gè)典型的 Spark job 是如何被調(diào)度履行的。
我們先來(lái)了解以下幾個(gè)概念:
DAG: 即 Directed Acyclic Graph,有向無(wú)環(huán)圖,這是1個(gè)圖論中的概念。如果1個(gè)有向圖沒(méi)法從某個(gè)頂點(diǎn)動(dòng)身經(jīng)過(guò)若干條邊回到該點(diǎn),則這個(gè)圖是1個(gè)有向無(wú)環(huán)圖。
Job:我們知道,Spark 的計(jì)算操作是 lazy 履行的,只有當(dāng)碰到1個(gè)動(dòng)作 (Action) 算子時(shí)才會(huì)觸發(fā)真實(shí)的計(jì)算。1個(gè) Job 就是由動(dòng)作算子而產(chǎn)生包括1個(gè)或多個(gè) Stage 的計(jì)算作業(yè)。
Stage:Job 被肯定后,Spark 的調(diào)度器 (DAGScheduler) 會(huì)根據(jù)該計(jì)算作業(yè)的計(jì)算步驟把作業(yè)劃分成1個(gè)或多個(gè) Stage。Stage 又分為 ShuffleMapStage 和 ResultStage,前者以 shuffle 為輸出邊界,后者會(huì)直接輸出結(jié)果,其邊界可以是獲得外部數(shù)據(jù),也能夠是以1個(gè)
ShuffleMapStage 的輸出為邊界。每個(gè) Stage 將包括1個(gè) TaskSet。
TaskSet: 代表1組相干聯(lián)的沒(méi)有 shuffle 依賴關(guān)系的任務(wù)組成任務(wù)集。1組任務(wù)會(huì)被1起提交到更加底層的 TaskScheduler。
Task:代表單個(gè)數(shù)據(jù)分區(qū)上的最小處理單元。分為 ShuffleMapTask 和 ResultTask。ShuffleMapTask 履行任務(wù)并把任務(wù)的輸出劃分到 (基于 task 的對(duì)應(yīng)的數(shù)據(jù)分區(qū)) 多個(gè) bucket(ArrayBuffer) 中,ResultTask 履行任務(wù)并把任務(wù)的輸動(dòng)身送給驅(qū)動(dòng)程序。
Spark 的作業(yè)任務(wù)調(diào)度是復(fù)雜的,需要結(jié)合源碼來(lái)進(jìn)行較為詳實(shí)的分析,但是這已超過(guò)本文的范圍,所以這1節(jié)我們只是對(duì)大致的流程進(jìn)行分析。
Spark 利用程序被提交后,當(dāng)某個(gè)動(dòng)作算子觸發(fā)了計(jì)算操作時(shí),SparkContext 會(huì)向 DAGScheduler 提交1個(gè)作業(yè),接著 DAGScheduler 會(huì)根據(jù) RDD 生成的依賴關(guān)系劃分 Stage,并決定各個(gè) Stage 之間的依賴關(guān)系,Stage 之間的依賴關(guān)系就構(gòu)成了 DAG。Stage 的劃分是以 ShuffleDependency 為根據(jù)的,也就是說(shuō)當(dāng)某個(gè) RDD 的運(yùn)算需要將數(shù)據(jù)進(jìn)行 Shuffle 時(shí),這個(gè)包括了 Shuffle 依賴關(guān)系的 RDD 將被用來(lái)作為輸入信息,進(jìn)而構(gòu)建1個(gè)新的
Stage。我們可以看到用這樣的方式劃分 Stage,能夠保證有依賴關(guān)系的數(shù)據(jù)可以以正確的順序履行。根據(jù)每一個(gè) Stage 所依賴的 RDD 數(shù)據(jù)的 partition 的散布,會(huì)產(chǎn)生出與 partition 數(shù)量相等的 Task,這些 Task 根據(jù) partition 的位置進(jìn)行散布。其次對(duì) finalStage 或是 mapStage 會(huì)產(chǎn)生不同的 Task,最后所有的 Task 會(huì)封裝到 TaskSet 內(nèi)提交到 TaskScheduler 去履行。有興趣的讀者可以通過(guò)瀏覽 DAGScheduler
和 TaskScheduler 的源碼獲得更詳細(xì)的履行流程。
回頁(yè)首
結(jié)束語(yǔ)
通過(guò)本文,相信讀者對(duì)如何使用 Scala 編寫 Spark 利用程序處理大數(shù)據(jù)已有了較為深入的了解。固然在處理實(shí)際問(wèn)題時(shí),情況可能比本文舉得例子復(fù)雜很多,但是解決問(wèn)題的基本思想是1致的。在碰到實(shí)際問(wèn)題的時(shí)候,首先要對(duì)源數(shù)據(jù)結(jié)構(gòu)格式等進(jìn)行分析,然后肯定如何去使用 Spark 提供的算子對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)化,終究根據(jù)實(shí)際需求選擇適合的算子操作數(shù)據(jù)并計(jì)算結(jié)果。本文并未介紹其它 Spark 模塊的知識(shí),明顯這不是1篇文章所能完成的,希望以后會(huì)有機(jī)會(huì)總結(jié)更多的 Spark 利用程序開(kāi)發(fā)和性能調(diào)優(yōu)方面的知識(shí),寫成文章與更多的
Spark 技術(shù)愛(ài)好者分享,1起進(jìn)步。由于時(shí)間倉(cāng)促并且本人知識(shí)水平有限,文章難免有未斟酌周全的地方乃至是毛病,希望各位朋友不吝賜教。有任何問(wèn)題,都可以在文末留下您的評(píng)論,我會(huì)及時(shí)回復(fù)。
生活不易,碼農(nóng)辛苦
如果您覺(jué)得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)