@@ -164,14 +164,13 @@ public void beforeRefresh() throws IOException {}
164
164
@ Override
165
165
public void afterRefresh (boolean didRefresh ) {
166
166
167
- if (didRefresh ) {
167
+ if (didRefresh || remoteDirectory . getSegmentsUploadedToRemoteStore (). isEmpty () ) {
168
168
updateLocalRefreshTimeAndSeqNo ();
169
- }
170
-
171
- try {
172
- indexShard .getThreadPool ().executor (ThreadPool .Names .REMOTE_REFRESH ).submit (() -> syncSegments (false )).get ();
173
- } catch (InterruptedException | ExecutionException e ) {
174
- logger .info ("Exception occurred while scheduling syncSegments" , e );
169
+ try {
170
+ indexShard .getThreadPool ().executor (ThreadPool .Names .REMOTE_REFRESH ).submit (() -> syncSegments (false )).get ();
171
+ } catch (InterruptedException | ExecutionException e ) {
172
+ logger .info ("Exception occurred while scheduling syncSegments" , e );
173
+ }
175
174
}
176
175
}
177
176
@@ -232,9 +231,7 @@ private synchronized void syncSegments(boolean isRetry) {
232
231
// Start metadata file upload
233
232
uploadMetadata (localSegmentsPostRefresh , segmentInfos );
234
233
clearStaleFilesFromLocalSegmentChecksumMap (localSegmentsPostRefresh );
235
- onSuccessfulSegmentsSync (refreshTimeMs , refreshSeqNo );
236
- indexShard .getEngine ().translogManager ().setMinSeqNoToKeep (lastRefreshedCheckpoint + 1 );
237
- checkpointPublisher .publish (indexShard , checkpoint );
234
+ onSuccessfulSegmentsSync (refreshTimeMs , refreshSeqNo , lastRefreshedCheckpoint , checkpoint );
238
235
// At this point since we have uploaded new segments, segment infos and segment metadata file,
239
236
// along with marking minSeqNoToKeep, upload has succeeded completely.
240
237
shouldRetry = false ;
@@ -278,7 +275,12 @@ private void beforeSegmentsSync(boolean isRetry) {
278
275
segmentTracker .incrementTotalUploadsStarted ();
279
276
}
280
277
281
- private void onSuccessfulSegmentsSync (long refreshTimeMs , long refreshSeqNo ) {
278
+ private void onSuccessfulSegmentsSync (
279
+ long refreshTimeMs ,
280
+ long refreshSeqNo ,
281
+ long lastRefreshedCheckpoint ,
282
+ ReplicationCheckpoint checkpoint
283
+ ) {
282
284
// Update latest uploaded segment files name in segment tracker
283
285
segmentTracker .setLatestUploadedFiles (latestFileNameSizeOnLocalMap .keySet ());
284
286
// Update the remote refresh time and refresh seq no
@@ -287,6 +289,10 @@ private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
287
289
resetBackOffDelayIterator ();
288
290
// Cancel the scheduled cancellable retry if possible and set it to null
289
291
cancelAndResetScheduledCancellableRetry ();
292
+ // Set the minimum sequence number for keeping translog
293
+ indexShard .getEngine ().translogManager ().setMinSeqNoToKeep (lastRefreshedCheckpoint + 1 );
294
+ // Publishing the new checkpoint which is used for remote store + segrep indexes
295
+ checkpointPublisher .publish (indexShard , checkpoint );
290
296
}
291
297
292
298
/**
0 commit comments