Skip to content

Commit ab67db7

Browse files
authored
Merge pull request #981 from Project-MONAI/nds-fixstuschange
fix for task update updating to same status
2 parents 862ba21 + 579dc75 commit ab67db7

File tree

4 files changed

+153
-3
lines changed

4 files changed

+153
-3
lines changed

src/WorkflowManager/Logging/Log.200000.Workflow.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ public static partial class Log
8484
[LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "Use new ArtifactReceived Queue for continuation messages.")]
8585
public static partial void DontUseWorkflowReceivedForPayload(this ILogger logger);
8686

87+
[LoggerMessage(EventId = 200021, Level = LogLevel.Trace, Message = "The task execution status for task {taskId} is already {status}. Payload: {payloadId}")]
88+
public static partial void TaskStatusUpdateNotNeeded(this ILogger logger, string payloadId, string taskId, string status);
89+
8790
// Conditions Resolver
8891
[LoggerMessage(EventId = 210000, Level = LogLevel.Warning, Message = "Failed to parse condition: {condition}. resolvedConditional: {resolvedConditional}")]
8992
public static partial void FailedToParseCondition(this ILogger logger, string resolvedConditional, string condition, Exception ex);

src/WorkflowManager/Logging/Log.700000.Artifact.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,15 @@ public static partial class Log
6060
[LoggerMessage(EventId = 700012, Level = LogLevel.Error, Message = "Error finding Task :{taskId}")]
6161
public static partial void ErrorFindingTask(this ILogger logger, string taskId);
6262

63-
[LoggerMessage(EventId = 700013, Level = LogLevel.Error, Message = "Error finding Task :{taskId} or previousTask {previousTask}")]
64-
public static partial void ErrorFindingTaskOrPrevious(this ILogger logger, string taskId, string previousTask);
63+
//[LoggerMessage(EventId = 700013, Level = LogLevel.Error, Message = "Error finding Task :{taskId} or previousTask {previousTask}")]
64+
//public static partial void ErrorFindingTaskOrPrevious(this ILogger logger, string taskId, string previousTask);
6565

6666
[LoggerMessage(EventId = 700014, Level = LogLevel.Warning, Message = "Error Task :{taskId} cant be trigger as it has missing artifacts {missingtypesJson}")]
6767
public static partial void ErrorTaskMissingArtifacts(this ILogger logger, string taskId, string missingtypesJson);
6868

69+
[LoggerMessage(EventId = 700015, Level = LogLevel.Warning, Message = "Error Task :{taskId} cant be trigger as it has missing artifacts {artifactName}")]
70+
public static partial void ErrorFindingArtifactInPrevious(this ILogger logger, string taskId, string artifactName);
71+
6972

7073
}
7174
}

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ private Dictionary<ArtifactType, bool> GetTasksInput(WorkflowRevision workflowTe
376376
var matchType = previousTask.Artifacts.Output.FirstOrDefault(t => t.Name == artifact.Name);
377377
if (matchType is null)
378378
{
379-
_logger.ErrorFindingTaskOrPrevious(taskId, previousTaskId);
379+
_logger.ErrorFindingArtifactInPrevious(taskId, artifact.Name);
380380
}
381381
else
382382
{
@@ -481,6 +481,12 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)
481481
await ClinicalReviewTimeOutEvent(workflowInstance, currentTask, message.CorrelationId);
482482
}
483483

484+
if (message.Status == currentTask.Status)
485+
{
486+
_logger.TaskStatusUpdateNotNeeded(workflowInstance.PayloadId, message.TaskId, message.Status.ToString());
487+
return true;
488+
}
489+
484490
if (!message.Status.IsTaskExecutionStatusUpdateValid(currentTask.Status))
485491
{
486492
_logger.TaskStatusUpdateNotValid(workflowInstance.PayloadId, message.TaskId, currentTask.Status.ToString(), message.Status.ToString());

tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3702,6 +3702,88 @@ public async Task ProcessTaskUpdate_ValidTaskUpdateEventWithExportHl7TaskDestina
37023702
response.Should().BeTrue();
37033703
}
37043704

