Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/** RPC Gateway interface for messages to the CheckpointCoordinator. */
public interface CheckpointCoordinatorGateway extends RpcGateway {

void acknowledgeCheckpoint(
CompletableFuture<Acknowledge> acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public CompletableFuture<Acknowledge> disconnectTaskManager(

// TODO: This method needs a leader session ID
@Override
public void acknowledgeCheckpoint(
public CompletableFuture<Acknowledge> acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
Expand All @@ -632,6 +632,7 @@ public void acknowledgeCheckpoint(
checkpointMetrics,
deserializeTaskStateSnapshot(checkpointState, getClass().getClassLoader()));
}
return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public void acknowledgeCheckpoint(
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState) {
checkpointCoordinatorGateway.acknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
serializeTaskStateSnapshot(subtaskState));
checkpointCoordinatorGateway
.acknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
serializeTaskStateSnapshot(subtaskState))
.join();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,20 @@ public void run() {
if (asyncCheckpointState.compareAndSet(
AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {

reportCompletedSnapshotStates(
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
asyncDurationMillis);
try {
reportCompletedSnapshotStates(
snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
asyncDurationMillis);
} catch (Exception reportFailure) {
// Upload already succeeded; JM is authoritative for state cleanup.
// Running cleanup() here could delete files referenced by a checkpoint
// the JM has already completed (ACK applied but response RPC failed).
// Decline so JM aborts the still-pending checkpoint quickly; if the ACK
// was actually applied, the decline arrives after the checkpoint moved
// to COMPLETED and is silently dropped by the coordinator.
declineWithoutDiscard(reportFailure);
}

} else {
LOG.debug(
Expand Down Expand Up @@ -275,6 +285,31 @@ private void reportAbortedSnapshotStats(long stateSize, long checkpointedSize) {
.reportIncompleteTaskStateSnapshots(checkpointMetaData, metrics);
}

/**
 * Reacts to a failed attempt to report a completed checkpoint: logs the failure and, provided
 * the task is still running, declines the checkpoint through the task environment. This method
 * itself does not discard any snapshot state.
 *
 * @param reportFailure the exception raised while reporting the completed checkpoint; it is
 *     logged and used as the cause of the decline
 */
private void declineWithoutDiscard(Exception reportFailure) {
    final long failedCheckpointId = checkpointMetaData.getCheckpointId();
    LOG.warn(
            "{} - failed to report completed checkpoint {} to JobManager. "
                    + "Declining without discarding uploaded state.",
            taskName,
            failedCheckpointId,
            reportFailure);

    // Only a running task sends the decline; otherwise the warning above is all we do.
    if (isTaskRunning.get()) {
        try {
            final CheckpointException declineCause =
                    new CheckpointException(
                            CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION, reportFailure);
            taskEnvironment.declineCheckpoint(failedCheckpointId, declineCause);
        } catch (Exception declineFailure) {
            // Best effort: a failing decline is logged and otherwise ignored.
            LOG.warn(
                    "{} - failed to decline checkpoint {} after report failure.",
                    taskName,
                    failedCheckpointId,
                    declineFailure);
        }
    }
}

private void handleExecutionException(Exception e) {

boolean didCleanup = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public void notifyNotEnoughResourcesAvailable(
}

@Override
public void acknowledgeCheckpoint(
public CompletableFuture<Acknowledge> acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Expand All @@ -492,6 +492,7 @@ public void acknowledgeCheckpoint(
checkpointId,
checkpointMetrics,
deserializeTaskStateSnapshot(subtaskState, getClass().getClassLoader())));
return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand Down Expand Up @@ -125,6 +127,73 @@ void testDeclineAsyncCheckpoint() {
assertThat(environment.getCause().getCheckpointFailureReason()).isSameAs(originalReason);
}

/** Runs the ACK-failure scenario with the task flagged as running. */
@Test
void testReportFailureDeclinesWithoutDiscardingState() {
    final boolean taskRunning = true;
    testCheckpointAckFailureHandling(taskRunning);
}

/** Runs the ACK-failure scenario with the task flagged as not running. */
@Test
void testReportFailureDoesNotDeclineWhenTaskNotRunning() {
    final boolean taskRunning = false;
    testCheckpointAckFailureHandling(taskRunning);
}

/**
 * Shared scenario for the case where reporting the completed snapshot fails (simulated here by
 * a state manager whose {@code reportTaskStateSnapshots} always throws). After running the
 * async checkpoint runnable it verifies that:
 *
 * <ol>
 *   <li>the operator snapshot futures were never cancelled,
 *   <li>the runnable's finished future is completed,
 *   <li>a decline with reason {@code CHECKPOINT_ASYNC_EXCEPTION} and the simulated failure as
 *       root cause was recorded when {@code isTaskRunning} is true, and no decline was
 *       recorded otherwise.
 * </ol>
 *
 * @param isTaskRunning value handed to the runnable as its "task is running" flag
 */
private void testCheckpointAckFailureHandling(boolean isTaskRunning) {
    final RuntimeException reportFailure =
            new RuntimeException("simulated ACK response timeout");

    // State manager that fails every attempt to report task state snapshots.
    final TestTaskStateManager failingStateManager =
            new TestTaskStateManager() {
                @Override
                public void reportTaskStateSnapshots(
                        CheckpointMetaData checkpointMetaData,
                        CheckpointMetrics checkpointMetrics,
                        TaskStateSnapshot acknowledgedState,
                        TaskStateSnapshot localState) {
                    throw reportFailure;
                }
            };

    final TestEnvironment environment =
            new TestEnvironment(
                    new Configuration(),
                    new Configuration(),
                    new ExecutionConfig(),
                    1L,
                    new MockInputSplitProvider(),
                    1,
                    failingStateManager);

    final CancelTrackingOperatorSnapshotFutures trackedFutures =
            new CancelTrackingOperatorSnapshotFutures();
    final Map<OperatorID, OperatorSnapshotFutures> inProgressSnapshots = new HashMap<>();
    inProgressSnapshots.put(new OperatorID(), trackedFutures);

    final AsyncCheckpointRunnable asyncRunnable =
            createAsyncRunnable(inProgressSnapshots, environment, false, isTaskRunning);
    asyncRunnable.run();

    // Independent of the running flag: nothing cancelled, runnable finished.
    assertThat(trackedFutures.cancelCount).isZero();
    assertThat(asyncRunnable.getFinishedFuture()).isCompleted();

    if (!isTaskRunning) {
        assertThat(environment.getCause()).isNull();
        return;
    }

    final CheckpointException declineCause = environment.getCause();
    assertThat(declineCause).isNotNull();
    assertThat(declineCause.getCheckpointFailureReason())
            .isSameAs(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
    assertThat(declineCause).hasRootCause(reportFailure);
}

@Test
void testReportFinishedOnRestoreTaskSnapshots() {
TestEnvironment environment = new TestEnvironment();
Expand Down Expand Up @@ -206,4 +275,14 @@ CheckpointException getCause() {
return cause;
}
}

/** {@link OperatorSnapshotFutures} variant that counts how many times {@code cancel()} runs. */
private static class CancelTrackingOperatorSnapshotFutures extends OperatorSnapshotFutures {
    /** Number of {@code cancel()} invocations seen so far; starts at zero. */
    int cancelCount;

    /** Records the invocation, then delegates to the superclass implementation. */
    @Override
    public Tuple2<Long, Long> cancel() throws Exception {
        cancelCount += 1;
        final Tuple2<Long, Long> cancelResult = super.cancel();
        return cancelResult;
    }
}
}
Loading