Skip to content

Commit 8a647ae

Browse files
US990352: Get correlation id from worker task message (#216)
1 parent 6807c85 commit 8a647ae

File tree

8 files changed

+75
-78
lines changed

8 files changed

+75
-78
lines changed

worker-workflow-testing/src/main/java/com/github/cafdataprocessing/workers/workflow/testing/models/WorkerTaskDataMock.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ public class WorkerTaskDataMock implements WorkerTaskData
3333
private final TrackingInfo trackingInfo;
3434
private final String to;
3535
private final TaskSourceInfo taskSourceInfo;
36+
private final String correlationId;
3637

3738
public WorkerTaskDataMock(final String classifier, final int version, final TaskStatus status, final byte[] data,
38-
final byte[] context, final TrackingInfo trackingInfo, final String to, final TaskSourceInfo taskSourceInfo)
39+
final byte[] context, final TrackingInfo trackingInfo, final String to,
40+
final TaskSourceInfo taskSourceInfo, final String correlationId)
3941
{
4042
this.classifier = classifier;
4143
this.version = version;
@@ -45,6 +47,7 @@ public WorkerTaskDataMock(final String classifier, final int version, final Task
4547
this.trackingInfo = trackingInfo;
4648
this.to = to;
4749
this.taskSourceInfo = taskSourceInfo;
50+
this.correlationId = correlationId;
4851
}
4952

5053
@Override
@@ -79,7 +82,7 @@ public byte[] getContext()
7982

8083
@Override
8184
public String getCorrelationId() {
82-
return null;
85+
return correlationId;
8386
}
8487

8588
@Override

worker-workflow-testing/src/main/java/com/github/cafdataprocessing/workers/workflow/testing/utils/WorkflowHelper.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,11 @@ public static Invocable createInvocableJavascriptEngine(final List<String> codes
158158
public static Document createDocument(final String reference, final Fields fields, final Failures failures,
159159
final Map<String, String> customData, final Subdocuments subdocuments,
160160
final Document parentDoc, final Document rootDoc, final boolean includeApplication,
161-
final boolean inputMessageProcessor)
161+
final boolean inputMessageProcessor, final String correlationId)
162162
{
163163
final TaskSourceInfo tsi = new TaskSourceInfo("source_name", "5");
164164
final WorkerTaskData wtd = new WorkerTaskDataMock("classifier", 2, TaskStatus.RESULT_SUCCESS, new byte[0], new byte[0], null,
165-
"to", tsi);
165+
"to", tsi, correlationId);
166166
final DocumentWorkerConfiguration dwc = new DocumentWorkerConfiguration();
167167
dwc.setWorkerName("worker-base");
168168
dwc.setWorkerVersion("1.0.0-SNAPSHOT");
@@ -204,11 +204,11 @@ public static Document createDocument(final String reference, final Fields field
204204
public static Subdocument createSubdocument(final String reference, final Fields fields, final Failures failures,
205205
final Subdocuments subdocuments,
206206
final Document parentDoc, final Document rootDoc, final boolean includeApplication,
207-
final boolean inputMessageProcessor)
207+
final boolean inputMessageProcessor, final String correlationId)
208208
{
209209
final TaskSourceInfo tsi = new TaskSourceInfo("source_name", "5");
210210
final WorkerTaskData wtd = new WorkerTaskDataMock("classifier", 2, TaskStatus.RESULT_SUCCESS, new byte[0], new byte[0], null,
211-
"to", tsi);
211+
"to", tsi, correlationId);
212212
final DocumentWorkerConfiguration dwc = new DocumentWorkerConfiguration();
213213
dwc.setWorkerName("worker-base");
214214
dwc.setWorkerVersion("1.0.0-SNAPSHOT");

worker-workflow/src/main/java/com/github/cafdataprocessing/workers/workflow/WorkflowWorker.java

+4-17
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.github.cafdataprocessing.workers.workflow.model.Workflow;
2727
import com.google.common.base.Strings;
2828
import java.util.Optional;
29-
import java.util.UUID;
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231

@@ -41,7 +40,6 @@ public final class WorkflowWorker implements DocumentWorker
4140
{
4241
private static final Logger LOG = LoggerFactory.getLogger(WorkflowWorker.class);
4342
private static final String TENANT_ID_KEY = "tenantId";
44-
private static final String CORRELATION_ID_KEY = "correlationId";
4543
private static final String SETTINGS_SERVICE_LAST_UPDATE_TIME_MILLIS_KEY = "settingsServiceLastUpdateTimeMillis";
4644
private final WorkflowManager workflowManager;
4745
private final ScriptManager scriptManager;
@@ -173,35 +171,24 @@ private void addMdcLoggingData(final Task task)
173171
// https://github.com/CAFapi/caf-logging/tree/v1.0.0#pattern
174172
// https://github.com/CAFapi/caf-logging/blob/v1.0.0/src/main/resources/logback.xml#L27
175173
//
176-
// This function adds a tenantId and correlationID to the MDC (http://logback.qos.ch/manual/mdc.html), so that log messages from
174+
// This function adds a tenantId to the MDC (http://logback.qos.ch/manual/mdc.html), so that log messages from
177175
// *this* worker (workflow-worker) will contain these values.
176+
// Additionally, code in worker-framework adds the correlationId to the MDC context.
178177
//
179178
// See also addMdcData in workflow-control.js, which performs similar logic to ensure log messages from *subsequent* workers in
180179
// the workflow also contain these values.
181180

182-
// Get MDC data from custom data, creating a correlationId if it doesn't yet exist.
181+
// Get tenantId from custom data
183182
final String tenantId = task.getCustomData(TENANT_ID_KEY);
184-
final String correlationId = WorkflowWorker.getOrCreateCorrelationId(task);
185183

186-
// Add tenantId and correlationId to the MDC.
184+
// Add tenantId to the MDC
187185
if (tenantId != null) {
188186
MDC.put(TENANT_ID_KEY, tenantId);
189187
}
190-
MDC.put(CORRELATION_ID_KEY, correlationId);
191188

192189
// Add MDC data to custom data so that its passed it onto the next worker.
193190
final ResponseCustomData responseCustomData = task.getResponse().getCustomData();
194191
responseCustomData.put(TENANT_ID_KEY, tenantId);
195-
responseCustomData.put(CORRELATION_ID_KEY, correlationId);
196-
}
197-
198-
private static String getOrCreateCorrelationId(final Task task)
199-
{
200-
final String correlationId = task.getCustomData(CORRELATION_ID_KEY);
201-
202-
return (correlationId == null)
203-
? UUID.randomUUID().toString()
204-
: correlationId;
205192
}
206193

207194
private static Optional<Long> getSettingsServiceLastUpdateTimeMillis(final Document document) throws NumberFormatException

worker-workflow/src/main/resources/add-failures.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,20 @@ function addFailures (document, failures, extractSourceCallback, action) {
5252
failureId = failureId.substring(0, 32);
5353
}
5454

55+
var correlationId;
56+
var workerTaskData = document.getTask().getService(com.github.workerframework.api.WorkerTaskData.class);
57+
if (workerTaskData !== null) {
58+
correlationId = workerTaskData.getCorrelationId() || undefined;
59+
}
60+
5561
var errorObject = {
5662
ID: failureId,
5763
WORKFLOW_ACTION: workflowAction,
5864
COMPONENT: component,
5965
WORKFLOW_NAME: document.getRootDocument().getField("CAF_WORKFLOW_NAME").getStringValues().get(0),
6066
MESSAGE: f.getFailureMessage(),
6167
DATE: new Date().toISOString(),
62-
CORRELATION_ID: document.getCustomData("correlationId") || undefined
68+
CORRELATION_ID: correlationId
6369
};
6470

6571
if (!isWarningFlag) {

worker-workflow/src/main/resources/workflow-control.js

+3-11
Original file line numberDiff line numberDiff line change
@@ -37,32 +37,25 @@ function addMdcLoggingData(e) {
3737
// https://github.com/CAFapi/caf-logging/tree/v1.0.0#pattern
3838
// https://github.com/CAFapi/caf-logging/blob/v1.0.0/src/main/resources/logback.xml#L27
3939
//
40-
// This function adds a tenantId and correlationID to the MDC (http://logback.qos.ch/manual/mdc.html), so that log messages
40+
// This function adds a tenantId to the MDC (http://logback.qos.ch/manual/mdc.html), so that log messages
4141
// from workers in the workflow will contain these values.
42+
// Additionally, code in worker-framework adds the correlationId to the MDC context.
4243
//
4344
// See also addMdcData in WorkflowWorker, which performs similar logic to ensure log messages from the workflow-worker itself also
4445
// contain these values.
4546

4647
// Get MDC data from custom data.
4748
var tenantId = e.task.getCustomData("tenantId");
48-
var correlationId = e.task.getCustomData("correlationId");
49-
50-
// Generate a random correlationId if it doesn't yet exist.
51-
if (!correlationId) {
52-
correlationId = UUID.randomUUID().toString();
53-
}
5449

55-
// Only if this worker is NOT a bulk worker; add tenantId and correlationId to the MDC.
50+
// Only if this worker is NOT a bulk worker; add tenantId to the MDC.
5651
if (!isBulkWorker(e)) {
5752
if (tenantId) {
5853
MDC.put("tenantId", tenantId);
5954
}
60-
MDC.put("correlationId", correlationId);
6155
}
6256

6357
// Add MDC data to custom data so that its passed it onto the next worker.
6458
e.task.getResponse().getCustomData().put("tenantId", tenantId);
65-
e.task.getResponse().getCustomData().put("correlationId", correlationId);
6659
}
6760

6861
function isBulkWorker(e) {
@@ -80,7 +73,6 @@ function onAfterProcessTask(eventObj) {
8073

8174
function removeMdcLoggingData() {
8275
MDC.remove("tenantId");
83-
MDC.remove("correlationId");
8476
}
8577

8678
function onBeforeProcessDocument(e) {

worker-workflow/src/test/java/com/github/cafdataprocessing/workers/workflow/FailureFieldsManagerTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ public void callingAddFailuresFromOutsideScript() throws Exception
112112
Paths.get("src", "test", "resources", "input-document-with-subdoc-with-stack.json").toString()).build();
113113

114114
final Document document = WorkflowHelper.createDocument("ref_1", builderDoc.getFields(), builderDoc.getFailures(),
115-
null, builderDoc.getSubdocuments(), builderDoc, builderDoc, true, true);
115+
null, builderDoc.getSubdocuments(), builderDoc, builderDoc,
116+
true, true, null);
116117
invocable.invokeFunction("testDocument", document, document.getFailures(), "on_premise");
117118

118119
assertEquals(document.getField("FAILURES").getValues().stream().filter(x -> !x.getStringValue().isEmpty()).count(), 1L);
@@ -141,7 +142,8 @@ public void callingAddFailuresFromOutsideScriptForWarnings() throws Exception
141142
Paths.get("src", "test", "resources", "input-document-with-subdoc-for-warning-test.json").toString()).build();
142143

143144
final Document document = WorkflowHelper.createDocument("ref_1", builderDoc.getFields(), builderDoc.getFailures(),
144-
null, builderDoc.getSubdocuments(), builderDoc, builderDoc, true, true);
145+
null, builderDoc.getSubdocuments(), builderDoc, builderDoc,
146+
true, true, null);
145147
invocable.invokeFunction("testDocument", document, document.getFailures(), "on_premise");
146148

147149
assertEquals(document.getField("WARNINGS").getValues().stream().filter(x -> !x.getStringValue().isEmpty()).count(), 1L);

0 commit comments

Comments
 (0)