18 changes: 18 additions & 0 deletions common/metrics/defs.go
@@ -2977,6 +2977,11 @@ const (
ShardDistributorAssignLoopAttempts
ShardDistributorAssignLoopSuccess
ShardDistributorAssignLoopFail
ShardDistributorAssignLoopGetStateLatency
ShardDistributorAssignLoopCalculateLatency
ShardDistributorAssignLoopWriteLatency
ShardDistributorAssignLoopNumExecutors
ShardDistributorAssignLoopNumShards

ShardDistributorActiveShards

@@ -2985,6 +2990,11 @@ const (
ShardDistributorStoreRequestsPerNamespace
ShardDistributorStoreLatencyHistogramPerNamespace

// GetState internal breakdown metrics
ShardDistributorGetStateEtcdFetchLatency
ShardDistributorGetStateDeserializeLatency
ShardDistributorGetStateNumKeys

// ShardDistributorShardAssignmentDistributionLatency measures the time taken between assignment of a shard
// and the time it is fully distributed to executors
ShardDistributorShardAssignmentDistributionLatency
@@ -3776,13 +3786,21 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
ShardDistributorAssignLoopAttempts: {metricName: "shard_distrubutor_shard_assign_attempt", metricType: Counter},
ShardDistributorAssignLoopSuccess: {metricName: "shard_distrubutor_shard_assign_success", metricType: Counter},
ShardDistributorAssignLoopFail: {metricName: "shard_distrubutor_shard_assign_fail", metricType: Counter},
ShardDistributorAssignLoopGetStateLatency: {metricName: "shard_distributor_assign_loop_get_state_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorAssignLoopCalculateLatency: {metricName: "shard_distributor_assign_loop_calculate_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorAssignLoopWriteLatency: {metricName: "shard_distributor_assign_loop_write_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorAssignLoopNumExecutors: {metricName: "shard_distributor_assign_loop_num_executors", metricType: Gauge},
ShardDistributorAssignLoopNumShards: {metricName: "shard_distributor_assign_loop_num_shards", metricType: Gauge},

ShardDistributorActiveShards: {metricName: "shard_distributor_active_shards", metricType: Gauge},

ShardDistributorStoreExecutorNotFound: {metricName: "shard_distributor_store_executor_not_found", metricType: Counter},
ShardDistributorStoreFailuresPerNamespace: {metricName: "shard_distributor_store_failures_per_namespace", metricType: Counter},
ShardDistributorStoreRequestsPerNamespace: {metricName: "shard_distributor_store_requests_per_namespace", metricType: Counter},
ShardDistributorStoreLatencyHistogramPerNamespace: {metricName: "shard_distributor_store_latency_histogram_per_namespace", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorGetStateEtcdFetchLatency: {metricName: "shard_distributor_get_state_etcd_fetch_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorGetStateDeserializeLatency: {metricName: "shard_distributor_get_state_deserialize_latency", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets},
ShardDistributorGetStateNumKeys: {metricName: "shard_distributor_get_state_num_keys", metricType: Gauge},

ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: ShardDistributorShardAssignmentLatencyBuckets},
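The new entries follow the existing pattern in this map: latencies are Histograms reusing `ShardDistributorExecutorStoreLatencyBuckets`, and counts are Gauges. A minimal sketch of how they are expected to be emitted through the `metrics.Scope` API used elsewhere in this PR (the function name, `scope`, `timeSource`, and `numExecutors` below are placeholders, not part of the diff):

```go
// Sketch only: emitting one of the new histograms and one of the new gauges.
// Assumes a metrics.Scope and a clock.TimeSource are already wired up, as in
// the processor changes below.
func emitAssignLoopMetricsExample(scope metrics.Scope, timeSource clock.TimeSource, numExecutors int) {
	start := timeSource.Now()
	// ... the phase being timed, e.g. fetching namespace state from the store ...
	scope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopGetStateLatency, timeSource.Now().Sub(start))
	scope.UpdateGauge(metrics.ShardDistributorAssignLoopNumExecutors, float64(numExecutors))
}
```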
42 changes: 41 additions & 1 deletion service/sharddistributor/leader/process/processor.go
@@ -354,7 +354,10 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
}

func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) {
// Phase 1: Get state from store
getStateStart := p.timeSource.Now()
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopGetStateLatency, p.timeSource.Now().Sub(getStateStart))
if err != nil {
return fmt.Errorf("get state: %w", err)
}
@@ -365,6 +368,9 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
}
p.lastAppliedRevision = namespaceState.GlobalRevision

// Phase 2: Calculate new assignments
calculateStart := p.timeSource.Now()

// Identify stale executors that need to be removed
staleExecutors := p.identifyStaleExecutors(namespaceState)
if len(staleExecutors) > 0 {
@@ -374,6 +380,18 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
activeExecutors := p.getActiveExecutors(namespaceState, staleExecutors)
if len(activeExecutors) == 0 {
p.logger.Info("No active executors found. Cannot assign shards.")
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopCalculateLatency, p.timeSource.Now().Sub(calculateStart))

// Still clean up stale executors even if no active executors remain
if len(staleExecutors) > 0 {
p.logger.Info("Cleaning up stale executors (no active executors)", tag.ShardExecutors(slices.Collect(maps.Keys(staleExecutors))))
// Use a longer timeout for deletions since they can be slow with large databases
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer deleteCancel()
if err := p.shardStore.DeleteExecutors(deleteCtx, p.namespaceCfg.Name, slices.Collect(maps.Keys(staleExecutors)), p.election.Guard()); err != nil {
p.logger.Error("Failed to delete stale executors", tag.Error(err))
}
}
return nil
}
p.logger.Info("Active executors", tag.ShardExecutors(activeExecutors))
@@ -382,6 +400,14 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
shardsToReassign, currentAssignments := p.findShardsToReassign(activeExecutors, namespaceState, deletedShards, staleExecutors)

metricsLoopScope.UpdateGauge(metrics.ShardDistributorAssignLoopNumRebalancedShards, float64(len(shardsToReassign)))
metricsLoopScope.UpdateGauge(metrics.ShardDistributorAssignLoopNumExecutors, float64(len(activeExecutors)))

// Count total shards across all executors
totalShards := 0
for _, shards := range currentAssignments {
totalShards += len(shards)
}
metricsLoopScope.UpdateGauge(metrics.ShardDistributorAssignLoopNumShards, float64(totalShards))

// If there are deleted shards or stale executors, the distribution has changed.
assignedToEmptyExecutors := assignShardsToEmptyExecutors(currentAssignments)
@@ -391,24 +417,38 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
distributionChanged := len(deletedShards) > 0 || len(staleExecutors) > 0 || assignedToEmptyExecutors || updatedAssignments || isRebalancedByShardLoad
if !distributionChanged {
p.logger.Info("No changes to distribution detected. Skipping rebalance.")
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopCalculateLatency, p.timeSource.Now().Sub(calculateStart))
return nil
}

newState := p.getNewAssignmentsState(namespaceState, currentAssignments)
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopCalculateLatency, p.timeSource.Now().Sub(calculateStart))

if p.sdConfig.GetMigrationMode(p.namespaceCfg.Name) != types.MigrationModeONBOARDED {
p.logger.Info("Running rebalancing in shadow mode", tag.Dynamic("old_assignments", namespaceState.ShardAssignments), tag.Dynamic("new_assignments", newState))
p.emitActiveShardMetric(namespaceState.ShardAssignments, metricsLoopScope)

// Even in shadow mode, clean up stale executors to prevent data accumulation
if len(staleExecutors) > 0 {
p.logger.Info("Cleaning up stale executors in shadow mode", tag.ShardExecutors(slices.Collect(maps.Keys(staleExecutors))))
if err := p.shardStore.DeleteExecutors(ctx, p.namespaceCfg.Name, slices.Collect(maps.Keys(staleExecutors)), p.election.Guard()); err != nil {
p.logger.Error("Failed to delete stale executors in shadow mode", tag.Error(err))
// Don't return error - shadow mode should be resilient
}
}
return nil
}

namespaceState.ShardAssignments = newState
p.logger.Info("Applying new shard distribution.")

// Use the leader guard for the assign and delete operation.
// Phase 3: Write new assignments to store
writeStart := p.timeSource.Now()
err = p.shardStore.AssignShards(ctx, p.namespaceCfg.Name, store.AssignShardsRequest{
NewState: namespaceState,
ExecutorsToDelete: staleExecutors,
}, p.election.Guard())
metricsLoopScope.RecordHistogramDuration(metrics.ShardDistributorAssignLoopWriteLatency, p.timeSource.Now().Sub(writeStart))
if err != nil {
return fmt.Errorf("assign shards: %w", err)
}
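Each phase above is timed with an explicit `Now()` capture followed by `RecordHistogramDuration`, repeated on every exit path of the calculate phase. A small helper along these lines (hypothetical, not part of this PR; the metric-index parameter type is assumed to be `int`) could keep that pairing in one place:

```go
// Hypothetical helper, not in this PR: returns a closure that records the
// elapsed time for a phase when called. Intended usage, preserving the
// existing semantics of recording at the point where the phase ends:
//
//	stopCalc := timePhase(metricsLoopScope, p.timeSource, metrics.ShardDistributorAssignLoopCalculateLatency)
//	// ... calculation ...
//	stopCalc() // invoke at each point where the calculate phase finishes
func timePhase(scope metrics.Scope, ts clock.TimeSource, metricIdx int) func() {
	start := ts.Now()
	return func() {
		scope.RecordHistogramDuration(metricIdx, ts.Now().Sub(start))
	}
}
```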
78 changes: 58 additions & 20 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
@@ -14,6 +14,7 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
@@ -29,13 +30,14 @@
)

type executorStoreImpl struct {
client etcdclient.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
cfg *config.Config
client etcdclient.Client
prefix string
logger log.Logger
shardCache *shardcache.ShardToExecutorCache
timeSource clock.TimeSource
recordWriter *common.RecordWriter
cfg *config.Config
metricsClient metrics.Client
}

// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
@@ -49,12 +51,13 @@ type shardStatisticsUpdate struct {
type ExecutorStoreParams struct {
fx.In

Client etcdclient.Client `name:"executorstore"`
ETCDConfig ETCDConfig
Lifecycle fx.Lifecycle
Logger log.Logger
TimeSource clock.TimeSource
Config *config.Config
Client etcdclient.Client `name:"executorstore"`
ETCDConfig ETCDConfig
Lifecycle fx.Lifecycle
Logger log.Logger
TimeSource clock.TimeSource
Config *config.Config
MetricsClient metrics.Client
}

// NewStore creates a new etcd-backed store and provides it to the fx application.
@@ -72,13 +75,14 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
}

store := &executorStoreImpl{
client: p.Client,
prefix: p.ETCDConfig.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
cfg: p.Config,
client: p.Client,
prefix: p.ETCDConfig.Prefix,
logger: p.Logger,
shardCache: shardCache,
timeSource: timeSource,
recordWriter: recordWriter,
cfg: p.Config,
metricsClient: p.MetricsClient,
}

p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
@@ -201,16 +205,26 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
// --- ShardStore Implementation ---

func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*store.NamespaceState, error) {
metricsScope := s.metricsClient.Scope(metrics.ShardDistributorGetShardOwnerScope, metrics.NamespaceTag(namespace))

heartbeatStates := make(map[string]store.HeartbeatState)
assignedStates := make(map[string]store.AssignedState)
shardStats := make(map[string]store.ShardStatistics)

// Phase 1: Fetch from etcd
etcdFetchStart := s.timeSource.Now()
executorPrefix := etcdkeys.BuildExecutorsPrefix(s.prefix, namespace)
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
metricsScope.RecordHistogramDuration(metrics.ShardDistributorGetStateEtcdFetchLatency, s.timeSource.Now().Sub(etcdFetchStart))
if err != nil {
return nil, fmt.Errorf("get executor data: %w", err)
}

// Record number of keys fetched
metricsScope.UpdateGauge(metrics.ShardDistributorGetStateNumKeys, float64(len(resp.Kvs)))

// Phase 2: Deserialize all keys
deserializeStart := s.timeSource.Now()
for _, kv := range resp.Kvs {
key := string(kv.Key)
value := string(kv.Value)
Expand Down Expand Up @@ -258,6 +272,7 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
heartbeatStates[executorID] = heartbeat
assignedStates[executorID] = assigned
}
metricsScope.RecordHistogramDuration(metrics.ShardDistributorGetStateDeserializeLatency, s.timeSource.Now().Sub(deserializeStart))

return &store.NamespaceState{
Executors: heartbeatStates,
@@ -587,10 +602,33 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,

// DeleteExecutors deletes the given executors from the store. It does not delete the shards owned by the executors;
// this should be handled by the namespace processor loop, as we want to reassign, not delete, the shards.
// Deletions are batched to avoid exceeding etcd's transaction size limit (default 128 ops).
func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard store.GuardFunc) error {
if len(executorIDs) == 0 {
return nil
}

// etcd has a default limit of 128 operations per transaction.
// Batch deletions to stay under this limit.
const batchSize = 20

for i := 0; i < len(executorIDs); i += batchSize {
end := i + batchSize
if end > len(executorIDs) {
end = len(executorIDs)
}
batch := executorIDs[i:end]

if err := s.deleteExecutorsBatch(ctx, namespace, batch, guard); err != nil {
return fmt.Errorf("delete batch %d-%d: %w", i, end, err)
}
}

return nil
}
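The loop above walks the executor IDs in fixed windows of `batchSize`, so the final batch is simply shorter when the count is not a multiple of 20, and a mid-loop failure reports the index range of the failing batch (earlier batches remain deleted, since each batch is its own transaction). A standalone sketch of the same slicing logic, for reference only (hypothetical helper, not part of this PR):

```go
// Hypothetical illustration of the batching used above: 45 IDs with a batch
// size of 20 yield batches of 20, 20 and 5.
func batchIDs(ids []string, batchSize int) [][]string {
	var batches [][]string
	for i := 0; i < len(ids); i += batchSize {
		end := i + batchSize
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[i:end])
	}
	return batches
}
```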

// deleteExecutorsBatch deletes a batch of executors in a single transaction.
func (s *executorStoreImpl) deleteExecutorsBatch(ctx context.Context, namespace string, executorIDs []string, guard store.GuardFunc) error {
var ops []clientv3.Op

for _, executorID := range executorIDs {