Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions service/matching/event/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,14 @@
package event

import (
"encoding/json"
"fmt"
"os"
"time"

"go.uber.org/zap"

"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

var enabled = false

func init() {
enabled = os.Getenv("MATCHING_LOG_EVENTS") == "true"
}

type E struct {
persistence.TaskInfo
TaskListName string
Expand All @@ -51,16 +44,15 @@ type E struct {
}

func Log(events ...E) {
if !enabled {
return
}
for _, e := range events {
e.EventTime = time.Now()
data, err := json.Marshal(e)
if err != nil {
fmt.Printf("failed to marshal event: %v", err)
}

fmt.Printf("Matching New Event: %s\n", data)
zap.L().Debug(e.EventName, zap.Any("event", e),
zap.String("taskListName", e.TaskListName),
zap.Stringer("taskListKind", e.TaskListKind),
zap.Int("taskListType", e.TaskListType),
zap.Time("eventTime", e.EventTime),
zap.String("wf-domain-id", e.DomainID),
zap.String("wf-id", e.WorkflowID),
zap.String("wf-run-id", e.RunID),
zap.Any("event", e))
}
}
5 changes: 5 additions & 0 deletions service/matching/tasklist/task_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func (tc *taskCompleterImpl) CompleteTaskIfStarted(ctx context.Context, task *In
}

if domainEntry.IsActiveIn(tc.clusterMetadata.GetCurrentClusterName()) {
tc.logger.Debug("Domain is active in the current cluster, completing task",
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
tag.WorkflowID(task.Event.WorkflowID),
tag.WorkflowRunID(task.Event.RunID),
)
return errDomainIsActive
}

Expand Down
6 changes: 6 additions & 0 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,12 @@ func (c *taskListManagerImpl) DispatchTask(ctx context.Context, task *InternalTa
}

if domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()) {
c.logger.Debug("Domain is active in the current cluster, dispatching task",
tag.WorkflowDomainID(task.Event.TaskInfo.DomainID),
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
tag.WorkflowID(task.Event.TaskInfo.WorkflowID),
tag.WorkflowRunID(task.Event.TaskInfo.RunID),
)
return c.matcher.MustOffer(ctx, task)
}

Expand Down
3 changes: 3 additions & 0 deletions service/matching/tasklist/task_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,9 @@ func (tr *taskReader) newDispatchContext(isolationGroup string, isolationDuratio
return context.WithTimeout(tr.cancelCtx, timeout)
}
if domainEntry.IsActiveIn(tr.clusterMetadata.GetCurrentClusterName()) {
tr.logger.Debug("domain is active in the current cluster, setting timeout",
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
)
// if the domain is active in the current cluster, set the timeout
return context.WithTimeout(tr.cancelCtx, timeout)
}
Expand Down
Loading