Skip to content

Commit 414bd7e

Browse files
committed
fix status setting
Signed-off-by: Neil South <[email protected]>
1 parent 4004174 commit 414bd7e

File tree

13 files changed

+189
-54
lines changed

13 files changed

+189
-54
lines changed

src/TaskManager/Plug-ins/Argo/ArgoClient.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
2525
{
2626
public class ArgoClient : BaseArgoClient, IArgoClient
2727
{
28-
public ArgoClient(HttpClient httpClient, ILogger<ArgoClient> logger) : base(httpClient, logger) { }
28+
public ArgoClient(HttpClient httpClient, ILoggerFactory logger) : base(httpClient, logger) { }
2929

3030
public async Task<Workflow> Argo_CreateWorkflowAsync(string argoNamespace, WorkflowCreateRequest body, CancellationToken cancellationToken)
3131
{
@@ -232,11 +232,11 @@ public class BaseArgoClient
232232

233233
protected readonly HttpClient HttpClient;
234234

235-
protected readonly ILogger<ArgoClient> Logger;
236-
public BaseArgoClient(HttpClient httpClient, ILogger<ArgoClient> logger)
235+
protected readonly ILogger Logger;
236+
public BaseArgoClient(HttpClient httpClient, ILoggerFactory loggerFactory)
237237
{
238238
HttpClient = httpClient;
239-
Logger = logger;
239+
Logger = loggerFactory.CreateLogger("BaseArgoClient");
240240
}
241241

242242
protected async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder urlBuilder, string method, CancellationToken cancellationToken)

src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -902,18 +902,24 @@ private async ValueTask DisposeAsyncCore()
902902
public override async Task HandleTimeout(string identity)
903903
{
904904
var client = _argoProvider.CreateClient(_baseUrl, _apiToken, _allowInsecure);
905-
906-
await client.Argo_StopWorkflowAsync(_namespace, identity, new WorkflowStopRequest
905+
try
907906
{
908-
Namespace = _namespace,
909-
Name = identity,
910-
});
907+
await client.Argo_StopWorkflowAsync(_namespace, identity, new WorkflowStopRequest
908+
{
909+
Namespace = _namespace,
910+
Name = identity,
911+
});
911912

912-
await client.Argo_TerminateWorkflowAsync(_namespace, identity, new WorkflowTerminateRequest
913+
await client.Argo_TerminateWorkflowAsync(_namespace, identity, new WorkflowTerminateRequest
914+
{
915+
Name = identity,
916+
Namespace = _namespace
917+
});
918+
}
919+
catch (Exception ex)
913920
{
914-
Name = identity,
915-
Namespace = _namespace
916-
});
921+
_logger.ExecptionStoppingArgoWorkflow(identity, ex);
922+
}
917923
}
918924

919925
public async Task<WorkflowTemplate> CreateArgoTemplate(string template)

src/TaskManager/Plug-ins/Argo/ArgoProvider.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ public class ArgoProvider : IArgoProvider
2727
{
2828
private readonly ILogger<ArgoProvider> _logger;
2929
private readonly IHttpClientFactory _httpClientFactory;
30-
private readonly ILogger<ArgoClient> _argoClientLogger;
31-
public ArgoProvider(ILogger<ArgoProvider> logger, IHttpClientFactory httpClientFactory, ILogger<ArgoClient> argoClientLogger)
30+
private readonly ILoggerFactory _logFactory;
31+
public ArgoProvider(ILogger<ArgoProvider> logger, IHttpClientFactory httpClientFactory, ILoggerFactory logFactory)
3232
{
3333
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
3434
_httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
35-
_argoClientLogger = argoClientLogger;
35+
_logFactory = logFactory;
3636
}
3737

3838
public IArgoClient CreateClient(string baseUrl, string? apiToken, bool allowInsecure = true)
@@ -51,7 +51,7 @@ public IArgoClient CreateClient(string baseUrl, string? apiToken, bool allowInse
5151
{
5252
httpClient.SetBearerToken(apiToken);
5353
}
54-
return new ArgoClient(httpClient, _argoClientLogger) { BaseUrl = baseUrl };
54+
return new ArgoClient(httpClient, _logFactory) { BaseUrl = baseUrl };
5555
}
5656
}
5757

src/TaskManager/Plug-ins/Argo/Logging/Log.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,8 @@ public static partial class Log
8383
[LoggerMessage(EventId = 1020, Level = LogLevel.Trace, Message = "Calling argo at url {url} : {method} : {stringContent}")]
8484
public static partial void CallingArgoHttpInfo(this ILogger logger, string url, string method, string stringContent);
8585

86+
[LoggerMessage(EventId = 1021, Level = LogLevel.Debug, Message = "Exception stopping argo workflow {workflowId}, does it exist?")]
87+
public static partial void ExecptionStoppingArgoWorkflow(this ILogger logger, string workflowId, Exception ex);
88+
8689
}
8790
}

src/TaskManager/TaskManager/Logging/Log.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,8 @@ public static partial class Log
122122

123123
[LoggerMessage(EventId = 120, Level = LogLevel.Error, Message = "Recovering connection to storage service: {reason}.")]
124124
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);
125+
126+
[LoggerMessage(EventId = 121, Level = LogLevel.Error, Message = "Exception handling task : '{assemblyName}' timeout.")]
127+
public static partial void ExectionTimingOutTask(this ILogger logger, string assemblyName, Exception ex);
125128
}
126129
}

src/TaskManager/TaskManager/TaskManager.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ private async Task TaskDispatchEventReceivedCallback(MessageReceivedEventArgs ar
181181

182182
private async Task TaskCancelationEventCallback(MessageReceivedEventArgs args)
183183
{
184+
// Cancelation just stops running tasks and does Not set any status
184185
await TaskCallBackGeneric<TaskCancellationEvent>(args, HandleCancellationTask);
185186
}
186187

@@ -240,6 +241,7 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
240241
}
241242

242243
var pluginAssembly = string.Empty;
244+
ITaskPlugin? taskRunner = null;
243245
try
244246
{
245247
var taskExecution = await _taskDispatchEventService.GetByTaskExecutionIdAsync(message.Body.ExecutionId).ConfigureAwait(false);
@@ -250,17 +252,28 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
250252
throw new InvalidOperationException("Task Event data not found.");
251253
}
252254

253-
var taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
254-
await taskRunner.HandleTimeout(message.Body.Identity);
255-
256-
AcknowledgeMessage(message);
255+
taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
257256
}
258257
catch (Exception ex)
259258
{
260259
_logger.UnsupportedRunner(pluginAssembly, ex);
261260
await HandleMessageException(message, message.Body.WorkflowInstanceId, message.Body.TaskId, message.Body.ExecutionId, false).ConfigureAwait(false);
262261
return;
263262
}
263+
264+
try
265+
{
266+
await taskRunner.HandleTimeout(message.Body.Identity);
267+
}
268+
catch (Exception ex)
269+
{
270+
// Ignoring exception here as we've asked for the task to be stopped.
271+
_logger.ExectionTimingOutTask(pluginAssembly, ex);
272+
}
273+
finally
274+
{
275+
AcknowledgeMessage(message);
276+
}
264277
}
265278

266279
private async Task HandleTaskCallback(JsonMessage<TaskCallbackEvent> message)

src/WorkflowManager/Logging/Log.200000.Workflow.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static partial class Log
6060
[LoggerMessage(EventId = 200012, Level = LogLevel.Error, Message = "The following task: {taskId} in workflow {workflowInstanceId} is currently timed out and not processing anymore updates, timed out at {timedOut}.")]
6161
public static partial void TaskTimedOut(this ILogger logger, string taskId, string workflowInstanceId, DateTime timedOut);
6262

63-
[LoggerMessage(EventId = 200013, Level = LogLevel.Critical, Message = "Workflow `{workflowId}` not found.")]
63+
[LoggerMessage(EventId = 200013, Level = LogLevel.Critical, Message = "Workflow `{workflowId}` not found or is deleted.")]
6464
public static partial void WorkflowNotFound(this ILogger logger, string workflowId);
6565

6666
[LoggerMessage(EventId = 200014, Level = LogLevel.Error, Message = "The task execution status for task {taskId} cannot be updated from {oldStatus} to {newStatus}. Payload: {payloadId}")]

