Skip to content

Commit

Permalink
[#25480] xCluster: Keep running xcluster task even if leader temporar…
Browse files Browse the repository at this point in the history
…ily loses the lease

Summary:
On an xCluster target universe the yb-master leader computes the xCluster safe time every second. This task stops if the leadership is lost, and restarts again when a new leader is elected and sys catalog is reloaded. In some rare cases the master can lose the raft lease, but still retain its leadership. In this case we should keep running the xCluster task until we are certain that the leadership is lost.

`SCOPED_LEADER_SHARED_LOCK` checks if we are the leader and lease is valid. This change uses `CatalogManager::CheckIsLeaderAndReady` to check if the leadership is still valid.

Removing unnecessary `leader_term_` member.
Jira: DB-14730

Test Plan:
Jenkins

./yb_build.sh asan --cxx-test integration-tests_xcluster_ysql-test --gtest_filter XClusterYSqlTestConsistentTransactionsTest.AddServerIntraTransaction -n 40  --tp 1

Reviewers: sergei, jhe, slingam, xCluster

Reviewed By: sergei, jhe

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D40988
  • Loading branch information
hari90 committed Jan 4, 2025
1 parent 91c4db2 commit 91a5d51
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 25 deletions.
5 changes: 0 additions & 5 deletions src/yb/master/xcluster/xcluster_safe_time_service-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,6 @@ TEST_F(XClusterSafeTimeServiceTest, ComputeSafeTimeWithFilters) {
ASSERT_EQ(safe_time_service.safe_time_map_[db2], ht_invalid);
ASSERT_EQ(safe_time_service.entries_to_delete_.size(), 0);

ASSERT_NOK(safe_time_service.GetXClusterSafeTimeForNamespace(
dummy_leader_term + 1, db1, XClusterSafeTimeFilter::NONE));
ASSERT_NOK(safe_time_service.GetXClusterSafeTimeForNamespace(
dummy_leader_term - 1, db1, XClusterSafeTimeFilter::NONE));

auto db1_none = ASSERT_RESULT(GetXClusterSafeTimeWithNoFilter(safe_time_service, db1));
auto db1_ddlqueue = ASSERT_RESULT(GetXClusterSafeTimeFilterOutDdlQueue(safe_time_service, db1));
auto db2_none = ASSERT_RESULT(GetXClusterSafeTimeWithNoFilter(safe_time_service, db2));
Expand Down
45 changes: 26 additions & 19 deletions src/yb/master/xcluster/xcluster_safe_time_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,35 @@ void XClusterSafeTimeService::ProcessTaskPeriodically() {
}

auto leader_term_result = GetLeaderTermFromCatalogManager();
if (!leader_term_result.ok()) {
VLOG_WITH_FUNC(1) << "Going into idle mode due to master leader change";
EnterIdleMode("master leader change");
return;
}
int64_t leader_term = leader_term_result.get();
if (leader_term_result.ok()) {
// Compute safe time now and also update the metrics.
bool further_computation_needed = true;
auto result = ComputeSafeTime(*leader_term_result, /* update_metrics */ true);
if (result.ok()) {
further_computation_needed = result.get();
} else {
LOG(WARNING) << "Failure in XClusterSafeTime task: " << result;
}

// Compute safe time now and also update the metrics.
bool further_computation_needed = true;
auto result = ComputeSafeTime(leader_term, /* update_metrics */ true);
if (result.ok()) {
further_computation_needed = result.get();
if (!further_computation_needed) {
VLOG_WITH_FUNC(1) << "Going into idle mode due to lack of work";
EnterIdleMode("no more work left");
return;
}
} else {
LOG(WARNING) << "Failure in XClusterSafeTime task: " << result;
}
// We can fail to get the term due to transient errors like a stale lease. In these cases we can
// recover without losing the leadership or changing the term. So check again to see if we are
// the leader.
auto leader_status = catalog_manager_->CheckIsLeaderAndReady();
if (!leader_status.ok()) {
LOG_WITH_FUNC(INFO) << "Going into idle mode due to master leader change: " << leader_status;
EnterIdleMode("master leader change");
return;
}

if (!further_computation_needed) {
VLOG_WITH_FUNC(1) << "Going into idle mode due to lack of work";
EnterIdleMode("no more work left");
return;
YB_LOG_EVERY_N_SECS_OR_VLOG(INFO, 60, 1)
<< "Skip xCluster safe time computation since this is not a healthy master leader: "
<< leader_term_result.status();
}

// Delay before before running the task again.
Expand Down Expand Up @@ -230,7 +239,6 @@ Result<HybridTime> XClusterSafeTimeService::GetXClusterSafeTimeForNamespace(
const XClusterSafeTimeFilter& filter) {
SharedLock lock(mutex_);
SCHECK(safe_time_table_ready_, IllegalState, "Safe time table is not ready yet.");
SCHECK_EQ(leader_term_, leader_term, IllegalState, "Received unexpected leader term");

const XClusterNamespaceToSafeTimeMap& safe_time_map =
VERIFY_RESULT(GetFilteredXClusterSafeTimeMap(filter));
Expand Down Expand Up @@ -478,7 +486,6 @@ Result<bool> XClusterSafeTimeService::ComputeSafeTime(
// and setting the new config. Its important to make sure that the config we persist is accurate
// as only that protects the safe time from going backwards.
RETURN_NOT_OK(SetXClusterSafeTime(leader_term, namespace_safe_time_map));
leader_term_ = leader_term;

if (update_metrics) {
// Update the metrics using the newly computed maps.
Expand Down
1 change: 0 additions & 1 deletion src/yb/master/xcluster/xcluster_safe_time_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class XClusterSafeTimeService {

std::unique_ptr<client::TableHandle> safe_time_table_;

int64_t leader_term_ GUARDED_BY(mutex_);
int32_t cluster_config_version_ GUARDED_BY(mutex_);
std::map<xcluster::SafeTimeTablePK, NamespaceId> producer_tablet_namespace_map_
GUARDED_BY(mutex_);
Expand Down

0 comments on commit 91a5d51

Please sign in to comment.