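This article continues our analysis of Flink's fault tolerance mechanism. The previous article used code to explain how checkpoints are used in Flink (how failure recovery works from a snapshot, and the scenarios in which checkpoints are used). This article covers how checkpoints are triggered and the Actor-based, message-driven coordination around them. It centers on one key class: CheckpointCoordinator.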
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
This class can be understood as the coordinator for checkpoints: it coordinates the distributed snapshots of operators and state.
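Checkpoints are triggered periodically by a timer. The timer task class involved is ScheduledTrigger, whose implementation simply calls the triggerCheckpoint method. That method is described in detail later.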
public void run() {
    try {
        triggerCheckpoint(System.currentTimeMillis());
    }
    catch (Exception e) {
        LOG.error("Exception while triggering checkpoint", e);
    }
}
The method that starts the periodic checkpoint trigger is implemented as follows:
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }
        // make sure all prior timers are cancelled
        stopCheckpointScheduler();
        try {
            // Multiple start calls are OK
            checkpointIdCounter.start();
        } catch (Exception e) {
            String msg = "Failed to start checkpoint ID counter: " + e.getMessage();
            throw new RuntimeException(msg, e);
        }
        periodicScheduling = true;
        currentPeriodicTrigger = new ScheduledTrigger();
        timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
    }
}
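The method performs two main actions: it starts the checkpoint ID counter (checkpointIdCounter), and it schedules a new ScheduledTrigger on the timer at a fixed rate of baseInterval.
stopCheckpointScheduler is the method that stops the timer task; it releases resources and resets some flag variables.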
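The start/stop pairing described above can be sketched with java.util.Timer. This is a minimal illustration, not Flink's code: the class PeriodicTriggerSketch and its isRunning method are hypothetical, and the real coordinator also starts the ID counter and checks the shutdown flag.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical, simplified stand-in for the scheduling part of the coordinator.
class PeriodicTriggerSketch {
    private final Object lock = new Object();
    // Daemon timer thread, so a forgotten stop() does not keep the JVM alive.
    private final Timer timer = new Timer("checkpoint-timer-sketch", true);
    private final long baseInterval;
    private final Runnable action; // what to run on every tick
    private TimerTask currentPeriodicTrigger;
    private boolean periodicScheduling;

    PeriodicTriggerSketch(long baseIntervalMillis, Runnable action) {
        this.baseInterval = baseIntervalMillis;
        this.action = action;
    }

    void start() {
        synchronized (lock) {
            // Cancel any earlier trigger first, so repeated start calls are harmless.
            stop();
            periodicScheduling = true;
            currentPeriodicTrigger = new TimerTask() {
                @Override
                public void run() {
                    action.run();
                }
            };
            timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
        }
    }

    void stop() {
        synchronized (lock) {
            // Reset the flag and release the current timer task, if any.
            periodicScheduling = false;
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel();
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isRunning() {
        synchronized (lock) {
            return periodicScheduling;
        }
    }
}
```

Because synchronized blocks are reentrant, calling stop() from inside start() while holding the same lock is safe, which mirrors how startCheckpointScheduler calls stopCheckpointScheduler.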
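triggerCheckpoint holds the core logic for triggering a new checkpoint.
It first checks a flag, triggerRequestQueued, which indicates that a checkpoint trigger request is already queued because it could not be executed immediately.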
// sanity check: there should never be more than one trigger request queued
if (triggerRequestQueued) {
    LOG.warn("Trying to trigger another checkpoint while one was queued already");
    return false;
}
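If a request is already queued, the method returns immediately.
A request cannot be executed immediately when other work is still unfinished.
Next, the method checks the pending checkpoints that are currently being processed concurrently: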
// if too many checkpoints are currently in progress, we need to mark that a request is queued
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
    triggerRequestQueued = true;
    if (currentPeriodicTrigger != null) {
        currentPeriodicTrigger.cancel();
        currentPeriodicTrigger = null;
    }
    return false;
}
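If the number of pending checkpoints has reached the maximum number of concurrent checkpoint attempts, the current trigger request is marked as queued, the periodic trigger is cancelled if one is active, and the method returns.
All of the checks above run inside a lock-based synchronized block.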
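The two guards above (at most one queued request, and a cap on concurrent pending checkpoints) can be sketched as follows. The class TriggerGuardSketch, its finish method, and the simple ID counter are hypothetical; in particular, the sketch promotes a queued request directly instead of going through a timer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the guard checks; names loosely follow the article.
class TriggerGuardSketch {
    private final Object lock = new Object();
    private final Map<Long, String> pendingCheckpoints = new HashMap<>();
    private final int maxConcurrentCheckpointAttempts;
    private boolean triggerRequestQueued;
    private long nextId = 1;

    TriggerGuardSketch(int maxConcurrentCheckpointAttempts) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    }

    boolean tryTrigger() {
        synchronized (lock) {
            // Never queue more than one trigger request.
            if (triggerRequestQueued) {
                return false;
            }
            // Too many checkpoints in flight: remember that a request is waiting.
            if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
                triggerRequestQueued = true;
                return false;
            }
            pendingCheckpoints.put(nextId++, "pending");
            return true;
        }
    }

    // Called when a checkpoint completes or is discarded; returns true if a
    // queued request was promoted into a new pending checkpoint.
    boolean finish(long checkpointId) {
        synchronized (lock) {
            pendingCheckpoints.remove(checkpointId);
            if (triggerRequestQueued) {
                triggerRequestQueued = false;
                return tryTrigger();
            }
            return false;
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }
}
```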
Next, it checks that every task that must be triggered is in the RUNNING state:
ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee != null && ee.getState() == ExecutionState.RUNNING) {
        triggerIDs[i] = ee.getAttemptId();
    } else {
        LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
                tasksToTrigger[i].getSimpleName());
        return false;
    }
}
If even one task fails this check, no checkpoint is triggered and the method returns immediately.
It then checks the tasks that must acknowledge the checkpoint. In this snippet the check is only that each of them has a current execution attempt:
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
for (ExecutionVertex ev : tasksToWaitFor) {
    Execution ee = ev.getCurrentExecutionAttempt();
    if (ee != null) {
        ackTasks.put(ee.getAttemptId(), ev);
    } else {
        LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
                ev.getSimpleName());
        return false;
    }
}
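If any task fails this check, no checkpoint is triggered and the method returns immediately.
Only when all of the conditions above are met (that is, none of them hit return false;) are the basic prerequisites for triggering a checkpoint in place.
The next step is to obtain the checkpointId: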
final long checkpointID;
if (nextCheckpointId < 0) {
    try {
        // this must happen outside the locked scope, because it communicates
        // with external services (in HA mode) and may block for a while.
        checkpointID = checkpointIdCounter.getAndIncrement();
    }
    catch (Throwable t) {
        int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
        LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
        return false;
    }
}
else {
    checkpointID = nextCheckpointId;
}
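This depends on the method's other parameter, nextCheckpointId. A value of -1 acts as a marker: it indicates that the checkpointId is obtained from the checkpoint ID counter, which may be backed by an external service such as ZooKeeper (a later article will cover how checkpoint IDs are generated). Any non-negative value is used directly as the checkpoint ID.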
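The marker convention can be illustrated with a small sketch. The class CheckpointIdSketch is hypothetical and uses an in-memory AtomicLong where an HA setup would use a counter backed by an external service.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a negative nextCheckpointId means "ask the counter".
class CheckpointIdSketch {
    // In-memory stand-in for the checkpoint ID counter (assumption: starts at 1).
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);

    long resolveCheckpointId(long nextCheckpointId) {
        if (nextCheckpointId < 0) {
            // Marker value such as -1: take the next ID from the counter.
            return checkpointIdCounter.getAndIncrement();
        }
        // Otherwise the caller supplied the ID explicitly.
        return nextCheckpointId;
    }
}
```

For example, calling resolveCheckpointId(-1) twice yields 1 and then 2, while resolveCheckpointId(42) returns 42 and leaves the counter untouched.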
Next, a PendingCheckpoint object is created:
final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
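This class represents a checkpoint that is still pending.
At the same time, a canceller is defined that cleans up resources if the current checkpoint times out. The canceller mainly releases resources for a checkpoint that has not released them yet. It also calls triggerQueuedRequests, which starts a timer task to trigger a checkpoint if one is queued (that is, if triggerRequestQueued is true).
The method then enters a synchronized block again and re-checks the conditions for creating a new checkpoint, to avoid a race condition. The re-check is needed because the code in between, which obtains the checkpointId, runs outside the synchronized block.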
If the conditions for triggering a checkpoint still hold after the re-check, the PendingCheckpoint object created above is added to the collection:
pendingCheckpoints.put(checkpointID, checkpoint);
The timeout canceller for the current checkpoint is scheduled at the same time:
timer.schedule(canceller, checkpointTimeout);
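The sequence of checking under the lock, doing the potentially slow ID lookup outside the lock, and re-checking under the lock before registering the pending checkpoint can be sketched like this. DoubleCheckSketch and its idSource are hypothetical, and the conditions are reduced to a shutdown flag and a concurrency limit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of "check, leave the lock, re-check" around a slow call.
class DoubleCheckSketch {
    private final Object lock = new Object();
    private final Map<Long, String> pendingCheckpoints = new HashMap<>();
    private final int maxConcurrent;
    private final LongSupplier idSource; // may block, so it runs outside the lock
    private boolean shutdown;

    DoubleCheckSketch(int maxConcurrent, LongSupplier idSource) {
        this.maxConcurrent = maxConcurrent;
        this.idSource = idSource;
    }

    boolean trigger() {
        synchronized (lock) {
            if (!mayTrigger()) {
                return false;
            }
        }
        // Outside the lock: other threads may change the state meanwhile.
        long checkpointId = idSource.getAsLong();
        synchronized (lock) {
            // Second check: the answer from the first check may be stale by now.
            if (!mayTrigger()) {
                return false;
            }
            pendingCheckpoints.put(checkpointId, "pending");
            return true;
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    // Must be called while holding the lock.
    private boolean mayTrigger() {
        return !shutdown && pendingCheckpoints.size() < maxConcurrent;
    }
}
```

Holding the lock across the ID lookup would make the second check unnecessary, but it would also block every other coordinator operation for as long as the lookup takes.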
Next, a message is sent to each task to actually trigger the checkpoint (this is the message-driven coordination mechanism):
for (int i = 0; i < tasksToTrigger.length; i++) {
    ExecutionAttemptID id = triggerIDs[i];
    TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
    tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}
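As described above, checkpoints are triggered periodically by a timer task. So what starts and stops that timer task? Flink uses the message-driven mechanism of Akka's Actor model.
The class CheckpointCoordinatorDeActivator is an Actor implementation that starts and stops the checkpoint timer task in response to messages: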
public void handleMessage(Object message) {
    if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
        JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
        if (status == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
    // we ignore all other messages
}
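This Actor receives notifications of Job status changes: JobStatusChanged. As soon as the status becomes RUNNING, the checkpoint timer task is started; for any other status it is stopped.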
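The same dispatch logic can be sketched in plain Java without Akka, which makes it easy to exercise in isolation. Everything here is hypothetical scaffolding: DeActivatorSketch, its nested Scheduler interface, and the reduced JobStatus enum only stand in for the real Flink and Akka types.

```java
// Hypothetical, Akka-free sketch of a status listener that drives a scheduler.
class DeActivatorSketch {
    // Reduced set of job states, for illustration only.
    enum JobStatus { CREATED, RUNNING, FAILING, FINISHED }

    // Stand-in for the two coordinator methods the listener calls.
    interface Scheduler {
        void start();
        void stop();
    }

    // Stand-in for the status-change message.
    static final class JobStatusChanged {
        final JobStatus newJobStatus;

        JobStatusChanged(JobStatus newJobStatus) {
            this.newJobStatus = newJobStatus;
        }
    }

    private final Scheduler scheduler;

    DeActivatorSketch(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    void handleMessage(Object message) {
        if (message instanceof JobStatusChanged) {
            JobStatus status = ((JobStatusChanged) message).newJobStatus;
            if (status == JobStatus.RUNNING) {
                scheduler.start();
            } else {
                // Any other status stops the periodic trigger.
                scheduler.stop();
            }
        }
        // All other message types are ignored.
    }
}
```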
The Actor is created in the createActivatorDeactivator method of CheckpointCoordinator:
public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }
        if (jobStatusListener == null) {
            Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
            // wrap the ActorRef in a AkkaActorGateway to support message decoration
            jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
        }
        return jobStatusListener;
    }
}
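Because the mechanism is message-driven, different message types are needed for the different pieces of logic. In Flink these messages are defined in the package org.apache.flink.runtime.messages.checkpoint.
[Figure: class diagram of the checkpoint messages]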
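AbstractCheckpointMessage is the abstract base class for checkpoint messages. It provides three common properties, injected through the constructor: a JobID instance identifying the job that the message belongs to; an ExecutionAttemptID instance identifying the source or target task of the checkpoint; and the ID of the checkpoint. Beyond that, the class only overrides the hashCode and equals methods.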
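A base class of this shape might look like the sketch below. It is not Flink's source: plain String values stand in for JobID and ExecutionAttemptID, and the class names CheckpointMessageSketch, TriggerSketch, and NotifySketch are hypothetical.

```java
import java.util.Objects;

// Hypothetical sketch of a message base class with three shared properties.
abstract class CheckpointMessageSketch {
    private final String job;             // stands in for JobID
    private final String taskExecutionId; // stands in for ExecutionAttemptID
    private final long checkpointId;

    protected CheckpointMessageSketch(String job, String taskExecutionId, long checkpointId) {
        this.job = Objects.requireNonNull(job);
        this.taskExecutionId = Objects.requireNonNull(taskExecutionId);
        this.checkpointId = checkpointId;
    }

    String getJob() { return job; }
    String getTaskExecutionId() { return taskExecutionId; }
    long getCheckpointId() { return checkpointId; }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // Messages of different concrete types are never equal.
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CheckpointMessageSketch that = (CheckpointMessageSketch) o;
        return checkpointId == that.checkpointId
                && job.equals(that.job)
                && taskExecutionId.equals(that.taskExecutionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(job, taskExecutionId, checkpointId);
    }
}

// Two trivial concrete message types, for illustration only.
final class TriggerSketch extends CheckpointMessageSketch {
    TriggerSketch(String job, String task, long id) { super(job, task, id); }
}

final class NotifySketch extends CheckpointMessageSketch {
    NotifySketch(String job, String task, long id) { super(job, task, id); }
}
```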
TriggerCheckpoint is sent by the JobManager to the TaskManager to tell a task to trigger its checkpoint.
It is sent from triggerCheckpoint in the CheckpointCoordinator class, as mentioned above:
for (int i = 0; i < tasksToTrigger.length; i++) {
    ExecutionAttemptID id = triggerIDs[i];
    TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
    tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}
It is handled in the TaskManager's handleCheckpointingMessage:
case message: TriggerCheckpoint =>
  val taskExecutionId = message.getTaskExecutionId
  val checkpointId = message.getCheckpointId
  val timestamp = message.getTimestamp
  log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
  val task = runningTasks.get(taskExecutionId)
  if (task != null) {
    task.triggerCheckpointBarrier(checkpointId, timestamp)
  } else {
    log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
  }
Its main job is to trigger the checkpoint Barrier.
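DeclineCheckpoint is sent by the TaskManager to the JobManager to tell the checkpoint coordinator that a checkpoint request could not be processed yet. This typically happens when a task is already in the RUNNING state but is not yet ready internally to perform a checkpoint.
In addition to the three properties required by AbstractCheckpointMessage, it also carries the timestamp used to correlate the checkpoint.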
It is sent from the triggerCheckpointBarrier method of the Task class:
Runnable runnable = new Runnable() {
    @Override
    public void run() {
        try {
            boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
            if (!success) {
                DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
                jobManager.tell(decline);
            }
        }
        catch (Throwable t) {
            if (getExecutionState() == ExecutionState.RUNNING) {
                failExternally(new RuntimeException(
                        "Error while triggering checkpoint for " + taskName,
                        t));
            }
        }
    }
};
It is handled in the JobManager's handleCheckpointMessage.
The actual implementation is in receiveDeclineMessage of CheckpointCoordinator.
First, the checkpoint ID is taken from the received DeclineCheckpoint message:
final long checkpointId = message.getCheckpointId();
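The logic that follows determines whether this checkpoint is a pending checkpoint, tracked in isPendingCheckpoint.
There are three cases.
Case 1: the checkpoint is pending and has not been discarded:
isPendingCheckpoint = true;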
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
rememberRecentCheckpointId(checkpointId);
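This sets isPendingCheckpoint to true, removes the checkpoint from the collection of pending checkpoints by its ID, discards the checkpoint, and remembers the checkpoint ID (it is stored in a list of recent checkpoints).
It then looks for any other pending checkpoint that has not been discarded and whose timestamp is at least that of the declined checkpoint: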
boolean haveMoreRecentPending = false;
Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
while (entries.hasNext()) {
    PendingCheckpoint p = entries.next().getValue();
    if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
        haveMoreRecentPending = true;
        break;
    }
}
The haveMoreRecentPending flag then selects the handling logic:
if (!haveMoreRecentPending && !triggerRequestQueued) {
    LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
    triggerCheckpoint(System.currentTimeMillis());
} else if (!haveMoreRecentPending) {
    LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
    triggerQueuedRequests();
}
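If there is no more recent pending checkpoint and no trigger request is queued, a new checkpoint is triggered right away. If there is no more recent pending checkpoint but a trigger request is queued, the queued request is promoted through triggerQueuedRequests.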
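Case 2: the checkpoint is pending but has already been discarded. An IllegalStateException is thrown.
Case 3: the checkpoint is not pending. If its ID is found in the list of recent pending checkpoints, the message probably arrived late, and isPendingCheckpoint is set to true; otherwise isPendingCheckpoint is set to false.
Finally, isPendingCheckpoint is returned.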
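The three cases can be condensed into a small sketch. DeclineSketch, its Pending type, and the bound of 16 remembered IDs are hypothetical; the follow-up step of re-triggering or promoting a queued request is left out.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the three outcomes of handling a decline message.
class DeclineSketch {
    static final class Pending {
        boolean discarded;
    }

    private static final int MAX_REMEMBERED = 16; // arbitrary bound for the sketch
    private final Map<Long, Pending> pendingCheckpoints = new HashMap<>();
    private final ArrayDeque<Long> recentPendingCheckpoints = new ArrayDeque<>();

    Pending addPending(long checkpointId) {
        Pending p = new Pending();
        pendingCheckpoints.put(checkpointId, p);
        return p;
    }

    boolean receiveDecline(long checkpointId) {
        Pending checkpoint = pendingCheckpoints.get(checkpointId);
        if (checkpoint != null && !checkpoint.discarded) {
            // Case 1: pending and not discarded. Remove, discard, remember the ID.
            pendingCheckpoints.remove(checkpointId);
            checkpoint.discarded = true;
            rememberRecentCheckpointId(checkpointId);
            return true;
        } else if (checkpoint != null) {
            // Case 2: still registered although already discarded.
            throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
        } else {
            // Case 3: not pending. A remembered ID suggests a late message.
            return recentPendingCheckpoints.contains(checkpointId);
        }
    }

    private void rememberRecentCheckpointId(long checkpointId) {
        if (recentPendingCheckpoints.size() >= MAX_REMEMBERED) {
            recentPendingCheckpoints.removeFirst();
        }
        recentPendingCheckpoints.addLast(checkpointId);
    }
}
```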
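AcknowledgeCheckpoint is an acknowledgement signal indicating that the checkpoint of an individual task has completed. It is also sent by the TaskManager to the JobManager, and it carries the task's state.
It is sent from the acknowledgeCheckpoint method of the RuntimeEnvironment class.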
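The actual handling is in receiveAcknowledgeMessage of CheckpointCoordinator. It begins the same way as receiveDeclineMessage, by determining whether the checkpoint named in the received message is a pending checkpoint. If it is, and it has not been discarded, the following logic runs: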
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
    if (checkpoint.isFullyAcknowledged()) {
        completed = checkpoint.toCompletedCheckpoint();
        completedCheckpointStore.addCheckpoint(completed);
        LOG.info("Completed checkpoint " + checkpointId + " (in " +
                completed.getDuration() + " ms)");
        LOG.debug(completed.getStates().toString());
        pendingCheckpoints.remove(checkpointId);
        rememberRecentCheckpointId(checkpointId);
        dropSubsumedCheckpoints(completed.getTimestamp());
        onFullyAcknowledgedCheckpoint(completed);
        triggerQueuedRequests();
    }
}
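The checkpoint first records the acknowledgement from the task. Once the checkpoint is fully acknowledged, it is converted into a CompletedCheckpoint, added to completedCheckpointStore, and removed from pendingCheckpoints. dropSubsumedCheckpoints is then called; it discards every checkpoint in pendingCheckpoints whose timestamp is earlier than that of the completed checkpoint and removes it from the collection.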
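The bookkeeping described above (per-task acknowledgement, completion once every task has answered, and dropping older pending checkpoints) can be sketched as follows. AckSketch is hypothetical: task names are plain strings, state handles are omitted, and a list of IDs stands in for the completed checkpoint store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of acknowledgement tracking and subsumption.
class AckSketch {
    static final class Pending {
        final long timestamp;
        final Set<String> notYetAcknowledged;

        Pending(long timestamp, Set<String> tasks) {
            this.timestamp = timestamp;
            this.notYetAcknowledged = tasks;
        }

        // Returns false for unknown tasks and duplicate acknowledgements.
        boolean acknowledgeTask(String task) {
            return notYetAcknowledged.remove(task);
        }

        boolean isFullyAcknowledged() {
            return notYetAcknowledged.isEmpty();
        }
    }

    private final Map<Long, Pending> pendingCheckpoints = new LinkedHashMap<>();
    final List<Long> completed = new ArrayList<>(); // stand-in for the completed store

    void trigger(long checkpointId, long timestamp, String... tasks) {
        pendingCheckpoints.put(checkpointId,
                new Pending(timestamp, new HashSet<>(Arrays.asList(tasks))));
    }

    // Returns true if the message referred to a pending checkpoint.
    boolean receiveAck(long checkpointId, String task) {
        Pending checkpoint = pendingCheckpoints.get(checkpointId);
        if (checkpoint == null) {
            return false;
        }
        if (checkpoint.acknowledgeTask(task) && checkpoint.isFullyAcknowledged()) {
            completed.add(checkpointId);
            pendingCheckpoints.remove(checkpointId);
            // Drop pending checkpoints that are older than the completed one.
            pendingCheckpoints.values().removeIf(p -> p.timestamp < checkpoint.timestamp);
        }
        return true;
    }

    int pendingCount() {
        return pendingCheckpoints.size();
    }
}
```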
Finally, if the checkpoint was converted into a completed checkpoint:
if (completed != null) {
    final long timestamp = completed.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ExecutionAttemptID attemptId = ee.getAttemptId();
            NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
            ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
        }
    }
    statsTracker.onCompletedCheckpoint(completed);
}
The code iterates over all tasks to commit to and sends each one a NotifyCheckpointComplete message. It also invokes the onCompletedCheckpoint callback of the stats tracker.
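NotifyCheckpointComplete is sent by the JobManager to the TaskManager to tell a task that its checkpoint has been confirmed as complete, so the task can commit the checkpoint to third parties.
It is sent from the receiveAcknowledgeMessage method of the CheckpointCoordinator class, after every task has acknowledged the checkpoint and it has been converted into a CompletedCheckpoint: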
if (completed != null) {
    final long timestamp = completed.getTimestamp();
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ExecutionAttemptID attemptId = ee.getAttemptId();
            NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
            ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId());
        }
    }
    statsTracker.onCompletedCheckpoint(completed);
}
It is handled in the TaskManager's handleCheckpointingMessage:
case message: NotifyCheckpointComplete =>
  val taskExecutionId = message.getTaskExecutionId
  val checkpointId = message.getCheckpointId
  val timestamp = message.getTimestamp
  log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
  val task = runningTasks.get(taskExecutionId)
  if (task != null) {
    task.notifyCheckpointComplete(checkpointId)
  } else {
    log.debug(
      s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
  }
Its main job is to call the task's notifyCheckpointComplete method.
This article covered the periodic, timer-based triggering of checkpoints and the message-driven coordination built on Akka's Actor model.