多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php框架 > 框架設計 > Apache Flink fault tolerance源碼剖析(二)

Apache Flink fault tolerance源碼剖析(二)

來源:程序員人生   發布時間:2016-06-02 08:14:03 閱讀次數:3277次

繼續Flink Fault Tolerance機制剖析。上1篇文章我們結合代碼講授了Flink中檢查點是如何利用的(如何根據快照做失敗恢復,和檢查點被利用的場景),這篇我們來談談檢查點的觸發機制和基于Actor的消息驅動的協同機制。這篇觸及到1個非常關鍵的類——CheckpointCoordinator

org.apache.flink.runtime.checkpoint.CheckpointCoordinator

該類可以理解為檢查點的調和器,用來調和operatorstate的散布式快照。

周期性的檢查點觸發機制

檢查點的觸發機制是基于定時器的周期性觸發。這觸及到1個定時器的實現類ScheduledTrigger

ScheduledTrigger

觸發檢查點的定時任務類。其實現就是調用triggerCheckpoint方法。這個方法后面會具體介紹。

public void run() { try { triggerCheckpoint(System.currentTimeMillis()); } catch (Exception e) { LOG.error("Exception while triggering checkpoint", e); } }

startCheckpointScheduler

啟動觸發檢查點的定時任務的方法實現:

public void startCheckpointScheduler() { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } // make sure all prior timers are cancelled stopCheckpointScheduler(); try { // Multiple start calls are OK checkpointIdCounter.start(); } catch (Exception e) { String msg = "Failed to start checkpoint ID counter: " + e.getMessage(); throw new RuntimeException(msg, e); } periodicScheduling = true; currentPeriodicTrigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); } }

方法的實現包括兩個主要動作:

  • 啟動檢查點ID計數器checkpointIdCounter
  • 啟動觸發檢查點的定時任務

stopCheckpointScheduler

關閉定時任務的方法,用來釋放資源,重置1些標記變量。

triggerCheckpoint

該方法是觸發1個新的檢查點的核心邏輯。

首先,方法中會去判斷1個flag:triggerRequestQueued。該標識表示是不是1個檢查點的觸發要求不能被立即履行。

// sanity check: there should never be more than one trigger request queued if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint while one was queued already"); return false; }

如果不能被立即履行,則直接返回。

不能被立即履行的緣由是:還有其他處理沒有完成。

接著檢查正在并發處理的未完成的檢查點:

// if too many checkpoints are currently in progress, we need to mark that a request is queued if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(); currentPeriodicTrigger = null; } return false; }

如果未完成的檢查點過量,大于允許的并發處理的檢查點數目的閾值,則將當前檢查點的觸發要求設置為不能立即履行,如果定時任務已啟動,則取消定時任務的履行,并返回。

以上這些檢查處于基于鎖機制實現的同步代碼塊中。

接著檢查需要被觸發檢查點的task是不是都處于運行狀態:

ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); if (ee != null && ee.getState() == ExecutionState.RUNNING) { triggerIDs[i] = ee.getAttemptId(); } else { LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getSimpleName()); return false; } }

只要有1個task不滿足條件,則不會觸發檢查點,并立即返回。

