Why is dynamic allocation needed?
a) By default Spark allocates resources in a coarse-grained way: it reserves resources first and then runs the computation. A Spark Streaming workload has traffic peaks and troughs that need different amounts of resources, so sizing everything for the peak wastes a large amount of resources most of the time.
b) Spark Streaming runs continuously, so long-term resource consumption and management are also factors we have to consider.
Dynamic resource adjustment in Spark Streaming faces a challenge:
Spark Streaming runs batch by batch according to the Batch Duration. One batch may need a lot of resources while the next one does not, and by the time a resource adjustment finishes, the batch it was meant for may already be over. The adjustment interval therefore has to be tuned relative to the Batch Duration.
Dynamic resource requests in Spark Streaming
1. SparkContext does not enable dynamic resource allocation by default, but it can be turned on manually through SparkConf (see the configuration sketch after the code below).
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled &&
    // whether dynamic resource allocation is enabled in the configuration
    _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
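For reference, here is a minimal sketch of how an application could enable dynamic allocation on its side; the application name, executor bounds, and batch interval are illustrative values, not taken from the original notes.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: enabling core dynamic allocation for a streaming application.
// Dynamic allocation also requires the external shuffle service on the workers.
val conf = new SparkConf()
  .setAppName("DynamicAllocationDemo")               // illustrative name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")  // illustrative bounds
  .set("spark.dynamicAllocation.maxExecutors", "10")

val ssc = new StreamingContext(conf, Seconds(5))     // illustrative batch interval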
2. ExecutorAllocationManager: a timer keeps scanning the state of the executors and of the running stages, and decides whether executors need to be added or removed.
3. The schedule method in ExecutorAllocationManager is triggered periodically to perform the dynamic resource adjustment (a rough sketch of the sizing heuristic follows the code below).
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis
  updateAndSyncNumExecutorsTarget(now)
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
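updateAndSyncNumExecutorsTarget itself is not shown above. As a rough, hypothetical sketch of the underlying heuristic (the real implementation in ExecutorAllocationManager handles more cases), the executor target can be thought of as a ceiling division of the outstanding tasks by the task slots per executor:
// Hypothetical sketch: how many executors are needed to cover the current task backlog.
def maxExecutorsNeeded(pendingTasks: Int,
                       runningTasks: Int,
                       coresPerExecutor: Int,
                       cpusPerTask: Int): Int = {
  val tasksPerExecutor = coresPerExecutor / cpusPerTask
  // Ceiling division: enough executors so that every outstanding task has a slot.
  (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor
}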
4. Inside ExecutorAllocationManager, a timer running in a thread pool keeps invoking schedule periodically (a standalone sketch of this pattern follows the code below).
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)
  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // intervalMillis is the interval at which the timer fires
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
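As a minimal, self-contained sketch of the same periodic-scheduling pattern (the object name, interval, and printed messages are illustrative, not Spark's own):
import java.util.concurrent.{Executors, TimeUnit}

object PeriodicScheduleSketch extends App {
  val intervalMillis = 100L                          // illustrative interval
  val scheduler = Executors.newSingleThreadScheduledExecutor()

  val scheduleTask = new Runnable {
    override def run(): Unit = {
      try {
        println("schedule() tick: adjust executor requests here")
      } catch {
        case t: Throwable => println(s"Uncaught exception: ${t.getMessage}")
      }
    }
  }

  // Fire the task immediately and then every intervalMillis milliseconds.
  scheduler.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  Thread.sleep(500)                                  // let a few ticks run
  scheduler.shutdown()
}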
Dynamically controlling the consumption rate:
Spark Streaming also provides an elastic mechanism (backpressure) that tracks the relationship between the rate at which data flows in and the rate at which it is processed, i.e. whether the system can keep up with the incoming data. If it cannot, the ingestion rate is throttled automatically and dynamically; this behaviour is switched on with the spark.streaming.backpressure.enabled parameter.
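A minimal sketch of turning backpressure on; the application name, initial-rate cap, and batch interval are illustrative values (the initial rate is only an optional ceiling while the rate estimator warms up).
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BackpressureDemo")                           // illustrative name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "1000")  // optional, records/sec per receiver

val ssc = new StreamingContext(conf, Seconds(2))            // illustrative batch interval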