src/WorkflowManager/MonaiBackgroundService/Worker.cs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,19 +93,6 @@ private async Task PublishCancellationEvent(TaskExecution task, string correlati
9393
{
9494
_logger.TimingOutTaskCancellationEvent(identity, task.WorkflowInstanceId);
9595

96-
var updateEvent = EventMapper.GenerateTaskUpdateEvent(new GenerateTaskUpdateEventParams
97-
{
98-
CorrelationId = correlationId,
99-
ExecutionId = task.ExecutionId,
100-
WorkflowInstanceId = workflowInstanceId,
101-
TaskId = task.TaskId,
102-
TaskExecutionStatus = TaskExecutionStatus.Failed,
103-
FailureReason = FailureReason.TimedOut,
104-
Stats = task.ExecutionStats
105-
});
106-
107-
updateEvent.Validate();
108-
10996
var cancellationEvent = EventMapper.GenerateTaskCancellationEvent(
11097
identity,
11198
task.ExecutionId,

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class WorkflowExecuterService : IWorkflowExecuterService
5858

5959
private string TaskDispatchRoutingKey { get; }
6060
private string ExportRequestRoutingKey { get; }
61-
private string TaskTimeoutRoutingKey { get; }
61+
private string ClinicalReviewTimeoutRoutingKey { get; }
6262

6363
public WorkflowExecuterService(
6464
ILogger<WorkflowExecuterService> logger,
@@ -88,7 +88,7 @@ public WorkflowExecuterService(
8888
_defaultTaskTimeoutMinutes = configuration.Value.TaskTimeoutMinutes;
8989
_defaultPerTaskTypeTimeoutMinutes = configuration.Value.PerTaskTypeTimeoutMinutes;
9090
TaskDispatchRoutingKey = configuration.Value.Messaging.Topics.TaskDispatchRequest;
91-
TaskTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation;
91+
ClinicalReviewTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation;
9292
_migExternalAppPlugins = configuration.Value.MigExternalAppPlugins.ToList();
9393
ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}";
9494

@@ -305,18 +305,9 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)
305305
if (message.Reason == FailureReason.TimedOut && currentTask.Status == TaskExecutionStatus.Failed)
306306
{
307307
_logger.TaskTimedOut(message.TaskId, message.WorkflowInstanceId, currentTask.Timeout);
308-
await TimeOutEvent(workflowInstance, currentTask, message.CorrelationId);
308+
await ClinicalReviewTimeOutEvent(workflowInstance, currentTask, message.CorrelationId);
309309

310-
return false;
311-
}
312-
313-
var workflow = await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId);
314-
315-
if (workflow is null)
316-
{
317-
_logger.WorkflowNotFound(workflowInstance.WorkflowId);
318-
319-
return false;
310+
//return false;
320311
}
321312

322313
if (!message.Status.IsTaskExecutionStatusUpdateValid(currentTask.Status))
@@ -344,6 +335,15 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)
344335
return await HandleUpdatingTaskStatus(currentTask, workflowInstance.WorkflowId, message.Status);
345336
}
346337

338+
var workflow = await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId);
339+
340+
if (workflow is null)
341+
{
342+
_logger.WorkflowNotFound(workflowInstance.WorkflowId);
343+
344+
return false;
345+
}
346+
347347
var isValid = await HandleOutputArtifacts(workflowInstance, message.Outputs, currentTask, workflow);
348348

349349
if (isValid is false)
@@ -808,13 +808,13 @@ private async Task<bool> ExportRequest(WorkflowInstance workflowInstance, TaskEx
808808
return true;
809809
}
810810

811-
private async Task<bool> TimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId)
811+
private async Task<bool> ClinicalReviewTimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId)
812812
{
813-
var exportRequestEvent = EventMapper.GenerateTaskCancellationEvent("", taskExec.ExecutionId, workflowInstance.Id, taskExec.TaskId, FailureReason.TimedOut, "Timed out");
814-
var jsonMesssage = new JsonMessage<TaskCancellationEvent>(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, correlationId, Guid.NewGuid().ToString());
813+
var cancelationRequestEvent = EventMapper.GenerateTaskCancellationEvent("", taskExec.ExecutionId, workflowInstance.Id, taskExec.TaskId, FailureReason.TimedOut, "Timed out");
814+
var jsonMesssage = new JsonMessage<TaskCancellationEvent>(cancelationRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, correlationId, Guid.NewGuid().ToString());
815815

816816
_logger.TaskTimedOut(taskExec.TaskId, workflowInstance.Id, taskExec.Timeout);
817-
await _messageBrokerPublisherService.Publish(TaskTimeoutRoutingKey, jsonMesssage.ToMessage());
817+
await _messageBrokerPublisherService.Publish(ClinicalReviewTimeoutRoutingKey, jsonMesssage.ToMessage());
818818
return true;
819819
}
820820

tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public async Task MonaiBackgroundService_DoWork_ShouldPublishMessages()
8383

8484
// Verify DoWork publishes TaskCancellationRequest
8585
_pubService.Verify(p => p.Publish(It.Is<string>(m => m == _options.Value.Messaging.Topics.TaskCancellationRequest), It.IsAny<Message>()), Times.Once());
86+
8687
// Verify DoWork publishes TaskUpdateRequest
8788
_pubService.Verify(p => p.Publish(It.Is<string>(m => m == _options.Value.Messaging.Topics.TaskUpdateRequest), It.IsAny<Message>()), Times.Once());
8889

0 commit comments

Comments
 (0)