Commits (21)
2de12d8
feat(shard distributor): add shard key helpers and metrics state
AndreasHolt Oct 19, 2025
5d95067
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
6e57536
fix(shard distributor): update LastMoveTime in the case where a shard…
AndreasHolt Oct 19, 2025
595d320
test(shard distributor): add tests for shard metrics
AndreasHolt Oct 19, 2025
d9ba54d
fix(shard distributor): modify comment
AndreasHolt Oct 19, 2025
32d2ecd
fix(shard distributor): add atomic check to prevent metrics race
AndreasHolt Oct 19, 2025
b624a00
fix(shard distributor): apply shard metric updates in a second phase …
AndreasHolt Oct 19, 2025
aad7b2e
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
6360f8a
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
1536d0a
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
f316fbf
test(shard distributor): BuildShardPrefix, BuildShardKey, ParseShardKey
AndreasHolt Oct 22, 2025
4524da9
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
126f725
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc53f68
test(shard distributor): small changes to shard key tests s.t. they l…
AndreasHolt Oct 25, 2025
733bbcb
fix(shard distributor): no longer check for key type ShardStatisticsK…
AndreasHolt Oct 25, 2025
6816b8e
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
f97e0cf
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
513e88c
feat(shard distributor): clean up the shard statistics
AndreasHolt Oct 29, 2025
9833525
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
0332fe5
fix(shard distributor): add mapping (new metric)
AndreasHolt Oct 29, 2025
d5a13d9
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
25 changes: 25 additions & 0 deletions service/sharddistributor/store/etcd/etcdkeys/etcdkeys.go
@@ -11,6 +11,7 @@ const (
ExecutorReportedShardsKey = "reported_shards"
ExecutorAssignedStateKey = "assigned_state"
ShardAssignedKey = "assigned"
ShardMetricsKey = "metrics"
)

var validKeyTypes = []string{
@@ -57,3 +58,27 @@ func ParseExecutorKey(prefix string, namespace, key string) (executorID, keyType
}
return parts[0], parts[1], nil
}

func BuildShardPrefix(prefix string, namespace string) string {
Review comment from @eleonoradgr (Oct 21, 2025): We need to have tests for BuildShardPrefix, BuildShardKey and ParseShardKey :)

return fmt.Sprintf("%s/shards/", BuildNamespacePrefix(prefix, namespace))
}

func BuildShardKey(prefix string, namespace, shardID, keyType string) (string, error) {
if keyType != ShardAssignedKey && keyType != ShardMetricsKey {
Review comment (Contributor): where/when is this used?

return "", fmt.Errorf("invalid shard key type: %s", keyType)
}
return fmt.Sprintf("%s%s/%s", BuildShardPrefix(prefix, namespace), shardID, keyType), nil
}

func ParseShardKey(prefix string, namespace, key string) (shardID, keyType string, err error) {
prefix = BuildShardPrefix(prefix, namespace)
if !strings.HasPrefix(key, prefix) {
return "", "", fmt.Errorf("key '%s' does not have expected prefix '%s'", key, prefix)
}
remainder := strings.TrimPrefix(key, prefix)
parts := strings.Split(remainder, "/")
if len(parts) != 2 {
return "", "", fmt.Errorf("unexpected shard key format: %s", key)
}
return parts[0], parts[1], nil
}
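
As a rough illustration of the tests requested in the review comment above (this is only a sketch, not the tests added in commit f316fbf; the external test package, module path, and testify usage are assumptions), a round-trip test over these helpers could look like:

package etcdkeys_test

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
)

func TestShardKeyRoundTrip(t *testing.T) {
	const (
		prefix    = "/cadence/shard-distributor"
		namespace = "test-namespace"
		shardID   = "shard-1"
	)

	// Built keys live under the namespace's shard prefix.
	key, err := etcdkeys.BuildShardKey(prefix, namespace, shardID, etcdkeys.ShardMetricsKey)
	require.NoError(t, err)
	assert.True(t, strings.HasPrefix(key, etcdkeys.BuildShardPrefix(prefix, namespace)))

	// ParseShardKey recovers the shard ID and key type from a built key.
	gotShardID, gotKeyType, err := etcdkeys.ParseShardKey(prefix, namespace, key)
	require.NoError(t, err)
	assert.Equal(t, shardID, gotShardID)
	assert.Equal(t, etcdkeys.ShardMetricsKey, gotKeyType)

	// Unknown key types are rejected by BuildShardKey.
	_, err = etcdkeys.BuildShardKey(prefix, namespace, shardID, "bogus")
	assert.Error(t, err)

	// Keys outside the shard prefix fail to parse.
	_, _, err = etcdkeys.ParseShardKey(prefix, namespace, "/some/other/key")
	assert.Error(t, err)
}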
220 changes: 216 additions & 4 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
@@ -34,6 +34,18 @@ type executorStoreImpl struct {
shardCache *shardcache.ShardToExecutorCache
}

// shardMetricsUpdate tracks the etcd key, revision, and metrics used to update a shard
// after the main AssignShards transaction that updates executor state.
// It retains the current metrics so concurrent updates can be merged safely before retrying.
type shardMetricsUpdate struct {
Review comment (Member): I would call this statistics, metrics have a pretty standard meaning.

key string
shardID string
metrics store.ShardMetrics
modRevision int64
desiredLastMove int64 // intended LastMoveTime for this update
defaultLastUpdate int64
Review comment (Contributor): what is the defaultLastUpdate? can we just call this LastUpdate?

}

// ExecutorStoreParams defines the dependencies for the etcd store, for use with fx.
type ExecutorStoreParams struct {
fx.In
@@ -201,6 +213,7 @@ func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string,
func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*store.NamespaceState, error) {
heartbeatStates := make(map[string]store.HeartbeatState)
assignedStates := make(map[string]store.AssignedState)
shardMetrics := make(map[string]store.ShardMetrics)

executorPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace)
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
Expand Down Expand Up @@ -242,8 +255,30 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
assignedStates[executorID] = assigned
}

// Fetch shard-level metrics stored under shard namespace keys.
shardPrefix := etcdkeys.BuildShardPrefix(s.prefix, namespace)
shardResp, err := s.client.Get(ctx, shardPrefix, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("get shard data: %w", err)
}
for _, kv := range shardResp.Kvs {
shardID, shardKeyType, err := etcdkeys.ParseShardKey(s.prefix, namespace, string(kv.Key))
if err != nil {
continue
}
Review comment (Contributor): If we don't want to abort metric emission, we should still log an error so that we have evidence that something is not working as expected.

if shardKeyType != etcdkeys.ShardMetricsKey {
continue
}
var shardMetric store.ShardMetrics
if err := json.Unmarshal(kv.Value, &shardMetric); err != nil {
continue
}
shardMetrics[shardID] = shardMetric
}

return &store.NamespaceState{
Executors: heartbeatStates,
ShardMetrics: shardMetrics,
ShardAssignments: assignedStates,
GlobalRevision: resp.Header.Revision,
}, nil
@@ -294,6 +329,75 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
var ops []clientv3.Op
var comparisons []clientv3.Cmp

// Compute shard moves to update last_move_time metrics when ownership changes.
// Read current assignments for the namespace and compare with the new state.
// Concurrent changes will be caught by the revision comparisons later.
currentAssignments := make(map[string]string) // shardID -> executorID
Review comment from @eleonoradgr (Oct 21, 2025): Instead of building the whole executor-to-shard mapping here, can we rely on the shardCache *shardcache.ShardToExecutorCache?
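
A minimal sketch of the cache-based alternative the reviewer describes, assuming a hypothetical snapshot accessor on the cache (GetShardOwners is an invented name for illustration; the real shardcache API in this PR may differ):

// Hypothetical sketch: read the current shard-to-executor mapping from the cache
// instead of re-fetching and parsing every assigned_state key from etcd.
// GetShardOwners (shardID -> executorID) is assumed here, not part of this PR.
currentAssignments, err := s.shardCache.GetShardOwners(namespace)
if err != nil {
	return fmt.Errorf("read shard-to-executor cache: %w", err)
}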

executorPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace)
resp, err := s.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
if err != nil {
return fmt.Errorf("get current assignments: %w", err)
}
for _, kv := range resp.Kvs {
executorID, keyType, keyErr := etcdkeys.ParseExecutorKey(s.prefix, namespace, string(kv.Key))
if keyErr != nil || keyType != etcdkeys.ExecutorAssignedStateKey {
continue
}
var state store.AssignedState
if err := json.Unmarshal(kv.Value, &state); err != nil {
return fmt.Errorf("unmarshal current assigned state: %w", err)
}
for shardID := range state.AssignedShards {
currentAssignments[shardID] = executorID
}
}

// Build new owner map and detect moved shards.
newAssignments := make(map[string]string)
for executorID, state := range request.NewState.ShardAssignments {
for shardID := range state.AssignedShards {
newAssignments[shardID] = executorID
}
}
now := time.Now().Unix()
Review comment (Contributor): In general we use clock.TimeSource to handle time; it makes testing easier. I would suggest extending this with the same approach; you can check executorImpl for an example.

// Collect metric updates now so we can apply them after committing the main transaction.
Review comment (Contributor): Can we move all the code for metric generation to a separate function? It will make the overall code more readable.
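
Taking this together with the clock.TimeSource suggestion above, a rough sketch of such an extraction might look like the following (the prepareShardMetricsUpdates helper and the timeSource field are assumptions for illustration, not code from this PR; key/ModRevision resolution against etcd is omitted for brevity):

// Sketch only: assumes executorStoreImpl gains a timeSource clock.TimeSource field
// (injected via ExecutorStoreParams) so tests can control time.
func (s *executorStoreImpl) prepareShardMetricsUpdates(
	currentAssignments map[string]string, // shardID -> executorID before rebalance
	newAssignments map[string]string, // shardID -> executorID after rebalance
	existingMetrics map[string]store.ShardMetrics, // metrics already stored per shard
) []shardMetricsUpdate {
	now := s.timeSource.Now().Unix()
	var updates []shardMetricsUpdate
	for shardID, newOwner := range newAssignments {
		if oldOwner, ok := currentAssignments[shardID]; ok && oldOwner == newOwner {
			continue // the shard did not move, so there is nothing to record
		}
		metrics := existingMetrics[shardID] // zero value when the shard is brand new
		if metrics.LastUpdateTime == 0 {
			metrics.LastUpdateTime = now
		}
		updates = append(updates, shardMetricsUpdate{
			shardID:           shardID,
			metrics:           metrics,
			desiredLastMove:   now, // LastMoveTime itself is applied in the second phase
			defaultLastUpdate: metrics.LastUpdateTime,
			// key and modRevision would be filled in from etcd as in the code above.
		})
	}
	return updates
}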

var metricsUpdates []shardMetricsUpdate
for shardID, newOwner := range newAssignments {
if oldOwner, ok := currentAssignments[shardID]; ok && oldOwner == newOwner {
continue
}
// For a new or moved shard, update last_move_time while keeping existing metrics if available.
shardMetricsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardMetricsKey)
if err != nil {
return fmt.Errorf("build shard metrics key: %w", err)
}
var shardMetrics store.ShardMetrics
metricsModRevision := int64(0)
metricsResp, err := s.client.Get(ctx, shardMetricsKey)
if err != nil {
return fmt.Errorf("get shard metrics: %w", err)
}
if len(metricsResp.Kvs) > 0 {
metricsModRevision = metricsResp.Kvs[0].ModRevision
if err := json.Unmarshal(metricsResp.Kvs[0].Value, &shardMetrics); err != nil {
return fmt.Errorf("unmarshal shard metrics: %w", err)
}
} else {
shardMetrics.SmoothedLoad = 0
shardMetrics.LastUpdateTime = now
}
// Do not set LastMoveTime here, it will be applied later to avoid overwriting
// a newer timestamp if a concurrent rebalance has already updated it.
metricsUpdates = append(metricsUpdates, shardMetricsUpdate{
key: shardMetricsKey,
shardID: shardID,
metrics: shardMetrics,
modRevision: metricsModRevision,
desiredLastMove: now,
defaultLastUpdate: shardMetrics.LastUpdateTime,
})
}

// 1. Prepare operations to update executor states and shard ownership,
// and comparisons to check for concurrent modifications.
for executorID, state := range request.NewState.ShardAssignments {
@@ -357,9 +461,81 @@ func (s *executorStoreImpl) AssignShards(ctx context.Context, namespace string,
return fmt.Errorf("%w: transaction failed, a shard may have been concurrently assigned", store.ErrVersionConflict)
}

// Apply shard metrics updates outside the main transaction to stay within etcd's max operations per txn.
s.applyShardMetricsUpdates(ctx, namespace, metricsUpdates)

return nil
}

// applyShardMetricsUpdates updates shard metrics (like last_move_time) after AssignShards.
// These writes run outside the primary transaction so we are less likely to exceed
// etcd's max operations per transaction (128 by default), and we retry on conflicts,
// logging failures instead of failing the main assignment.
func (s *executorStoreImpl) applyShardMetricsUpdates(ctx context.Context, namespace string, updates []shardMetricsUpdate) {
for i := range updates {
update := &updates[i]

for {
Review comment (Contributor): this is very difficult to read, I am not sure we will understand what it does in a few weeks, can we remove this? :)

// If a newer rebalance already set a later LastMoveTime, there's nothing left for this iteration.
if update.metrics.LastMoveTime >= update.desiredLastMove {
break
}

update.metrics.LastMoveTime = update.desiredLastMove

payload, err := json.Marshal(update.metrics)
if err != nil {
// Log and move on. A metrics marshalling failure should not invalidate the finished assignment.
s.logger.Warn("failed to marshal shard metrics after assignment", tag.ShardNamespace(namespace), tag.ShardKey(update.shardID), tag.Error(err))
break
}

txnResp, err := s.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(update.key), "=", update.modRevision)).
Then(clientv3.OpPut(update.key, string(payload))).
Commit()
if err != nil {
// log and abandon this shard rather than propagating an error after assignments commit.
s.logger.Warn("failed to commit shard metrics update after assignment", tag.ShardNamespace(namespace), tag.ShardKey(update.shardID), tag.Error(err))
break
}

if txnResp.Succeeded {
break
}

if ctx.Err() != nil {
s.logger.Warn("context canceled while updating shard metrics", tag.ShardNamespace(namespace), tag.ShardKey(update.shardID), tag.Error(ctx.Err()))
return
}

// Another writer beat us. Pull the latest metrics so we can merge their view and retry.
metricsResp, err := s.client.Get(ctx, update.key)
if err != nil {
// Unable to observe the conflicting write, so we skip this shard and keep the assignment result.
s.logger.Warn("failed to refresh shard metrics after compare conflict", tag.ShardNamespace(namespace), tag.ShardKey(update.shardID), tag.Error(err))
break
}

update.modRevision = 0
if len(metricsResp.Kvs) > 0 {
update.modRevision = metricsResp.Kvs[0].ModRevision
if err := json.Unmarshal(metricsResp.Kvs[0].Value, &update.metrics); err != nil {
// If the value is corrupt we cannot safely merge, so we abandon this shard's metrics update.
s.logger.Warn("failed to unmarshal shard metrics after compare conflict", tag.ShardNamespace(namespace), tag.ShardKey(update.shardID), tag.Error(err))
break
}
} else {
update.metrics = store.ShardMetrics{
SmoothedLoad: 0,
LastUpdateTime: update.defaultLastUpdate,
Review comment (Contributor): looking at the way defaultLastUpdate is used, I think we can simplify the code and just remove it; wouldn't it be equal to use desiredLastMove here?

}
update.modRevision = 0
}
}
}
}

func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID, executorID string) error {
assignedState, err := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorAssignedStateKey)
if err != nil {
@@ -369,16 +545,21 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
if err != nil {
return fmt.Errorf("build executor status key: %w", err)
}
shardMetricsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardMetricsKey)
if err != nil {
return fmt.Errorf("build shard metrics key: %w", err)
}

// Use a read-modify-write loop to handle concurrent updates safely.
for {
// 1. Get the current assigned state of the executor.
// 1. Get the current assigned state of the executor and prepare the shard metrics.
resp, err := s.client.Get(ctx, assignedState)
if err != nil {
return fmt.Errorf("get executor state: %w", err)
}

var state store.AssignedState
var shardMetrics store.ShardMetrics
modRevision := int64(0) // A revision of 0 means the key doesn't exist yet.

if len(resp.Kvs) > 0 {
@@ -393,6 +574,28 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
state.AssignedShards = make(map[string]*types.ShardAssignment)
}

metricsResp, err := s.client.Get(ctx, shardMetricsKey)
if err != nil {
return fmt.Errorf("get shard metrics: %w", err)
}
now := time.Now().Unix()
metricsModRevision := int64(0)
if len(metricsResp.Kvs) > 0 {
metricsModRevision = metricsResp.Kvs[0].ModRevision
if err := json.Unmarshal(metricsResp.Kvs[0].Value, &shardMetrics); err != nil {
return fmt.Errorf("unmarshal shard metrics: %w", err)
}
// Metrics already exist, so only update the last move time.
// This can happen if the shard was previously assigned to an executor that has since been deleted,
// and AssignShard is now called to assign the shard to a new executor.
shardMetrics.LastMoveTime = now
} else {
// Metrics don't exist, initialize them.
shardMetrics.SmoothedLoad = 0
shardMetrics.LastUpdateTime = now
shardMetrics.LastMoveTime = now
}

// 2. Modify the state in memory, adding the new shard if it's not already there.
if _, alreadyAssigned := state.AssignedShards[shardID]; !alreadyAssigned {
state.AssignedShards[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
@@ -403,13 +606,19 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,
return fmt.Errorf("marshal new assigned state: %w", err)
}

newMetricsValue, err := json.Marshal(shardMetrics)
if err != nil {
return fmt.Errorf("marshal new shard metrics: %w", err)
}

var comparisons []clientv3.Cmp

// 3. Prepare and commit the transaction with three atomic checks.
// 3. Prepare and commit the transaction with four atomic checks.
// a) Check that the executor's status is ACTIVE.
comparisons = append(comparisons, clientv3.Compare(clientv3.Value(statusKey), "=", _executorStatusRunningJSON))
// b) Check that the assigned_state key hasn't been changed by another process.
// b) Check that neither the assigned_state nor shard metrics were modified concurrently.
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(assignedState), "=", modRevision))
comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(shardMetricsKey), "=", metricsModRevision))
// c) Check that the cache is up to date.
cmp, err := s.shardCache.GetExecutorModRevisionCmp(namespace)
if err != nil {
@@ -428,7 +637,10 @@ func (s *executorStoreImpl) AssignShard(ctx context.Context, namespace, shardID,

txnResp, err := s.client.Txn(ctx).
If(comparisons...).
Then(clientv3.OpPut(assignedState, string(newStateValue))).
Then(
clientv3.OpPut(assignedState, string(newStateValue)),
clientv3.OpPut(shardMetricsKey, string(newMetricsValue)),
).
Commit()

if err != nil {
@@ -505,6 +505,59 @@ func TestAssignShardErrors(t *testing.T) {
assert.ErrorIs(t, err, store.ErrVersionConflict, "Error should be ErrVersionConflict for non-active executor")
}

// TestShardMetricsPersistence verifies that shard metrics are preserved on assignment
// when they already exist, and that GetState exposes them.
func TestShardMetricsPersistence(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

executorID := "exec-metrics"
shardID := "shard-metrics"

// 1. Setup: ensure executor is ACTIVE
require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{Status: types.ExecutorStatusACTIVE}))

// 2. Pre-create shard metrics as if coming from prior history
m := store.ShardMetrics{SmoothedLoad: 12.5, LastUpdateTime: 1234, LastMoveTime: 5678}
shardMetricsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardMetricsKey)
require.NoError(t, err)
payload, err := json.Marshal(m)
require.NoError(t, err)
_, err = tc.Client.Put(ctx, shardMetricsKey, string(payload))
require.NoError(t, err)

// 3. Assign the shard via AssignShard (should not clobber existing metrics)
require.NoError(t, executorStore.AssignShard(ctx, tc.Namespace, shardID, executorID))

// 4. Verify via GetState that metrics are preserved and exposed
nsState, err := executorStore.GetState(ctx, tc.Namespace)
require.NoError(t, err)
require.Contains(t, nsState.ShardMetrics, shardID)
updatedMetrics := nsState.ShardMetrics[shardID]
assert.Equal(t, m.SmoothedLoad, updatedMetrics.SmoothedLoad)
assert.Equal(t, m.LastUpdateTime, updatedMetrics.LastUpdateTime)
// AssignShard moved the shard, so LastMoveTime should have advanced past the pre-seeded value
assert.Greater(t, updatedMetrics.LastMoveTime, m.LastMoveTime)

// 5. Also ensure assignment recorded correctly
require.Contains(t, nsState.ShardAssignments[executorID].AssignedShards, shardID)
}

// TestGetShardMetricsForMissingShard verifies GetState does not report metrics for unknown shards.
func TestGetShardMetricsForMissingShard(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// No metrics are written; GetState should not contain unknown shard
st, err := executorStore.GetState(ctx, tc.Namespace)
require.NoError(t, err)
assert.NotContains(t, st.ShardMetrics, "unknown")
}

// --- Test Setup ---

func stringStatus(s types.ExecutorStatus) string {
Expand Down