然后檢查是不是所有需要ack檢查點的task都處于運行狀態:

Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length); for (ExecutionVertex ev : tasksToWaitFor) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ackTasks.put(ee.getAttemptId(), ev); } else { LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", ev.getSimpleName()); return false; } }

如果有1個task不滿足條件,則不會觸發檢查點,并立即返回。

以上條件都滿足(即沒有return false;),才具有觸發1個檢查點的基本條件。

下1步,取得checkpointId

final long checkpointID; if (nextCheckpointId < 0) { try { // this must happen outside the locked scope, because it communicates // with external services (in HA mode) and may block for a while. checkpointID = checkpointIdCounter.getAndIncrement(); } catch (Throwable t) { int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); return false; } } else { checkpointID = nextCheckpointId; }

這依賴于該方法的另外一個參數nextCheckpointId,如果其值為,則起到標識的作用,唆使checkpointId將從外部獲得(比如Zookeeper,后續文章會談及檢查點ID的生成機制)。

接著創建1個PendingCheckpoint對象:

final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

該類表示1個待處理的檢查點。

與此同時,會定義1個針對當前檢查點超時進行資源清算的取消器canceller。該取消器主要是針對檢查點沒有釋放資源的情況進行資源釋放操作,同時還會調用triggerQueuedRequests方法啟動1個觸發檢查點的定時任務,如果有的話(取決于triggerRequestQueued是不是為true)。

然后會再次進入同步代碼段,對上面的是不是新建檢查點的判斷條件做2次檢查,避免產生競態條件。這里做2次檢查的緣由是,中間有1段關于取得checkpointId的代碼,不在同步塊中。

檢查后,如果觸發檢查點的條件依然是滿足的,那末將上面創建的PendingCheckpoint對象加入集合中:

pendingCheckpoints.put(checkpointID, checkpoint);

同時會啟動針對當前檢查點的超時取消器:

timer.schedule(canceller, checkpointTimeout);

接下來會發送消息給task以真正觸發檢查點(基于消息驅動的協同機制):

for (int i = 0; i < tasksToTrigger.length; i++) { ExecutionAttemptID id = triggerIDs[i]; TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp); tasksToTrigger[i].sendMessageToCurrentExecution(message, id); }

基于Actor的消息驅動的協同機制

上面已談到了檢查點的觸發機制是基于定時任務的周期性觸發,那末定時任務的啟停機制又是甚么?Flink使用的是基于AKKA的Actor模型的消息驅動機制。

CheckpointCoordinatorDeActivator是1個Actor的實現,它用于基于消息來驅動檢查點的定時任務的啟停:

public void handleMessage(Object message) { if (message instanceof ExecutionGraphMessages.JobStatusChanged) { JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus(); if (status == JobStatus.RUNNING) { // start the checkpoint scheduler coordinator.startCheckpointScheduler(); } else { // anything else should stop the trigger for now coordinator.stopCheckpointScheduler(); } } // we ignore all other messages }

Actor會收到Job狀態的變化通知:JobStatusChanged。1旦變成RUNNING,那末檢查點的定時任務會被立即啟動;否則會被立即關閉。

Actor被創建的代碼是CheckpointCoordinator中的createActivatorDeactivator方法:

public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } if (jobStatusListener == null) { Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID); // wrap the ActorRef in a AkkaActorGateway to support message decoration jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID); } return jobStatusListener; } }

既然,是基于消息驅動機制,那末就需要各種類型的消息對應不同的業務邏輯。這些消息在Flink中被定義在package:org.apache.flink.runtime.messages.checkpoint中。

類圖以下:

flink-fault-tolerance-2_message-class-diagram

AbstractCheckpointMessage

檢查點消息的基礎抽象類,提供了3個公共屬性(從構造器注入):

  • job:JobID的實例,表示當前這條消息實例的歸屬;
  • taskExecutionId:ExecutionAttemptID的實例,表示檢查點的源/目的任務
  • checkpointId:當前消息調和的檢查點ID

除此以外,該實現僅僅override了hashCodeequals方法。

TriggerCheckpoint

該消息由JobManager發送給TaskManager,用于告知1個task觸發它的檢查點。

觸發消息

位于CheckpointCoordinator類的triggerCheckpoint中,上面已提及過。

for (int i = 0; i < tasksToTrigger.length; i++) { ExecutionAttemptID id = triggerIDs[i]; TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp); tasksToTrigger[i].sendMessageToCurrentExecution(message, id); }

消息處理

TaskManagerhandleCheckpointingMessage實現:

case message: TriggerCheckpoint => val taskExecutionId = message.getTaskExecutionId val checkpointId = message.getCheckpointId val timestamp = message.getTimestamp log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.") val task = runningTasks.get(taskExecutionId) if (task != null) { task.triggerCheckpointBarrier(checkpointId, timestamp) } else { log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.") }

主要是觸發檢查點屏障Barrier

DeclineCheckpoint

該消息由TaskManager發送給JobManager,用于告知檢查點調和器:檢查點的要求還沒有能夠被處理。這類情況通常產生于:某task已處于RUNNING狀態,但在內部可能還沒有準備好履行檢查點。

它除AbstractCheckpointMessage需要的3個屬性外,還需要用于關聯檢查點的timestamp

觸發消息

位于Task類的triggerCheckpointBarrier方法中:

Runnable runnable = new Runnable() { @Override public void run() { try { boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp); if (!success) { DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp); jobManager.tell(decline); } } catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new RuntimeException( "Error while triggering checkpoint for " + taskName, t)); } } } };

消息處理

位于JobManagerhandleCheckpointMessage

具體的實現在CheckpointCoordinatorreceiveDeclineMessage中:

首先從接收的消息中(DeclineCheckpoint)取得檢查點編號:

final long checkpointId = message.getCheckpointId();

接下來的邏輯是判斷當前檢查點是不是是未完成的檢查點:isPendingCheckpoint

接下來分為3種情況對待:

  • 如果是未完成的檢查點,并且相干資源沒有被釋放(檢查點沒有被discarded
isPendingCheckpoint = true; pendingCheckpoints.remove(checkpointId); checkpoint.discard(userClassLoader); rememberRecentCheckpointId(checkpointId);

isPendingCheckpointtrue,根據檢查點編號,將檢查點從未完成的檢查點集合中移除,discard檢查點,記住最近的檢查點(將其保持到到1個最近的檢查點列表中)。

接下來查找是不是還有待處理的檢查點,根據檢查點時間戳來判斷:

boolean haveMoreRecentPending = false; Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); while (entries.hasNext()) { PendingCheckpoint p = entries.next().getValue(); if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { haveMoreRecentPending = true; break; } }

根據標識haveMoreRecentPending來進入不同的處理邏輯:

if (!haveMoreRecentPending && !triggerRequestQueued) { LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); triggerCheckpoint(System.currentTimeMillis()); } else if (!haveMoreRecentPending) { LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); triggerQueuedRequests(); }

如果有需要處理的檢查點,并且當前能立即處理,則立即觸發檢查點定時任務;如果有需要處理的檢查點,但不能立即處理,則觸發入隊的定時任務。

  • 如果是未完成的檢查點,并且檢查點已被discarded

拋出IllegalStateException異常

  • 如果不是未完成的檢查點

如果在最近未完成的檢查點列表中找到,則有可能表示消息來遲了,將isPendingCheckpoint置為true,否則將isPendingCheckpoint置為false.

最后返回isPendingCheckpoint

AcknowledgeCheckpoint

該消息是1個應對信號,表示某個獨立的task的檢查點已完成。也是由TaskManager發送給JobManager。該消息會攜帶task的狀態:

  • state
  • stateSize

觸發消息

RuntimeEnvironment類的acknowledgeCheckpoint方法。

消息處理

具體的實現在CheckpointCoordinatorreceiveAcknowledgeMessage中,開始的實現同receiveDeclineMessage,也是判斷當前接收到的消息中包括的檢查點是不是是待處理的檢查點。如果是,并且也沒有discard掉,則履行以下邏輯:

if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) { if (checkpoint.isFullyAcknowledged()) { completed = checkpoint.toCompletedCheckpoint(); completedCheckpointStore.addCheckpoint(completed); LOG.info("Completed checkpoint " + checkpointId + " (in " + completed.getDuration() + " ms)"); LOG.debug(completed.getStates().toString()); pendingCheckpoints.remove(checkpointId); rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(completed.getTimestamp()); onFullyAcknowledgedCheckpoint(completed); triggerQueuedRequests(); } }

檢查點首先應對相干的task,如果檢查點已完全應對完成,則將檢查點轉換成CompletedCheckpoint,然后將其加入completedCheckpointStore列表,并從pendingCheckpoints中移除。然后調用dropSubsumedCheckpoints它會從pendingCheckpointsdiacard所有時間戳小于當前檢查點的時間戳,并從集合中移除。

最后,如果該檢查點被轉化為已完成的檢查點,則:

if (completed != null) { final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp); ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); } } statsTracker.onCompletedCheckpoint(completed); }

迭代所有待commit的task,發送NotifyCheckpointComplete消息。同時觸發狀態跟蹤器的onCompletedCheckpoint回調方法。

NotifyCheckpointComplete

該消息由JobManager發送給TaskManager,用于告知1個task它的檢查點已得到完成確認,task可以向第3方提交該檢查點。

觸發消息

位于CheckpointCoordinator類的receiveAcknowledgeMessage方法中,當檢查點acktask完成,轉化為CompletedCheckpoint以后

if (completed != null) { final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp); ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); } } statsTracker.onCompletedCheckpoint(completed); }

消息處理

TaskManagerhandleCheckpointingMessage

實現:

case message: NotifyCheckpointComplete => val taskExecutionId = message.getTaskExecutionId val checkpointId = message.getCheckpointId val timestamp = message.getTimestamp log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.") val task = runningTasks.get(taskExecutionId) if (task != null) { task.notifyCheckpointComplete(checkpointId) } else { log.debug( s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.") }

主要是觸發tasknotifyCheckpointComplete方法。

小結

這篇文章主要講授了檢查點的基于定時任務的周期性的觸發機制,和基于Akka的Actor模型的消息驅動的協同處理機制。


微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat


QQ掃碼關注QQ群:Apache Flink學習交換群(123414680)

qrcode_for_apache_flink_qq_group

生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 久久是精品 | 亚洲欧洲在线视频 | 日本aa大片 | 日韩欧美中文字幕出 | 中文字幕精品视频在线观看 | 国产成人一区二区三区精品久久 | 美国一级毛片完整高清 | 欧美成人a视频 | 国产高清自拍 | 国产欧美视频一区二区三区 | 欧美成人a视频 | 五月香婷婷| 欧美乱人伦中文在线观看不卡 | 日韩高清一区二区 | 美国亚洲成年毛片 | 网站国产 | 亚洲免费视频网 | 亚洲综合欧美日韩 | 波多野结衣一二三区 | 99久久精品费精品国产一区二 | 噜噜噜在线观看播放视频 | 毛片免| 色噜噜狠狠先锋影音久久 | 顶级欧美色妇xxxxbbbb | 日韩欧美国产精品第一页不卡 | 日本草久 | 变态 另类 国产 亚洲 | 伊人影院99 | 欧美日韩一区二区三区视频播 | 色去也 | 伊人性| 日韩精品久久一区二区三区 | 色www永久免费 | 在线高清一级欧美精品 | 亚洲精品一区 | 日韩精品在线一区二区 | 日本一区二区三区不卡在线视频 | 欧美精品久久久久久久免费观看 | a一级一色一情 | 亚洲精品区一区二区三区四 | 国产美女视频一区二区二三区 |