Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,304 changes: 1,992 additions & 1,312 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions common/dynamicconfig/dynamicproperties/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,14 @@ const (
// Default value: true
// Allowed filters: N/A
EnableGRPCOutbound
// EnableExecutionInfoTracking is the key for enabling execution info tracking tasks on the mutable state.
// for the purpose of cleaning them up after workflow closed. This is a feature-flag with the intention
// of it being removed in the future and defaulted to true
// KeyName: system.enableExecutionInfoTracking
// Value type: Bool
// Default value: false
// Allowed filters: N/A
EnableExecutionInfoTracking
// EnableSQLAsyncTransaction is the key for enabling async transaction
// KeyName: system.enableSQLAsyncTransaction
// Value type: Bool
Expand Down Expand Up @@ -2808,6 +2816,14 @@ const (
// Default value: 5m (5*time.Minute)
// Allowed filters: N/A
StandbyClusterDelay
// WorkflowTaskCleanupThreshold Is the time, above which, it will attempt to cleanup tasks on workflow deletion
// but below which, it will skip cleanup attempts, based on the assumption that short-lived workflows will be mostly
// creating extremely short-lived timer tasks and there's no real value in explicitly going and deleting them
// KeyName: history.taskCleanupTimeoutThreshold
// Value type: Duration
// Default value: 1d (24 hours)
// Allowed filters: N/A
TaskCleanupTimeoutThreshold
// StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)before calling remote for missing events
// KeyName: history.standbyTaskMissingEventsResendDelay
// Value type: Duration
Expand Down Expand Up @@ -4410,6 +4426,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableGRPCOutbound is the key for enabling outbound GRPC traffic",
DefaultValue: true,
},
EnableExecutionInfoTracking: {
KeyName: "system.enableExecutionInfoTracking",
Description: "EnableExecutionInfoTracking is the key for enabling execution info tracking tasks on the mutable state. for the purpose of cleaning them up after workflow closed. This is a feature-flag with the intention of it being removed in the future and defaulted to true",
DefaultValue: false,
},
EnableSQLAsyncTransaction: {
KeyName: "system.enableSQLAsyncTransaction",
Description: "EnableSQLAsyncTransaction is the key for enabling async transaction",
Expand Down Expand Up @@ -5387,6 +5408,11 @@ var DurationKeys = map[DurationKey]DynamicDuration{
Description: "StandbyClusterDelay is the artificial delay added to standby cluster's view of active cluster's time",
DefaultValue: time.Minute * 5,
},
TaskCleanupTimeoutThreshold: {
KeyName: "history.taskCleanupTimeoutThreshold",
Description: "TaskCleanupTimeoutThreshold is the time, above which, it will attempt to cleanup tasks on workflow deletion but below which, it will skip cleanup attempts, based on the assumption that short-lived workflows will be mostly creating extremely short-lived timer tasks and there's no real value in explicitly going and deleting them",
DefaultValue: time.Hour * 24,
},
StandbyTaskMissingEventsResendDelay: {
KeyName: "history.standbyTaskMissingEventsResendDelay",
Description: "StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)before calling remote for missing events",
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ var (
StoreOperationCreateFailoverMarkerTasks = storeOperation("createFailoverMarkerTasks")
StoreOperationGetTimerIndexTasks = storeOperation("get-timer-index-tasks")
StoreOperationCompleteTimerTask = storeOperation("complete-timer-task")
StoreOperationDeleteTimerTask = storeOperation("delete-timer-task")
StoreOperationGetHistoryTasks = storeOperation("get-history-tasks")
StoreOperationCompleteHistoryTask = storeOperation("complete-history-task")
StoreOperationRangeCompleteHistoryTask = storeOperation("range-complete-history-task")
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ const (
PersistenceCompleteHistoryTaskScope
// PersistenceRangeCompleteHistoryTaskScope tracks RangeCompleteHistoryTask calls made by service to persistence layer
PersistenceRangeCompleteHistoryTaskScope
// PersistenceDeleteTimerTaskScope tracks DeleteTimerTask calls made by service to persistence layer
PersistenceDeleteTimerTaskScope
// PersistenceCreateTasksScope tracks CreateTask calls made by service to persistence layer
PersistenceCreateTasksScope
// PersistenceGetTasksScope tracks GetTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1532,6 +1534,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
PersistenceGetHistoryTasksScope: {operation: "GetHistoryTasks"},
PersistenceCompleteHistoryTaskScope: {operation: "CompleteHistoryTask"},
PersistenceRangeCompleteHistoryTaskScope: {operation: "RangeCompleteHistoryTask"},
PersistenceDeleteTimerTaskScope: {operation: "DeleteTimerTask"},
PersistenceCreateTasksScope: {operation: "CreateTask"},
PersistenceGetTasksScope: {operation: "GetTasks"},
PersistenceCompleteTaskScope: {operation: "CompleteTask"},
Expand Down
14 changes: 14 additions & 0 deletions common/mocks/ExecutionManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type (
// DynamicConfiguration represents dynamic configuration for persistence layer
DynamicConfiguration struct {
EnableExecutionInfoTracking dynamicproperties.BoolPropertyFn
EnableSQLAsyncTransaction dynamicproperties.BoolPropertyFn
EnableCassandraAllConsistencyLevelDelete dynamicproperties.BoolPropertyFn
PersistenceSampleLoggingRate dynamicproperties.IntPropertyFn
Expand All @@ -44,6 +45,7 @@ type (
// NewDynamicConfiguration returns new config with default values
func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration {
return &DynamicConfiguration{
EnableExecutionInfoTracking: dc.GetBoolProperty(dynamicproperties.EnableExecutionInfoTracking),
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicproperties.EnableSQLAsyncTransaction),
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicproperties.EnableCassandraAllConsistencyLevelDelete),
PersistenceSampleLoggingRate: dc.GetIntProperty(dynamicproperties.SampleLoggingRate),
Expand Down
43 changes: 31 additions & 12 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,18 +597,19 @@ type (

// WorkflowMutableState indicates workflow related state
WorkflowMutableState struct {
ActivityInfos map[int64]*ActivityInfo
TimerInfos map[string]*TimerInfo
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
SignalInfos map[int64]*SignalInfo
SignalRequestedIDs map[string]struct{}
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
BufferedEvents []*types.HistoryEvent
VersionHistories *VersionHistories
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
Checksum checksum.Checksum
ActivityInfos map[int64]*ActivityInfo
TimerInfos map[string]*TimerInfo
ChildExecutionInfos map[int64]*ChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
SignalInfos map[int64]*SignalInfo
SignalRequestedIDs map[string]struct{}
WorkflowTimerTaskInfos []*WorkflowTimerTaskInfo
ExecutionInfo *WorkflowExecutionInfo
ExecutionStats *ExecutionStats
BufferedEvents []*types.HistoryEvent
VersionHistories *VersionHistories
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
Checksum checksum.Checksum
}

// ActivityInfo details.
Expand Down Expand Up @@ -661,6 +662,15 @@ type (
TaskStatus int64
}

// WorkflowTimerTaskInfo contains metadata about workflow-level timer tasks.
// These are timer tasks that are associated with the workflow execution itself
// rather than user-created timers or activities (e.g., WorkflowTimeoutTask).
WorkflowTimerTaskInfo struct {
TimeoutType int
TaskID int64
VisibilityTimestamp time.Time
}

// ChildExecutionInfo has details for pending child executions.
ChildExecutionInfo struct {
Version int64
Expand Down Expand Up @@ -876,6 +886,7 @@ type (
DeleteActivityInfos []int64
UpsertTimerInfos []*TimerInfo
DeleteTimerInfos []string
WorkflowTimerTasks []*WorkflowTimerTaskInfo
UpsertChildExecutionInfos []*ChildExecutionInfo
DeleteChildExecutionInfos []int64
UpsertRequestCancelInfos []*RequestCancelInfo
Expand Down Expand Up @@ -903,6 +914,7 @@ type (

ActivityInfos []*ActivityInfo
TimerInfos []*TimerInfo
WorkflowTimerTasks []*WorkflowTimerTaskInfo
ChildExecutionInfos []*ChildExecutionInfo
RequestCancelInfos []*RequestCancelInfo
SignalInfos []*SignalInfo
Expand Down Expand Up @@ -932,6 +944,12 @@ type (
DomainName string
}

// DeleteTimerTaskRequest is used to delete a timer task
DeleteTimerTaskRequest struct {
VisibilityTimestamp time.Time
TaskID int64
}

// PutReplicationTaskToDLQRequest is used to put a replication task to dlq
PutReplicationTaskToDLQRequest struct {
SourceClusterName string
Expand Down Expand Up @@ -1614,6 +1632,7 @@ type (
GetHistoryTasks(ctx context.Context, request *GetHistoryTasksRequest) (*GetHistoryTasksResponse, error)
CompleteHistoryTask(ctx context.Context, request *CompleteHistoryTaskRequest) error
RangeCompleteHistoryTask(ctx context.Context, request *RangeCompleteHistoryTaskRequest) (*RangeCompleteHistoryTaskResponse, error)
DeleteTimerTask(ctx context.Context, request *DeleteTimerTaskRequest) error

// Scan operations

Expand Down
14 changes: 14 additions & 0 deletions common/persistence/data_manager_interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type (
GetHistoryTasks(ctx context.Context, request *GetHistoryTasksRequest) (*GetHistoryTasksResponse, error)
CompleteHistoryTask(ctx context.Context, request *CompleteHistoryTaskRequest) error
RangeCompleteHistoryTask(ctx context.Context, request *RangeCompleteHistoryTaskRequest) (*RangeCompleteHistoryTaskResponse, error)
DeleteTimerTask(ctx context.Context, request *DeleteTimerTaskRequest) error

// Scan related methods
ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*InternalListConcreteExecutionsResponse, error)
Expand Down Expand Up @@ -351,6 +352,7 @@ type (
SignalInfos map[int64]*SignalInfo
SignalRequestedIDs map[string]struct{}
BufferedEvents []*DataBlob
WorkflowTimerTasks *DataBlob

// Checksum field is used by Cassandra storage
// ChecksumData is used by All SQL storage
Expand Down Expand Up @@ -462,6 +464,7 @@ type (
DeleteActivityInfos []int64
UpsertTimerInfos []*TimerInfo
DeleteTimerInfos []string
WorkflowTimerTasks *DataBlob
UpsertChildExecutionInfos []*InternalChildExecutionInfo
DeleteChildExecutionInfos []int64
UpsertRequestCancelInfos []*RequestCancelInfo
Expand Down Expand Up @@ -492,6 +495,7 @@ type (

ActivityInfos []*InternalActivityInfo
TimerInfos []*TimerInfo
WorkflowTimerTasks *DataBlob
ChildExecutionInfos []*InternalChildExecutionInfo
RequestCancelInfos []*RequestCancelInfo
SignalInfos []*SignalInfo
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/data_store_interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading