大家好,好久沒沒更新Spark類容了,主要是最近考試比較多。今天先給大家展現1個實戰案例,這個案例是我在今年參加第1屆高校云計算利用
創新大賽時技能賽第4題――莎士比亞文集詞頻統計并行化算加法。PS:感謝師兄輝哥,真大神!
題目是這樣的(這里截圖展現出來):
是要好優化還是需要花費1點心思。在這里停詞表的作用是對在該表中的單詞不予以統計,1般而言停詞表中的單詞是出現頻率比較高的單詞(如the)。這個案例比較簡單,但
有的人的思路多是這樣的:先對莎士比亞文集進行wordcount操作統計出各個單詞的出現頻率,然后對wordcount中的結果過濾掉在停詞表
中出現的單詞,最后找到出現頻率最高的100個便可。這類方式可行,但效力略低。大家知道wordcount包括shuffle操作,shuffle所帶來的IO是spark
性能的瓶頸。我們在寫程序的時候應當盡量的較少shuffle IO,那末如何減少shuffle IO呢,在這里我們可以盡可能減少要參與shuffle操作的數據。
所以,優化的思路是對莎士比亞文集進行單詞分片后就進行過濾操作,過濾掉在停詞表中得單詞,然落后行wordcount操作。這樣1來我們可以
過濾掉大量出現頻率很高的辭匯從而減少了主要shuffle IO??赡苡械耐瑢W會問那這里的filter操作豈不是比上面的思路中filter操作需要處理的單詞書更
多,確切是這樣。但是對性能沒有任何影響,為何這么說?大家知道spark的1個良好的特點就是它的pipeline(血統),我們的處理在每個shuffle
操作之 前都會算作1個同1個stage,在這個satge中的計算都是在最后的action時才進行的,血統就是具有這1良好特性。那末對每個partiton上的
文本進行單詞切割落后行filter操作是否是具有pipeline的特性?是否是這兩個操作就像血液1樣瞬間流過你的血管中的兩個細胞?是否是幾近是同時發
生?是否是沒有任何性能影響?
另外,我們還可以將范圍較小的停詞表放在1個hash表中,hash查找的效力幾近為單位時間(大家1定要多關注hash的原理,頭幾天百度面試包
含了很多hash類容)。
說了這么多,下面貼出源碼:
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import org.apache.spark._
- object Shakespear {
- def main(args: Array[String]) {
- if (args.length != 3) {
- println("USage:<Shakespear> <Stopwords> <Out>")
- }
- //initial SparkConf and SparkContext
- val sc = new SparkContext()
- //To get Shakespear'paper
- val papers = sc.textFile(args(0))
- //To get stopwords
- val stopWords = sc.textFile(args(1)).map(_.trim).collect().toSet + ""
- //To parse papers into words and find the words statisfy the requirement
- val words = papers.flatMap(_.split("[^a-zA-Z]")).map(_.toLowerCase).filter(!stopWords(_)).map((_,1)).
- reduceByKey(_ + _).map(line=>(line._2, line._1)).top(100).map(line=>(line._2, line._1))
- val result = sc.parallelize(words)
- //To write the result into hdfs
- result.saveAsTextFile(args(2))
- }
- }
在后面我會提供包括技能賽第3題和其他的案例詳解。希望大家共同學習討論。(by老楊,轉載請注明出處)
上一篇 scala簡要:包