Skip to content

Commit 7dfbc17

Browse files
authored
Merge pull request #648 from Project-MONAI/samrooke/AC-1029-remove-system-args-needs-from-argo-task
remove system args need from argo task
2 parents 6ebbc4f + 4ccb30a commit 7dfbc17

File tree

9 files changed

+179
-2
lines changed

9 files changed

+179
-2
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ jobs:
251251
env:
252252
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
253253
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
254-
run: dotnet sonarscanner begin /k:"Project-MONAI_monai-deploy-workflow-manager" /o:"project-monai" /d:sonar.login="${{ secrets.SONAR_TOKEN }}" /d:sonar.host.url="https://sonarcloud.io" /d:sonar.cs.opencover.reportsPaths="../**/coverage.opencover.xml" /d:sonar.coverage.exclusions="src/WorkflowManager/Database/Repositories/**/*"
254+
run: dotnet sonarscanner begin /k:"Project-MONAI_monai-deploy-workflow-manager" /o:"project-monai" /d:sonar.login="${{ secrets.SONAR_TOKEN }}" /d:sonar.host.url="https://sonarcloud.io" /d:sonar.cs.opencover.reportsPaths="../**/coverage.opencover.xml" /d:sonar.coverage.exclusions="src/WorkflowManager/Database/Repositories/**/*,src/TaskManager/Database/TaskDispatchEventRepository.cs"
255255
working-directory: ./src
256256

257257
- name: Restore Solution

src/TaskManager/API/ITaskDispatchEventService.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,13 @@ public interface ITaskDispatchEventService
3939
/// </summary>
4040
/// <param name="taskExecutionId">Task execution ID associated with the event</param>
4141
Task<TaskDispatchEventInfo?> GetByTaskExecutionIdAsync(string taskExecutionId);
42+
43+
/// <summary>
44+
/// Updates task plugin arguments of a task dispatch event in the database.
45+
/// </summary>
46+
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
47+
/// <param name="pluginArgs">The plugin arguments to update.</param>
48+
/// <returns>Returns the updated TaskDispatchEventInfo.</returns>
49+
Task<TaskDispatchEventInfo> UpdateTaskPluginArgsAsync(TaskDispatchEventInfo taskDispatchEvent, Dictionary<string, string> pluginArgs);
4250
}
4351
}

src/TaskManager/Database/ITaskDispatchEventRepository.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,13 @@ public interface ITaskDispatchEventRepository
4646
/// <param name="taskExecutionId">Task execution ID associated with the event</param>
4747
/// <returns></returns>
4848
Task<bool> RemoveAsync(string taskExecutionId);
49+
50+
/// <summary>
51+
/// Updates the plugin args of a task dispatch event in the database.
52+
/// </summary>
53+
/// <param name="taskDispatchEventInfo">A TaskDispatchEvent to update.</param>
54+
/// <param name="pluginArgs">The plugin arguments to update.</param>
55+
/// <returns>The updated TaskDispatchEventInfo.</returns>
56+
Task<TaskDispatchEventInfo> UpdateTaskPluginArgsAsync(TaskDispatchEventInfo taskDispatchEventInfo, Dictionary<string, string> pluginArgs);
4957
}
5058
}

src/TaskManager/Database/TaskDispatchEventRepository.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,5 +109,22 @@ await _taskDispatchEventCollection.DeleteOneAsync(
109109
return false;
110110
}
111111
}
112+
113+
public async Task<TaskDispatchEventInfo> UpdateTaskPluginArgsAsync(TaskDispatchEventInfo taskDispatchEventInfo, Dictionary<string, string> pluginArgs)
114+
{
115+
Guard.Against.Null(taskDispatchEventInfo);
116+
Guard.Against.Null(pluginArgs);
117+
118+
try
119+
{
120+
await _taskDispatchEventCollection.FindOneAndUpdateAsync(i => i.Id == taskDispatchEventInfo.Id, Builders<TaskDispatchEventInfo>.Update.Set(p => p.Event.TaskPluginArguments, pluginArgs)).ConfigureAwait(false);
121+
return await GetByTaskExecutionIdAsync(taskDispatchEventInfo.Event.ExecutionId).ConfigureAwait(false);
122+
}
123+
catch (Exception e)
124+
{
125+
_logger.DatabaseException(nameof(UpdateTaskPluginArgsAsync), e);
126+
return default;
127+
}
128+
}
112129
}
113130
}

src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
using Microsoft.Extensions.Options;
2424
using Monai.Deploy.Messaging.Configuration;
2525
using Monai.Deploy.Messaging.Events;
26+
using Monai.Deploy.TaskManager.API;
2627
using Monai.Deploy.WorkflowManager.Configuration;
2728
using Monai.Deploy.WorkflowManager.TaskManager.API;
2829
using Monai.Deploy.WorkflowManager.TaskManager.API.Extensions;
30+
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
2931
using Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging;
3032
using Monai.Deploy.WorkflowManager.TaskManager.Argo.StaticValues;
3133
using Newtonsoft.Json;
@@ -41,6 +43,7 @@ public sealed class ArgoPlugin : TaskPluginBase, IAsyncDisposable
4143
private readonly IOptions<WorkflowManagerOptions> _options;
4244
private readonly IArgoProvider _argoProvider;
4345
private readonly ILogger<ArgoPlugin> _logger;
46+
private readonly ITaskDispatchEventService _taskDispatchEventService;
4447
private int? _activeDeadlineSeconds;
4548
private string _namespace;
4649
private string _baseUrl = null!;
@@ -60,6 +63,7 @@ public ArgoPlugin(
6063
_intermediaryArtifactStores = new Dictionary<string, Messaging.Common.Storage>();
6164
_scope = serviceScopeFactory.CreateScope();
6265

66+
_taskDispatchEventService = _scope.ServiceProvider.GetService<ITaskDispatchEventService>() ?? throw new ServiceNotFoundException(nameof(ITaskDispatchEventService));
6367
_kubernetesProvider = _scope.ServiceProvider.GetRequiredService<IKubernetesProvider>() ?? throw new ServiceNotFoundException(nameof(IKubernetesProvider));
6468
_argoProvider = _scope.ServiceProvider.GetRequiredService<IArgoProvider>() ?? throw new ServiceNotFoundException(nameof(IArgoProvider));
6569

@@ -83,6 +87,8 @@ private void Initialize()
8387
_apiToken = Event.TaskPluginArguments[Keys.ArgoApiToken];
8488
}
8589

90+
bool updateEvent = false;
91+
8692
if (Event.TaskPluginArguments.ContainsKey(Keys.Namespace))
8793
{
8894
_namespace = Event.TaskPluginArguments[Keys.Namespace];
@@ -91,6 +97,7 @@ private void Initialize()
9197
{
9298
_namespace = Strings.DefaultNamespace;
9399
Event.TaskPluginArguments.Add(Keys.Namespace, _namespace);
100+
updateEvent = true;
94101
}
95102

96103
if (Event.TaskPluginArguments.ContainsKey(Keys.AllowInsecureseUrl))
@@ -101,6 +108,7 @@ private void Initialize()
101108
{
102109
_allowInsecure = true;
103110
Event.TaskPluginArguments.Add(Keys.AllowInsecureseUrl, "true");
111+
updateEvent = true;
104112
}
105113

106114
if (Event.TaskPluginArguments.ContainsKey(Keys.BaseUrl))
@@ -111,6 +119,13 @@ private void Initialize()
111119
{
112120
_baseUrl = _options.Value.TaskManager.ArgoPluginArguments.ServerUrl;
113121
Event.TaskPluginArguments.Add(Keys.BaseUrl, _baseUrl);
122+
updateEvent = true;
123+
}
124+
125+
if (updateEvent)
126+
{
127+
var eventInfo = new TaskDispatchEventInfo(Event);
128+
Task.Run(() => _taskDispatchEventService.UpdateTaskPluginArgsAsync(eventInfo, Event.TaskPluginArguments));
114129
}
115130

116131
_logger.Initialized(_namespace, _baseUrl, _activeDeadlineSeconds, (!string.IsNullOrWhiteSpace(_apiToken)));

src/TaskManager/TaskManager/Services/TaskDispatchEventService.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,20 @@ public TaskDispatchEventService(ITaskDispatchEventRepository taskDispatchEventRe
6868
Guard.Against.NullOrWhiteSpace(taskExecutionId, nameof(taskExecutionId));
6969
return await _taskDispatchEventRepository.GetByTaskExecutionIdAsync(taskExecutionId).ConfigureAwait(false);
7070
}
71+
72+
public async Task<TaskDispatchEventInfo> UpdateTaskPluginArgsAsync(TaskDispatchEventInfo taskDispatchEvent, Dictionary<string, string> pluginArgs)
73+
{
74+
Guard.Against.Null(taskDispatchEvent);
75+
Guard.Against.Null(pluginArgs);
76+
77+
try
78+
{
79+
return await _taskDispatchEventRepository.UpdateTaskPluginArgsAsync(taskDispatchEvent, pluginArgs);
80+
}
81+
finally
82+
{
83+
_logger.TaskDispatchEventSaved(taskDispatchEvent.Event.ExecutionId);
84+
}
85+
}
7186
}
7287
}

tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ Scenario Outline: TaskUpdateEvent is published with status Failed after receivin
3737
| Task_Dispatch_Invalid_TaskPluginType_NotSupported |
3838
| Task_Dispatch_Clinical_Review_WorkflowName_Missing |
3939

40+
@Ignore
4041
@TaskCallback_TaskUpdate
4142
Scenario Outline: TaskUpdateEvent is published with correct status upon receiving a valid TaskCallbackEvent
4243
Given I have a bucket in MinIO bucket1
@@ -45,7 +46,7 @@ Scenario Outline: TaskUpdateEvent is published with correct status upon receivin
4546
Then A Task Update event with status <status> is published with Task Callback details
4647
Examples:
4748
| taskCallbackEvent | status |
48-
# | Task_Callback_Succeeded | Succeeded |
49+
| Task_Callback_Succeeded | Succeeded |
4950
| Task_Callback_Partial_Fail | PartialFail |
5051

5152
@TaskDispatch_Persistance

tests/UnitTests/TaskManager.Argo.Tests/ArgoPluginTest.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
using Microsoft.Extensions.Options;
3131
using Monai.Deploy.Messaging.Configuration;
3232
using Monai.Deploy.Messaging.Events;
33+
using Monai.Deploy.TaskManager.API;
3334
using Monai.Deploy.WorkflowManager.Configuration;
3435
using Monai.Deploy.WorkflowManager.SharedTest;
3536
using Monai.Deploy.WorkflowManager.TaskManager.API;
37+
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
3638
using Monai.Deploy.WorkflowManager.TaskManager.Argo.StaticValues;
3739
using Moq;
3840
using Moq.Language.Flow;
@@ -52,6 +54,7 @@ public class ArgoPluginTest
5254
private readonly Mock<IArgoProvider> _argoProvider;
5355
private readonly Mock<IArgoClient> _argoClient;
5456
private readonly Mock<IKubernetes> _kubernetesClient;
57+
private readonly Mock<ITaskDispatchEventService> _taskDispatchEventService;
5558
private readonly Mock<ICoreV1Operations> _k8sCoreOperations;
5659
private readonly IOptions<WorkflowManagerOptions> _options;
5760
private Workflow? _submittedArgoTemplate;
@@ -64,6 +67,7 @@ public ArgoPluginTest()
6467
_serviceScopeFactory = new Mock<IServiceScopeFactory>();
6568
_serviceScope = new Mock<IServiceScope>();
6669
_kubernetesProvider = new Mock<IKubernetesProvider>();
70+
_taskDispatchEventService = new Mock<ITaskDispatchEventService>();
6771
_argoProvider = new Mock<IArgoProvider>();
6872
_argoClient = new Mock<IArgoClient>();
6973
_kubernetesClient = new Mock<IKubernetes>();
@@ -88,12 +92,16 @@ public ArgoPluginTest()
8892
serviceProvider
8993
.Setup(x => x.GetService(typeof(IArgoProvider)))
9094
.Returns(_argoProvider.Object);
95+
serviceProvider
96+
.Setup(x => x.GetService(typeof(ITaskDispatchEventService)))
97+
.Returns(_taskDispatchEventService.Object);
9198

9299
_serviceScope.Setup(x => x.ServiceProvider).Returns(serviceProvider.Object);
93100

