Skip to content

Commit

Permalink
Fix leader partition time out issue due to exception (#3666)
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba authored Nov 15, 2023
1 parent 43a0c75 commit e848f63
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,16 @@ public void run() {

// Step 3: Find and create children partitions.
compareAndCreateChildrenPartitions(sourcePartitions);

// Extend the timeout
// will always be a leader until shutdown
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
}

}

} catch (Exception e) {
LOG.error("Exception occurred in primary scheduling loop", e);
} finally {
// Extend the timeout
// will always be a leader until shutdown
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
try {
Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
} catch (final InterruptedException e) {
Expand Down Expand Up @@ -178,7 +177,6 @@ private void init() {
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
leaderProgressState.setStreamArns(streamArns);
leaderProgressState.setInitialized(true);
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
}


Expand Down Expand Up @@ -212,7 +210,7 @@ private void compareAndCreateChildrenPartitions(List<EnhancedSourcePartition> so
}
});
long endTime = System.currentTimeMillis();
LOG.info("Compare and create children partitions took {} milliseconds", endTime - startTime);
LOG.debug("Compare and create children partitions took {} milliseconds", endTime - startTime);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
} while (lastEvaluatedShardId != null);

long endTime = System.currentTimeMillis();
LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found, ", endTime - startTime, shards.size());
LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found", endTime - startTime, shards.size());
return shards;
}

Expand Down

0 comments on commit e848f63

Please sign in to comment.