Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -566,28 +566,28 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
})
);

// The SnapshotShutdownProgressTracker periodically logs the status of in-flight snapshots during node shutdown.
// The periodic logger ends when the last snapshot is either finished or paused. Since this is asynchronous, to avoid race
// conditions we must define all of our mocklog assertions up front to ensure that the tracker is acting as expected

// Check that the SnapshotShutdownProgressTracker observed the request sent to the master node.
// Note that we cannot guarantee how many requests the tracker receives before finishing its logging
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker shard snapshot has paused log message",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"*Number shard snapshots waiting for master node reply to status update request [" + numShards + "]*"
"*Number shard snapshots waiting for master node reply to status update request [*]*"
)
);

// Let the shard snapshot proceed. It will still get stuck waiting for the master node to respond.
unblockNode(repoName, nodeForRemoval);

// Check that the SnapshotShutdownProgressTracker observed the request sent to the master node.
mockLog.awaitAllExpectationsMatched();
resetMockLog();

// Check that the SnapshotShutdownProgressTracker observed the shard snapshot finishing as paused.
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker shard snapshot has paused log message",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
"*on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
)
);
mockLog.addExpectation(
Expand All @@ -599,13 +599,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
)
);

// Let the shard snapshot proceed. It will still get stuck waiting for the master node to respond.
unblockNode(repoName, nodeForRemoval);

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();

// Wait for the snapshot to fully pause.
safeAwait(snapshotPausedListener);

// Check that the SnapshotShutdownProgressTracker observed the shard snapshot finishing as paused.
mockLog.awaitAllExpectationsMatched();
resetMockLog();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@
import java.util.function.Supplier;

/**
* Tracks progress of shard snapshots during shutdown, on this single data node. Periodically reports progress via logging, the interval for
* which see {@link #SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING}.
* Tracks progress of shard snapshots during shutdown, on this single data node.
* Always tracks the progress of snapshots, but only logs the period progress report once node shutdown starts.
* See {@link #SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING} for the logging interval.
*/
public class SnapshotShutdownProgressTracker {

Expand Down Expand Up @@ -68,6 +69,7 @@ public class SnapshotShutdownProgressTracker {

/**
* Tracks the number of shard snapshots that have started on the data node but not yet finished.
* If the node starts shutting down, when this reaches 0, we stop logging the periodic progress report
*/
private final AtomicLong numberOfShardSnapshotsInProgressOnDataNode = new AtomicLong();

Expand All @@ -87,6 +89,11 @@ public class SnapshotShutdownProgressTracker {
private final AtomicLong abortedCount = new AtomicLong();
private final AtomicLong pausedCount = new AtomicLong();

/**
* Ensure that we only log the exit message once
*/
private volatile boolean exitLogMessageCompleted = false;

public SnapshotShutdownProgressTracker(
Supplier<String> localNodeIdSupplier,
Consumer<Logger> logShardStatuses,
Expand Down Expand Up @@ -135,27 +142,51 @@ private void cancelProgressLogger() {
* Logs information about shard snapshot progress.
*/
private void logProgressReport() {
logger.info(
"""
Current active shard snapshot stats on data node [{}]. \
Node shutdown cluster state update received at [{} UTC]. \
Finished signalling shard snapshots to pause at [{} UTC]. \
Time between the node shutdown cluster state update and signalling shard snapshots to pause is [{} millis]. \
Number shard snapshots running [{}]. \
Number shard snapshots waiting for master node reply to status update request [{}] \
Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\
""",
getLocalNodeId.get(),
DATE_TIME_FORMATTER.formatMillis(shutdownStartMillis),
DATE_TIME_FORMATTER.formatMillis(shutdownFinishedSignallingPausingMillis),
shutdownFinishedSignallingPausingMillis - shutdownStartMillis,
numberOfShardSnapshotsInProgressOnDataNode.get(),
shardSnapshotRequests.size(),
doneCount.get(),
failureCount.get(),
abortedCount.get(),
pausedCount.get()
);
assert exitLogMessageCompleted == false : "We should not log after the exit message";
// If there are no more snapshots in progress, then stop logging periodic progress reports
if (numberOfShardSnapshotsInProgressOnDataNode.get() == 0 && shardSnapshotRequests.isEmpty()) {
logger.info(
"""
All shard snapshots have finished or been paused on data node [{}].\
Node shutdown cluster state update received at [{} UTC]. \
Progress logging completed at [{} UTC]. \
Number shard snapshots waiting for master node reply to status update request [{}] \
Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\
""",
getLocalNodeId.get(),
DATE_TIME_FORMATTER.formatMillis(shutdownStartMillis),
DATE_TIME_FORMATTER.formatMillis(threadPool.relativeTimeInMillis()),
shardSnapshotRequests.size(),
doneCount.get(),
failureCount.get(),
abortedCount.get(),
pausedCount.get()
);
cancelProgressLogger();
this.exitLogMessageCompleted = true;
} else {
logger.info(
"""
Current active shard snapshot stats on data node [{}]. \
Node shutdown cluster state update received at [{} UTC]. \
Finished signalling shard snapshots to pause at [{} UTC]. \
Time between the node shutdown cluster state update and signalling shard snapshots to pause is [{} millis]. \
Number shard snapshots running [{}]. \
Number shard snapshots waiting for master node reply to status update request [{}] \
Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\
""",
getLocalNodeId.get(),
DATE_TIME_FORMATTER.formatMillis(shutdownStartMillis),
DATE_TIME_FORMATTER.formatMillis(shutdownFinishedSignallingPausingMillis),
shutdownFinishedSignallingPausingMillis - shutdownStartMillis,
numberOfShardSnapshotsInProgressOnDataNode.get(),
shardSnapshotRequests.size(),
doneCount.get(),
failureCount.get(),
abortedCount.get(),
pausedCount.get()
);
}
// Use a callback to log the shard snapshot details.
logIndexShardSnapshotStatuses.accept(logger);
}
Expand Down Expand Up @@ -204,6 +235,7 @@ public void onClusterStateRemoveShutdown() {

// Turn off the progress logger, which we only want to run during shutdown.
cancelProgressLogger();
this.exitLogMessageCompleted = false;
}

/**
Expand Down
Loading