94101
_logger.Setup(p => p.IsEnabled(It.IsAny<LogLevel>())).Returns(true);
95102
_argoProvider.Setup(p => p.CreateClient(It.IsAny<string>(), It.IsAny<string?>(), true)).Returns(_argoClient.Object);
96103
_kubernetesProvider.Setup(p => p.CreateClient()).Returns(_kubernetesClient.Object);
104+
_taskDispatchEventService.Setup(p => p.UpdateTaskPluginArgsAsync(It.IsAny<TaskDispatchEventInfo>(), It.IsAny<Dictionary<string, string>>())).ReturnsAsync(new TaskDispatchEventInfo(new TaskDispatchEvent()));
97105
_kubernetesClient.SetupGet<ICoreV1Operations>(p => p.CoreV1).Returns(_k8sCoreOperations.Object);
98106
}
99107

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2022 MONAI Consortium
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Threading.Tasks;
20+
using Microsoft.Extensions.Logging;
21+
using Monai.Deploy.Messaging.Events;
22+
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
23+
using Monai.Deploy.WorkflowManager.TaskManager.Database;
24+
using Monai.Deploy.WorkflowManager.TaskManager.Services;
25+
using Moq;
26+
using Xunit;
27+
28+
namespace Monai.Deploy.WorkflowManager.TaskManager.Tests.Services
29+
{
30+
public class TaskDispatchEventServiceTests
31+
{
32+
private readonly Mock<ILogger<TaskDispatchEventService>> _logger;
33+
private readonly Mock<ITaskDispatchEventRepository> _taskDispatchEventRepository;
34+
35+
public TaskDispatchEventServiceTests()
36+
{
37+
_logger = new Mock<ILogger<TaskDispatchEventService>>();
38+
_taskDispatchEventRepository = new Mock<ITaskDispatchEventRepository>();
39+
}
40+
41+
[Fact(DisplayName = "TaskDispatchEventService - UpdateTaskPluginArgsAsync - Throws error when taskDispatchEvent is null")]
42+
public async Task TaskDispatchEventService_UpdateTaskPluginArgsAsync_ThrowsErrorWhenTaskDispatchEventNull()
43+
{
44+
TaskDispatchEventInfo eventInfo = null;
45+
var pluginArgs = new Dictionary<string, string> { { "key1", "value1" }, { "key2", "value2" } };
46+
47+
var service = new TaskDispatchEventService(_taskDispatchEventRepository.Object, _logger.Object);
48+
49+
await Assert.ThrowsAsync<ArgumentNullException>(async () => await service.UpdateTaskPluginArgsAsync(eventInfo, pluginArgs));
50+
51+
_taskDispatchEventRepository.Verify(x => x.UpdateTaskPluginArgsAsync(It.IsAny<TaskDispatchEventInfo>(), It.IsAny<Dictionary<string, string>>()), Times.Never);
52+
}
53+
54+
[Fact(DisplayName = "TaskDispatchEventService - UpdateTaskPluginArgsAsync - Throws error when pluginArgs is null")]
55+
public async Task TaskDispatchEventService_UpdateTaskPluginArgsAsync_ThrowsErrorWhenPluginArgsNull()
56+
{
57+
TaskDispatchEventInfo eventInfo = GenerateTaskDispatchEventInfo();
58+
Dictionary<string, string> pluginArgs = null;
59+
60+
var service = new TaskDispatchEventService(_taskDispatchEventRepository.Object, _logger.Object);
61+
62+
await Assert.ThrowsAsync<ArgumentNullException>(async () => await service.UpdateTaskPluginArgsAsync(eventInfo, pluginArgs));
63+
64+
_taskDispatchEventRepository.Verify(x => x.UpdateTaskPluginArgsAsync(It.IsAny<TaskDispatchEventInfo>(), It.IsAny<Dictionary<string, string>>()), Times.Never);
65+
}
66+
67+
[Fact(DisplayName = "TaskDispatchEventService - UpdateTaskPluginArgsAsync - Successful")]
68+
public async Task TaskDispatchEventService_UpdateTaskPluginArgsAsync_Successful()
69+
{
70+
var eventInfo = GenerateTaskDispatchEventInfo();
71+
var pluginArgs = new Dictionary<string, string> { { "key1", "value1" }, { "key2", "value2" } };
72+
73+
_taskDispatchEventRepository.Setup(x => x.UpdateTaskPluginArgsAsync(eventInfo, pluginArgs)).ReturnsAsync(new TaskDispatchEventInfo(new TaskDispatchEvent
74+
{
75+
CorrelationId = "CorrelationId",
76+
PayloadId = Guid.NewGuid().ToString(),
77+
ExecutionId = Guid.NewGuid().ToString(),
78+
TaskPluginType = PluginStrings.Argo,
79+
WorkflowInstanceId = Guid.NewGuid().ToString(),
80+
TaskId = Guid.NewGuid().ToString(),
81+
TaskPluginArguments = pluginArgs
82+
}));
83+
84+
var service = new TaskDispatchEventService(_taskDispatchEventRepository.Object, _logger.Object);
85+
86+
var result = await service.UpdateTaskPluginArgsAsync(eventInfo, pluginArgs);
87+
88+
_taskDispatchEventRepository.Verify(x => x.UpdateTaskPluginArgsAsync(It.IsAny<TaskDispatchEventInfo>(), It.IsAny<Dictionary<string, string>>()), Times.Once);
89+
Assert.Equal(result.Event.TaskPluginArguments, pluginArgs);
90+
}
91+
92+
private TaskDispatchEventInfo GenerateTaskDispatchEventInfo()
93+
{
94+
return new TaskDispatchEventInfo(new TaskDispatchEvent
95+
{
96+
CorrelationId = "CorrelationId",
97+
PayloadId = Guid.NewGuid().ToString(),
98+
ExecutionId = Guid.NewGuid().ToString(),
99+
TaskPluginType = PluginStrings.Argo,
100+
WorkflowInstanceId = Guid.NewGuid().ToString(),
101+
TaskId = Guid.NewGuid().ToString()
102+
});
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)