Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,28 +566,28 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
})
);

// The SnapshotShutdownProgressTracker periodically logs the status of in-flight snapshots during node shutdown.
// The periodic logger ends when the last snapshot is either finished or paused. Since this is asynchronous, to avoid race
// conditions we must define all of our mocklog assertions up front to ensure that the tracker is acting as expected

// Check that the SnapshotShutdownProgressTracker observed the request sent to the master node.
// Note that we cannot guarantee how many requests the tracker receives before finishing its logging
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker shard snapshot has paused log message",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"*Number shard snapshots waiting for master node reply to status update request [" + numShards + "]*"
"*Number shard snapshots waiting for master node reply to status update request [*]*"
)
);

// Let the shard snapshot proceed. It will still get stuck waiting for the master node to respond.
unblockNode(repoName, nodeForRemoval);

// Check that the SnapshotShutdownProgressTracker observed the request sent to the master node.
mockLog.awaitAllExpectationsMatched();
resetMockLog();

// Check that the SnapshotShutdownProgressTracker observed the shard snapshot finishing as paused.
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"SnapshotShutdownProgressTracker shard snapshot has paused log message",
SnapshotShutdownProgressTracker.class.getCanonicalName(),
Level.INFO,
"Current active shard snapshot stats on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
"*on data node [" + nodeForRemovalId + "]*Paused [" + numShards + "]"
)
);
mockLog.addExpectation(
Expand All @@ -599,13 +599,15 @@ public void testSnapshotShutdownProgressTracker() throws Exception {
)
);

// Let the shard snapshot proceed. It will still get stuck waiting for the master node to respond.
unblockNode(repoName, nodeForRemoval);

// Release the master node to respond
snapshotStatusUpdateLatch.countDown();

// Wait for the snapshot to fully pause.
safeAwait(snapshotPausedListener);

// Check that the SnapshotShutdownProgressTracker observed the shard snapshot finishing as paused.
mockLog.awaitAllExpectationsMatched();
resetMockLog();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import java.util.function.Supplier;

/**
* Tracks progress of shard snapshots during shutdown, on this single data node. Periodically reports progress via logging, the interval for
* which see {@link #SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING}.
* Tracks progress of shard snapshots during shutdown, on this single data node. Periodically reports progress via logging until
* all snapshots have completed or are paused, the interval for which see {@link #SNAPSHOT_PROGRESS_DURING_SHUTDOWN_LOG_INTERVAL_SETTING}.
* <P>
* Note that this class is used even when the node isn't shutting down. When {@link SnapshotShardsService} starts a new snapshot task,
* the {@link SnapshotShutdownProgressTracker} is updated, so that if the node shuts down while the task is executing, we have an accurate
* counter for in-progress snapshots. This counter is decremented when the snapshot task finishes, either successfully or not.
*/
public class SnapshotShutdownProgressTracker {

Expand Down Expand Up @@ -66,6 +70,7 @@ public class SnapshotShutdownProgressTracker {

/**
* Tracks the number of shard snapshots that have started on the data node but not yet finished.
* If the node starts shutting down, when this reaches 0, we stop logging the periodic progress report
*/
private final AtomicLong numberOfShardSnapshotsInProgressOnDataNode = new AtomicLong();

Expand Down Expand Up @@ -133,25 +138,47 @@ private void cancelProgressLogger() {
* Logs information about shard snapshot progress.
*/
private void logProgressReport() {
logger.info(
"""
Current active shard snapshot stats on data node [{}]. \
Node shutdown cluster state update received at [{} millis]. \
Finished signalling shard snapshots to pause at [{} millis]. \
Number shard snapshots running [{}]. \
Number shard snapshots waiting for master node reply to status update request [{}] \
Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\
""",
getLocalNodeId.get(),
shutdownStartMillis,
shutdownFinishedSignallingPausingMillis,
numberOfShardSnapshotsInProgressOnDataNode.get(),
shardSnapshotRequests.size(),
doneCount.get(),
failureCount.get(),
abortedCount.get(),
pausedCount.get()
);
// If there are no more snapshots in progress, then stop logging periodic progress reports
if (numberOfShardSnapshotsInProgressOnDataNode.get() == 0) {
logger.info(
"""
All shard snapshots have finished or been paused on data node [{}].\
Node shutdown cluster state update received at [{} millis]. \
Progress logging completed at [{} millis]. \
Number shard snapshots waiting for master node reply to status update request [{}] \
Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\
""",
getLocalNodeId.get(),
shutdownStartMillis,
threadPool.relativeTimeInMillis(),
shardSnapshotRequests.size(),
doneCount.get(),
failureCount.get(),
abortedCount.get(),
pausedCount.get()
);
cancelProgressLogger();
} else {
logger.info(
"""
Current active shard snapshot stats on data node [{}]. \
Node shutdown cluster state update received at [{} millis]. \
Finished signalling shard snapshots to pause at [{} millis]. \
Number shard snapshots running [{}]. \
Number shard snapshots waiting for master node reply to status update request [{}] \
Shard snapshot completion stats since shutdown began: Done [{}]; Failed [{}]; Aborted [{}]; Paused [{}]\
""",
getLocalNodeId.get(),
shutdownStartMillis,
shutdownFinishedSignallingPausingMillis,
numberOfShardSnapshotsInProgressOnDataNode.get(),
shardSnapshotRequests.size(),
doneCount.get(),
failureCount.get(),
abortedCount.get(),
pausedCount.get()
);
}
// Use a callback to log the shard snapshot details.
logIndexShardSnapshotStatuses.accept(logger);
}
Expand Down
Loading