Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ shardDistribution:
namespaces:
- name: shard-distributor-canary
type: fixed
mode: onboarded
shardNum: 32
- name: shard-distributor-canary-ephemeral
mode: onboarded
type: ephemeral
leaderStore:
storageParams:
Expand Down
27 changes: 27 additions & 0 deletions service/sharddistributor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
"github.com/uber/cadence/common/types"
)

type (
Expand Down Expand Up @@ -86,6 +87,32 @@ const (
NamespaceTypeEphemeral = "ephemeral"
)

const (
MigrationModeINVALID = "invalid"
MigrationModeLOCALPASSTHROUGH = "local_pass"
MigrationModeLOCALPASSTHROUGHSHADOW = "local_pass_shadow"
MigrationModeDISTRIBUTEDPASSTHROUGH = "distributed_pass"
MigrationModeONBOARDED = "onboarded"
)

var configMode = map[string]types.MigrationMode{
MigrationModeINVALID: types.MigrationModeINVALID,
MigrationModeLOCALPASSTHROUGH: types.MigrationModeLOCALPASSTHROUGH,
MigrationModeLOCALPASSTHROUGHSHADOW: types.MigrationModeLOCALPASSTHROUGHSHADOW,
MigrationModeDISTRIBUTEDPASSTHROUGH: types.MigrationModeDISTRIBUTEDPASSTHROUGH,
MigrationModeONBOARDED: types.MigrationModeONBOARDED,
}

func (s *ShardDistribution) GetMigrationMode(namespace string) types.MigrationMode {
for _, ns := range s.Namespaces {
if ns.Name == namespace {
return configMode[ns.Mode]
}
}
// TODO in the dynamic configuration I will setup a default value
return configMode[MigrationModeONBOARDED]
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
return &Config{
Expand Down
79 changes: 70 additions & 9 deletions service/sharddistributor/handler/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"time"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
)

Expand All @@ -16,16 +19,23 @@ const (
)

type executor struct {
timeSource clock.TimeSource
storage store.Store
logger log.Logger
timeSource clock.TimeSource
storage store.Store
shardDistributionCfg config.ShardDistribution
}

func NewExecutorHandler(storage store.Store,
func NewExecutorHandler(
logger log.Logger,
storage store.Store,
timeSource clock.TimeSource,
shardDistributionCfg config.ShardDistribution,
) Executor {
return &executor{
timeSource: timeSource,
storage: storage,
logger: logger,
timeSource: timeSource,
storage: storage,
shardDistributionCfg: shardDistributionCfg,
}
}

Expand All @@ -38,12 +48,29 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe

now := h.timeSource.Now().UTC()

mode := h.shardDistributionCfg.GetMigrationMode(request.Namespace)

switch mode {
case types.MigrationModeINVALID:
h.logger.Warn("Migration mode is invalid", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID))
return nil, fmt.Errorf("migration mode is invalid")
case types.MigrationModeLOCALPASSTHROUGH:
h.logger.Warn("Migration mode is local passthrough, no calls to heartbeat allowed", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID))
return nil, fmt.Errorf("migration mode is local passthrough")
// From SD perspective the behaviour is the same
case types.MigrationModeLOCALPASSTHROUGHSHADOW, types.MigrationModeDISTRIBUTEDPASSTHROUGH:
assignedShards, err = h.assignShardsInCurrentHeartbeat(ctx, request, previousHeartbeat, assignedShards)
if err != nil {
return nil, err
}
}

// If the state has changed we need to update heartbeat data.
// Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
if previousHeartbeat != nil && request.Status == previousHeartbeat.Status {
if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED {
lastHeartbeatTime := time.Unix(previousHeartbeat.LastHeartbeat, 0)
if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate {
return _convertResponse(assignedShards), nil
return _convertResponse(assignedShards, mode), nil
}
}

Expand All @@ -58,14 +85,48 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
return nil, fmt.Errorf("record heartbeat: %w", err)
}

return _convertResponse(assignedShards), nil
return _convertResponse(assignedShards, mode), nil
}

// assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat
func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest, previousHeartbeat *store.HeartbeatState, previousAssignedShards *store.AssignedState) (*store.AssignedState, error) {
assignedShards := *previousAssignedShards

assignedShards = store.AssignedState{
AssignedShards: make(map[string]*types.ShardAssignment),
LastUpdated: h.timeSource.Now().Unix(),
ModRevision: int64(0),
}
err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, store.NopGuard())
if err != nil {
return nil, fmt.Errorf("delete executors: %w", err)
}
for shard := range request.GetShardStatusReports() {
assignedShards.AssignedShards[shard] = &types.ShardAssignment{
Status: types.AssignmentStatusREADY,
}
}
assignShardsRequest := store.AssignShardsRequest{
NewState: &store.NamespaceState{
ShardAssignments: map[string]store.AssignedState{
request.GetExecutorID(): assignedShards,
},
},
}
err = h.storage.AssignShards(ctx, request.GetNamespace(), assignShardsRequest, store.NopGuard())
if err != nil {
return nil, fmt.Errorf("assign shards in current heartbeat: %w", err)
}

return &assignedShards, nil
}

func _convertResponse(shards *store.AssignedState) *types.ExecutorHeartbeatResponse {
func _convertResponse(shards *store.AssignedState, mode types.MigrationMode) *types.ExecutorHeartbeatResponse {
res := &types.ExecutorHeartbeatResponse{}
if shards == nil {
return res
}
res.ShardAssignments = shards.AssignedShards
res.MigrationMode = mode
return res
}
Loading
Loading