Changes from all commits
22 commits
d1e858f
feat(shard distributor): add shard key helpers and metrics state
AndreasHolt Oct 19, 2025
5f105bf
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
d32cc72
fix(shard distributor): update LastMoveTime in the case where a shard…
AndreasHolt Oct 19, 2025
9e958d9
test(shard distributor): add tests for shard metrics
AndreasHolt Oct 19, 2025
3be24c4
fix(shard distributor): modify comment
AndreasHolt Oct 19, 2025
1ca89cd
fix(shard distributor): add atomic check to prevent metrics race
AndreasHolt Oct 19, 2025
9a48ab6
fix(shard distributor): apply shard metric updates in a second phase …
AndreasHolt Oct 19, 2025
876472d
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
023fc73
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
3b3b8db
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
81143d8
test(shard distributor): BuildShardPrefix, BuildShardKey, ParseShardKey
AndreasHolt Oct 22, 2025
3035006
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
266d00e
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
c5dee7f
test(shard distributor): small changes to shard key tests s.t. they l…
AndreasHolt Oct 25, 2025
b333e7d
fix(shard distributor): no longer check for key type ShardStatisticsK…
AndreasHolt Oct 25, 2025
ac4b237
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
24888ac
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
63d060b
feat(shard distributor): clean up the shard statistics
AndreasHolt Oct 29, 2025
588a8f4
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
2c88990
fix(shard distributor): add mapping (new metric)
AndreasHolt Oct 29, 2025
2642080
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
3931fea
refactor(shard distributor): fetch namespace state once per cleanup tick
AndreasHolt Nov 2, 2025
26 changes: 14 additions & 12 deletions common/metrics/defs.go
@@ -1475,6 +1475,7 @@ const (
ShardDistributorStoreAssignShardScope
ShardDistributorStoreAssignShardsScope
ShardDistributorStoreDeleteExecutorsScope
ShardDistributorStoreDeleteShardStatsScope
ShardDistributorStoreGetHeartbeatScope
ShardDistributorStoreGetStateScope
ShardDistributorStoreRecordHeartbeatScope
@@ -2151,18 +2152,19 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
DiagnosticsWorkflowScope: {operation: "DiagnosticsWorkflow"},
},
ShardDistributor: {
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
ShardDistributorExecutorScope: {operation: "Executor"},
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
ShardDistributorExecutorScope: {operation: "Executor"},
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
},
}

82 changes: 77 additions & 5 deletions service/sharddistributor/leader/process/processor.go
@@ -230,16 +230,21 @@ func (p *namespaceProcessor) runCleanupLoop(ctx context.Context) {
return
case <-ticker.Chan():
p.logger.Info("Periodic heartbeat cleanup triggered.")
p.cleanupStaleExecutors(ctx)
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
if err != nil {
p.logger.Error("Failed to get state for cleanup", tag.Error(err))
continue
}
p.cleanupStaleExecutors(ctx, namespaceState)
p.cleanupStaleShardStats(ctx, namespaceState)
}
}
}

// cleanupStaleExecutors removes executors who have not reported a heartbeat recently.
func (p *namespaceProcessor) cleanupStaleExecutors(ctx context.Context) {
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
if err != nil {
p.logger.Error("Failed to get state for heartbeat cleanup", tag.Error(err))
func (p *namespaceProcessor) cleanupStaleExecutors(ctx context.Context, namespaceState *store.NamespaceState) {
if namespaceState == nil {
p.logger.Error("Namespace state missing for heartbeat cleanup")
return
}

@@ -264,6 +269,73 @@ func (p *namespaceProcessor) cleanupStaleExecutors(ctx context.Context) {
}
}

func (p *namespaceProcessor) cleanupStaleShardStats(ctx context.Context, namespaceState *store.NamespaceState) {
if namespaceState == nil {
p.logger.Error("Namespace state missing for shard stats cleanup")
return
}

activeShards := make(map[string]struct{})
now := p.timeSource.Now().Unix()
shardStatsTTL := int64(p.cfg.HeartbeatTTL.Seconds())

// 1. build set of active executors

// add all assigned shards from executors that are ACTIVE and not stale
Contributor: What happens if the executor is in a draining state? Are we fine with losing the statistics for it, or is that covered by the following case, where the shard is not in a DONE state?

Author: The ShardStatus != DONE check in cleanupStaleShardStats keeps shard stats alive while a draining executor still reports them, and the TTL "grace period" only removes them after the shard has been marked DONE and stayed idle for that whole TTL window.

for executorID, assignedState := range namespaceState.ShardAssignments {
executor, exists := namespaceState.Executors[executorID]
if !exists {
continue
}

isActive := executor.Status == types.ExecutorStatusACTIVE
isNotStale := (now - executor.LastHeartbeat) <= shardStatsTTL
if isActive && isNotStale {
for shardID := range assignedState.AssignedShards {
activeShards[shardID] = struct{}{}
}
}
}

// add all shards in ReportedShards where the status is not DONE
for _, heartbeatState := range namespaceState.Executors {
for shardID, shardStatusReport := range heartbeatState.ReportedShards {
if shardStatusReport.Status != types.ShardStatusDONE {
activeShards[shardID] = struct{}{}
}
}
}

// 2. build set of stale shard stats

// append all shard stats that are not in the active shards set
var staleShardStats []string
for shardID, stats := range namespaceState.ShardStats {
if _, ok := activeShards[shardID]; ok {
continue
}
recentUpdate := stats.LastUpdateTime > 0 && (now-stats.LastUpdateTime) <= shardStatsTTL
recentMove := stats.LastMoveTime > 0 && (now-stats.LastMoveTime) <= shardStatsTTL
if recentUpdate || recentMove {
// Preserve stats that have been updated recently to allow cooldown/load history to
// survive executor churn. These shards are likely awaiting reassignment,
// so we don't want to delete them.
continue
}
staleShardStats = append(staleShardStats, shardID)
}

if len(staleShardStats) == 0 {
return
}

p.logger.Info("Removing stale shard stats")
// Use the leader guard for the delete operation.
if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil {
p.logger.Error("Failed to delete stale shard stats", tag.Error(err))
}
}

// rebalanceShards is the core logic for distributing shards among active executors.
func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
metricsLoopScope := p.metricsClient.Scope(metrics.ShardDistributorAssignLoopScope)
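The retention rule the review thread above describes can be condensed into a small standalone sketch. It is illustrative only: shardInfo and shardStatsStale are hypothetical names summarising the checks cleanupStaleShardStats makes against assignments, reported shards, and the heartbeat TTL, not code from this change.

package sketch

// shardInfo condenses the inputs the cleanup pass inspects for one shard.
type shardInfo struct {
	assignedToLiveExecutor bool  // held by an ACTIVE executor whose heartbeat is within the TTL
	reportedNotDone        bool  // still reported by some executor with a status other than DONE
	lastUpdateTime         int64 // unix seconds, 0 if never updated
	lastMoveTime           int64 // unix seconds, 0 if never moved
}

// shardStatsStale mirrors the decision above: stats are deleted only when the
// shard is neither assigned nor in progress anywhere, and both timestamps have
// been idle for longer than the heartbeat TTL.
func shardStatsStale(s shardInfo, nowUnix, ttlSeconds int64) bool {
	if s.assignedToLiveExecutor || s.reportedNotDone {
		return false
	}
	recentUpdate := s.lastUpdateTime > 0 && nowUnix-s.lastUpdateTime <= ttlSeconds
	recentMove := s.lastMoveTime > 0 && nowUnix-s.lastMoveTime <= ttlSeconds
	return !recentUpdate && !recentMove
}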
82 changes: 74 additions & 8 deletions service/sharddistributor/leader/process/processor_test.go
@@ -178,11 +178,80 @@ func TestCleanupStaleExecutors(t *testing.T) {
"exec-stale": {LastHeartbeat: now.Add(-2 * time.Second).Unix()},
}

mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{Executors: heartbeats}, nil)
namespaceState := &store.NamespaceState{Executors: heartbeats}
mocks.election.EXPECT().Guard().Return(store.NopGuard())
mocks.store.EXPECT().DeleteExecutors(gomock.Any(), mocks.cfg.Name, []string{"exec-stale"}, gomock.Any()).Return(nil)

processor.cleanupStaleExecutors(context.Background())
processor.cleanupStaleExecutors(context.Background(), namespaceState)
}

func TestCleanupStaleShardStats(t *testing.T) {
t.Run("stale shard stats are deleted", func(t *testing.T) {
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
defer mocks.ctrl.Finish()
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)

now := mocks.timeSource.Now()

heartbeats := map[string]store.HeartbeatState{
"exec-active": {LastHeartbeat: now.Unix(), Status: types.ExecutorStatusACTIVE},
"exec-stale": {LastHeartbeat: now.Add(-2 * time.Second).Unix()},
}

assignments := map[string]store.AssignedState{
"exec-active": {
AssignedShards: map[string]*types.ShardAssignment{
"shard-1": {Status: types.AssignmentStatusREADY},
"shard-2": {Status: types.AssignmentStatusREADY},
},
},
"exec-stale": {
AssignedShards: map[string]*types.ShardAssignment{
"shard-3": {Status: types.AssignmentStatusREADY},
},
},
}

shardStats := map[string]store.ShardStatistics{
"shard-1": {SmoothedLoad: 1.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
"shard-2": {SmoothedLoad: 2.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-2 * time.Second).Unix(), LastMoveTime: now.Add(-2 * time.Second).Unix()},
}

namespaceState := &store.NamespaceState{
Executors: heartbeats,
ShardAssignments: assignments,
ShardStats: shardStats,
}

mocks.election.EXPECT().Guard().Return(store.NopGuard())
mocks.store.EXPECT().DeleteShardStats(gomock.Any(), mocks.cfg.Name, []string{"shard-3"}, gomock.Any()).Return(nil)
processor.cleanupStaleShardStats(context.Background(), namespaceState)
})

t.Run("recent shard stats are preserved", func(t *testing.T) {
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
defer mocks.ctrl.Finish()
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)

now := mocks.timeSource.Now()

expiredExecutor := now.Add(-2 * time.Second).Unix()
namespaceState := &store.NamespaceState{
Executors: map[string]store.HeartbeatState{
"exec-stale": {LastHeartbeat: expiredExecutor},
},
ShardAssignments: map[string]store.AssignedState{},
ShardStats: map[string]store.ShardStatistics{
"shard-1": {SmoothedLoad: 5.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
},
}

processor.cleanupStaleShardStats(context.Background(), namespaceState)

// No delete expected since stats are recent.
})

}

func TestRebalance_StoreErrors(t *testing.T) {
@@ -213,16 +282,13 @@ func TestCleanup_StoreErrors(t *testing.T) {
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
expectedErr := errors.New("store is down")

mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(nil, expectedErr)
processor.cleanupStaleExecutors(context.Background())

mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{
namespaceState := &store.NamespaceState{
Executors: map[string]store.HeartbeatState{"stale": {LastHeartbeat: 0}},
GlobalRevision: 1,
}, nil)
}
mocks.election.EXPECT().Guard().Return(store.NopGuard())
mocks.store.EXPECT().DeleteExecutors(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).Return(expectedErr)
processor.cleanupStaleExecutors(context.Background())
processor.cleanupStaleExecutors(context.Background(), namespaceState)
}

func TestRunLoop_SubscriptionError(t *testing.T) {
25 changes: 25 additions & 0 deletions service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go
@@ -11,6 +11,7 @@ const (
ExecutorReportedShardsKey = "reported_shards"
ExecutorAssignedStateKey = "assigned_state"
ShardAssignedKey = "assigned"
ShardStatisticsKey = "statistics"
ExecutorMetadataKey = "metadata"
)

@@ -70,6 +71,30 @@ func ParseExecutorKey(prefix string, namespace, key string) (executorID, keyType
return parts[0], parts[1], nil
}

func BuildShardPrefix(prefix string, namespace string) string {
@eleonoradgr (Contributor), Oct 21, 2025: We need to have tests for BuildShardPrefix, BuildShardKey and ParseShardKey :)
return fmt.Sprintf("%s/shards/", BuildNamespacePrefix(prefix, namespace))
}

func BuildShardKey(prefix string, namespace, shardID, keyType string) (string, error) {
if keyType != ShardStatisticsKey {
return "", fmt.Errorf("invalid shard key type: %s", keyType)
}
return fmt.Sprintf("%s%s/%s", BuildShardPrefix(prefix, namespace), shardID, keyType), nil
}

func ParseShardKey(prefix string, namespace, key string) (shardID, keyType string, err error) {
prefix = BuildShardPrefix(prefix, namespace)
if !strings.HasPrefix(key, prefix) {
return "", "", fmt.Errorf("key '%s' does not have expected prefix '%s'", key, prefix)
}
remainder := strings.TrimPrefix(key, prefix)
parts := strings.Split(remainder, "/")
if len(parts) != 2 {
return "", "", fmt.Errorf("unexpected shard key format: %s", key)
}
return parts[0], parts[1], nil
}

func BuildMetadataKey(prefix string, namespace, executorID, metadataKey string) string {
metadataKeyPrefix, err := BuildExecutorKey(prefix, namespace, executorID, ExecutorMetadataKey)
if err != nil {
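For reference, a short usage sketch of the new shard-key helpers. It assumes the package is imported under the github.com/uber/cadence module path; the variable names and the main wrapper are illustrative only, and the expected values match the tests below.

package main

import (
	"fmt"

	"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
)

func main() {
	// BuildShardKey only accepts ShardStatisticsKey; any other key type returns an error.
	key, err := etcdkeys.BuildShardKey("/cadence", "test-ns", "shard-1", etcdkeys.ShardStatisticsKey)
	if err != nil {
		panic(err)
	}
	// key == "/cadence/test-ns/shards/shard-1/statistics"

	// ParseShardKey recovers the shard ID and key type from a full key.
	shardID, keyType, _ := etcdkeys.ParseShardKey("/cadence", "test-ns", key)
	fmt.Println(shardID, keyType) // shard-1 statistics
}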
32 changes: 32 additions & 0 deletions service/sharddistributor/store/etcd/etcdkeys/etcdkeys_test.go
@@ -16,6 +16,11 @@ func TestBuildExecutorPrefix(t *testing.T) {
assert.Equal(t, "/cadence/test-ns/executors/", got)
}

func TestBuildShardPrefix(t *testing.T) {
got := BuildShardPrefix("/cadence", "test-ns")
assert.Equal(t, "/cadence/test-ns/shards/", got)
}

func TestBuildExecutorKey(t *testing.T) {
got, err := BuildExecutorKey("/cadence", "test-ns", "exec-1", "heartbeat")
assert.NoError(t, err)
@@ -27,6 +32,17 @@ func TestBuildExecutorKeyFail(t *testing.T) {
assert.ErrorContains(t, err, "invalid key type: invalid")
}

func TestBuildShardKey(t *testing.T) {
got, err := BuildShardKey("/cadence", "test-ns", "shard-1", "statistics")
assert.NoError(t, err)
assert.Equal(t, "/cadence/test-ns/shards/shard-1/statistics", got)
}

func TestBuildShardKeyFail(t *testing.T) {
_, err := BuildShardKey("/cadence", "test-ns", "shard-1", "invalid")
assert.ErrorContains(t, err, "invalid shard key type: invalid")
}

func TestParseExecutorKey(t *testing.T) {
// Valid key
executorID, keyType, err := ParseExecutorKey("/cadence", "test-ns", "/cadence/test-ns/executors/exec-1/heartbeat")
@@ -43,6 +59,22 @@ func TestParseExecutorKey(t *testing.T) {
assert.ErrorContains(t, err, "unexpected key format: /cadence/test-ns/executors/exec-1/heartbeat/extra")
}

func TestParseShardKey(t *testing.T) {
// Valid key
shardID, keyType, err := ParseShardKey("/cadence", "test-ns", "/cadence/test-ns/shards/shard-1/statistics")
assert.NoError(t, err)
assert.Equal(t, "shard-1", shardID)
assert.Equal(t, "statistics", keyType)

// Prefix missing
_, _, err = ParseShardKey("/cadence", "test-ns", "/cadence/other/shards/shard-1/statistics")
assert.ErrorContains(t, err, "key '/cadence/other/shards/shard-1/statistics' does not have expected prefix '/cadence/test-ns/shards/'")

// Unexpected format
_, _, err = ParseShardKey("/cadence", "test-ns", "/cadence/test-ns/shards/shard-1/statistics/extra")
assert.ErrorContains(t, err, "unexpected shard key format: /cadence/test-ns/shards/shard-1/statistics/extra")
}

func TestBuildMetadataKey(t *testing.T) {
got := BuildMetadataKey("/cadence", "test-ns", "exec-1", "my-metadata-key")
assert.Equal(t, "/cadence/test-ns/executors/exec-1/metadata/my-metadata-key", got)