隨著Spark的逐步成熟完善, 愈來愈多的可配置參數被添加到Spark中來, 本文試圖通過論述這其中部份參數的工作原理和配置思路, 和大家1起探討1下如何根據實際場合對Spark進行配置優化。
由于篇幅較長,所以在這里分篇組織,如果要看最新完全的網頁版內容,可以戳這里:http://spark-config.readthedocs.org/,主要是便于更新內容
schedule調度相干
調度相干的參數設置,大多數內容都很直白,其實不必過量的額外解釋,不過基于這些參數的經常使用性(大概會是你針對自己的集群第1步就會配置的參數),這里多少就其內部機制做1些解釋。
spark.cores.max
1個集群最重要的參數之1,固然就是CPU計算資源的數量。spark.cores.max 這個參數決定了在Standalone和Mesos模式下,1個Spark利用程序所能申請的CPU Core的數量。如果你沒有并發跑多個Spark利用程序的需求,那末可以不需要設置這個參數,默許會使用spark.deploy.defaultCores的值(而spark.deploy.defaultCores的值默許為Int.Max,也就是不限制的意思)從而利用程序可使用所有當前可以取得的CPU資源。
針對這個參數需要注意的是,這個參數對Yarn模式不起作用,YARN模式下,資源由Yarn統1調度管理,1個利用啟動時所申請的CPU資源的數量由另外兩個直接配置Executor的數量和每一個Executor中core數量的參數決定。(歷史緣由造成,不同運行模式下的1些啟動參數個人認為還有待進1步整合)
另外,在Standalone模式等后臺分配CPU資源時,目前的實現中,在spark.cores.max允許的范圍內,基本上是優先從每一個Worker中申請所能得到的最大數量的CPU core給每一個Executor,因此如果人工限制了所申請的Max Core的數量小于Standalone和Mesos模式所管理的CPU數量,可能產生利用只運行在集群中部份節點上的情況(由于部份節點所能提供的最大CPU資源數量已滿足利用的要求),而不是平均散布在集群中。通常這不會是太大的問題,但是如果觸及數據本地性的場合,有可能就會帶來1定的必須進行遠程數據讀取的情況產生。理論上,這個問題可以通過兩種途徑解決:1是Standalone和Mesos的資源管理模塊自動根據節點資源情況,均勻分配和啟動Executor,2是和Yarn模式1樣,允許用戶指定和限制每一個Executor的Core的數量。 社區中有1個PR試圖走第2種途徑來解決類似的問題,不過截至我寫下這篇文檔為止(2014.8),還沒有被Merge。
spark.task.cpus
這個參數在字面上的意思就是分配給每一個任務的CPU的數量,默許為1。實際上,這個參數其實不能真的控制每一個任務實際運行時所使用的CPU的數量,比如你可以通過在任務內部創建額外的工作線程來使用更多的CPU(最少目前為止,將來任務的履行環境是不是能通過LXC等技術來控制還不好說)。它所發揮的作用,只是在作業調度時,每分配出1個任務時,對已使用的CPU資源進行計數。也就是說只是理論上用來統計資源的使用情況,便于安排調度。因此,如果你期望通過修改這個參數來加快任務的運行,那還是趕快換個思路吧。這個參數的意義,個人覺得還是在你真的在任務內部自己通過任何手段,占用了更多的CPU資源時,讓調度行動更加準確的1個輔助手段。
spark.scheduler.mode
這個參數決定了單個Spark利用內部調度的時候使用FIFO模式還是Fair模式。是的,你沒有看錯,這個參數只管理1個Spark利用內部的多個沒有依賴關系的Job作業的調度策略。
如果你需要的是多個Spark利用之間的調度策略,那末在Standalone模式下,這取決于每一個利用所申請和取得的CPU資源的數量(暫時沒有取得資源的利用就Pending在那里了),基本上就是FIFO情勢的,誰先申請和取得資源,誰就占用資源直到完成。而在Yarn模式下,則多個Spark利用間的調度策略由Yarn自己的策略配置文件所決定。
那末這個內部的調度邏輯有甚么用呢?如果你的Spark利用是通過服務的情勢,為多個用戶提交作業的話,那末可以通過配置Fair模式相干參數來調劑不同用戶作業的調度和資源分配優先級。
spark.locality.wait
spark.locality.wait和spark.locality.wait.process,spark.locality.wait.node, spark.locality.wait.rack這幾個參數影響了任務分配時的本地性策略的相干細節。
Spark中任務的處理需要斟酌所觸及的數據的本地性的場合,基本就兩種,1是數據的來源是HadoopRDD; 2是RDD的數據來源來自于RDD Cache(即由CacheManager從BlockManager中讀取,或Streaming數據源RDD)。其它情況下,如果不觸及shuffle操作的RDD,不構成劃分Stage和Task的基準,不存在判斷Locality本地性的問題,而如果是ShuffleRDD,其本地性始終為No Prefer,因此其實也無所謂Locality。
在理想的情況下,任務固然是分配在可以從本地讀取數據的節點上時(同1個JVM內部或同1臺物理機器內部)的運行時性能最好。但是每一個任務的履行速度沒法準確估計,所以很難在事前取得全局最優的履行策略,當Spark利用得到1個計算資源的時候,如果沒有可以滿足最好本地性需求的任務可以運行時,是退而求其次,運行1個本地性條件稍差1點的任務呢,還是繼續等待下1個可用的計算資源已期望它能更好的匹配任務的本地性呢?
這幾個參數1起決定了Spark任務調度在得到分配任務時,選擇暫時不分配任務,而是等待取得滿足進程內部/節點內部/機架內部這樣的不同層次的本地性資源的最長等待時間。默許都是3000毫秒。
基本上,如果你的任務數量較大和單個任務運行時間比較長的情況下,單個任務是不是在數據本地運行,代價區分可能比較顯著,如果數據本地性不理想,那末調大這些參數對性能優化可能會有1定的好處。反之如果等待的代價超過帶來的收益,那就不要斟酌了。
特別值得注意的是:在處理利用剛啟動后提交的第1批任務時,由于當作業調度模塊開始工作時,處理任務的Executors可能還沒有完全注冊終了,因此1部份的任務會被放置到No Prefer的隊列中,這部份任務的優先級僅次于數據本地性滿足Process級別的任務,從而被優先分配到非本地節點履行,如果的確沒有Executors在對應的節點上運行,或的確是No Prefer的任務(如shuffleRDD),這樣做確切是比較優化的選擇,但是這里的實際情況只是這部份Executors還沒來得及注冊上而已。這類情況下,即便加大本節中這幾個參數的數值也沒有幫助。針對這個情況,有1些已完成的和正在進行中的PR通過例如動態調劑No Prefer隊列,監控節點注冊比例等等方式試圖來給出更加智能的解決方案。不過,你也能夠根據本身集群的啟動情況,通過在創建SparkContext以后,主動Sleep幾秒的方式來簡單的解決這個問題。
spark.speculation
spark.speculation和spark.speculation.interval,spark.speculation.quantile, spark.speculation.multiplier等參數調劑Speculation行動的具體細節,Speculation是在任務調度的時候,如果沒有合適當前本地性要求的任務可供運行,將跑得慢的任務在空閑計算資源上再度調度的行動,這些參數調劑這些行動的頻率和判斷指標,默許是不使用Speculation的。
通常來講很難正確的判斷是不是需要Speculation,能真正發揮Speculation用途的場合,常常是某些節點由于運行環境緣由,比如CPU資源由于某種緣由被占用,磁盤破壞致使IO緩慢造成任務履行速度異常的情況,固然條件是你的分區任務不存在僅能被履行1次,或不能同時履行多個拷貝等情況。Speculation任務參照的指標通常是其它任務的履行時間,而實際的任務可能由于分區數據尺寸不均勻,本來就會有時間差異,加上1定的調度和IO的隨機性,所以如果1致性指標定得過嚴,Speculation可能其實不能真的發現問題,反而增加了沒必要要的任務開消,定得過寬,大概又基本相當于沒用。
個人覺得,如果你的集群范圍比較大,運行環境復雜,的確可能常常產生履行異常,加上數據分區尺寸差異不大,為了程序運行時間的穩定性,那末可以斟酌仔細調劑這些參數。否則還是斟酌如何排除造成任務履行速度異常的因數比較靠鋪1些。
固然,我沒有實際在很大范圍的集群上運行過Spark,所以如果看法有些偏頗,還請有實際經驗的XD指正。
上一篇 BZOJ-1192-鬼谷子的錢袋
下一篇 精通正則表達式(2)