3705+
[Fact]
3706+
public async Task ProcessTaskUpdate_ValidTaskUpdateEventWith_Same_Status_returns_true()
3707+
{
3708+
var workflowInstanceId = Guid.NewGuid().ToString();
3709+
var taskId = Guid.NewGuid().ToString();
3710+
3711+
var updateEvent = new TaskUpdateEvent
3712+
{
3713+
WorkflowInstanceId = workflowInstanceId,
3714+
TaskId = "pizza",
3715+
ExecutionId = Guid.NewGuid().ToString(),
3716+
Status = TaskExecutionStatus.Succeeded,
3717+
};
3718+
3719+
var workflowId = Guid.NewGuid().ToString();
3720+
3721+
var workflow = new WorkflowRevision
3722+
{
3723+
Id = Guid.NewGuid().ToString(),
3724+
WorkflowId = workflowId,
3725+
Revision = 1,
3726+
Workflow = new Workflow
3727+
{
3728+
Name = "Workflowname2",
3729+
Description = "Workflowdesc2",
3730+
Version = "1",
3731+
InformaticsGateway = new InformaticsGateway
3732+
{
3733+
AeTitle = "aetitle"
3734+
},
3735+
Tasks = new TaskObject[]
3736+
{
3737+
new TaskObject {
3738+
Id = "pizza",
3739+
Type = "type",
3740+
Description = "taskdesc",
3741+
TaskDestinations = new TaskDestination[]
3742+
{
3743+
new TaskDestination
3744+
{
3745+
Name = "exporttaskid"
3746+
},
3747+
}
3748+
}
3749+
}
3750+
}
3751+
};
3752+
3753+
var workflowInstance = new WorkflowInstance
3754+
{
3755+
Id = workflowInstanceId,
3756+
WorkflowId = workflowId,
3757+
WorkflowName = workflow.Workflow.Name,
3758+
PayloadId = Guid.NewGuid().ToString(),
3759+
Status = Status.Created,
3760+
BucketId = "bucket",
3761+
Tasks = new List<TaskExecution>
3762+
{
3763+
new TaskExecution
3764+
{
3765+
TaskId = "pizza",
3766+
Status = TaskExecutionStatus.Succeeded
3767+
}
3768+
}
3769+
};
3770+
3771+
_workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<TaskExecutionStatus>())).ReturnsAsync(true);
3772+
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance);
3773+
_workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(workflowInstance.Id, It.IsAny<List<TaskExecution>>())).ReturnsAsync(true);
3774+
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowInstance.WorkflowId)).ReturnsAsync(workflow);
3775+
_payloadService.Setup(p => p.GetByIdAsync(It.IsAny<string>())).ReturnsAsync(new Payload { PatientDetails = new PatientDetails { } });
3776+
_artifactMapper.Setup(a => a.ConvertArtifactVariablesToPath(It.IsAny<Artifact[]>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>())).ReturnsAsync(new Dictionary<string, string> { { "dicomexport", "/dcm" } });
3777+
3778+
var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent);
3779+
3780+
_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny<Message>()), Times.Exactly(0));
3781+
3782+
_logger.Verify(logger => logger.IsEnabled(LogLevel.Trace),Times.Once);
3783+
3784+
response.Should().BeTrue();
3785+
}
3786+
37053787
[Fact]
37063788
public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs()
37073789
{
@@ -3977,6 +4059,62 @@ public async Task ProcessPayload_With_Passing_Workflow_Conditional_Should_Procce
39774059

39784060
Assert.True(result);
39794061
}
4062+
4063+
[Fact]
4064+
public async Task ProcessPayload_With_Empty_Workflow_Conditional_Should_Procced()
4065+
{
4066+
var workflowRequest = new WorkflowRequestEvent
4067+
{
4068+
Bucket = "testbucket",
4069+
DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" },
4070+
CorrelationId = Guid.NewGuid().ToString(),
4071+
Timestamp = DateTime.UtcNow
4072+
};
4073+
4074+
var workflows = new List<WorkflowRevision>
4075+
{
4076+
new() {
4077+
Id = Guid.NewGuid().ToString(),
4078+
WorkflowId = Guid.NewGuid().ToString(),
4079+
Revision = 1,
4080+
Workflow = new Workflow
4081+
{
4082+
Name = "Workflowname",
4083+
Description = "Workflowdesc",
4084+
Version = "1",
4085+
InformaticsGateway = new InformaticsGateway
4086+
{
4087+
AeTitle = "aetitle"
4088+
},
4089+
Tasks =
4090+
[
4091+
new TaskObject {
4092+
Id = Guid.NewGuid().ToString(),
4093+
Type = "type",
4094+
Description = "taskdesc"
4095+
}
4096+
],
4097+
Predicate = []
4098+
}
4099+
}
4100+
};
4101+
4102+
_dicom.Setup(w => w.GetAnyValueAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>()))
4103+
.ReturnsAsync(() => "lordge");
4104+
4105+
_workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny<List<string>>())).ReturnsAsync(workflows);
4106+
_workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny<string>(), It.IsAny<string>())).ReturnsAsync(workflows);
4107+
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]);
4108+
_workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny<List<WorkflowInstance>>())).ReturnsAsync(true);
4109+
_workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny<List<string>>())).ReturnsAsync(new List<WorkflowInstance>());
4110+
_workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<TaskExecutionStatus>())).ReturnsAsync(true);
4111+
4112+
var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() });
4113+
4114+
_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny<Message>()), Times.Once());
4115+
4116+
Assert.True(result);
4117+
}
39804118
}
39814119
#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type.
39824120
}

0 commit comments

Comments
 (0)