Skip to content

Commit c97df30

Browse files
authored
Merge pull request #778 from Project-MONAI/samrooke/AC-2008-workflow-manager-delete-payload
Samrooke/ac 2008 workflow manager delete payload
2 parents 51f8ede + 77230c2 commit c97df30

File tree

12 files changed

+348
-22
lines changed

12 files changed

+348
-22
lines changed

src/WorkflowManager/Common/Interfaces/IPayloadService.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ Task<IList<Payload>> GetAllAsync(int? skip = null,
4141
string? patientId = "",
4242
string? patientName = "");
4343

44+
/// <summary>
45+
/// Deletes a payload by id.
46+
/// </summary>
47+
/// <param name="payloadId">payload id to delete.</param>
48+
Task<bool> DeletePayloadFromStorageAsync(string payloadId);
49+
4450
/// <summary>
4551
/// Updates a payload
4652
/// </summary>

src/WorkflowManager/Common/Services/PayloadService.cs

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616

1717
using Ardalis.GuardClauses;
18+
using Microsoft.Extensions.DependencyInjection;
1819
using Microsoft.Extensions.Logging;
1920
using Monai.Deploy.Messaging.Events;
21+
using Monai.Deploy.Storage.API;
22+
using Monai.Deploy.WorkflowManager.Common.Exceptions;
2023
using Monai.Deploy.WorkflowManager.Common.Interfaces;
2124
using Monai.Deploy.WorkflowManager.Contracts.Models;
2225
using Monai.Deploy.WorkflowManager.Database.Interfaces;
@@ -27,17 +30,28 @@ namespace Monai.Deploy.WorkflowManager.Common.Services
2730
{
2831
public class PayloadService : IPayloadService
2932
{
30-
private readonly IPayloadRepsitory _payloadRepsitory;
33+
private readonly IPayloadRepsitory _payloadRepository;
3134

3235
private readonly IDicomService _dicomService;
3336

37+
private readonly IStorageService _storageService;
38+
3439
private readonly ILogger<PayloadService> _logger;
3540

36-
public PayloadService(IPayloadRepsitory payloadRepsitory, IDicomService dicomService, ILogger<PayloadService> logger)
41+
public PayloadService(
42+
IPayloadRepsitory payloadRepsitory,
43+
IDicomService dicomService,
44+
IServiceScopeFactory serviceScopeFactory,
45+
ILogger<PayloadService> logger)
3746
{
38-
_payloadRepsitory = payloadRepsitory ?? throw new ArgumentNullException(nameof(payloadRepsitory));
47+
_payloadRepository = payloadRepsitory ?? throw new ArgumentNullException(nameof(payloadRepsitory));
3948
_dicomService = dicomService ?? throw new ArgumentNullException(nameof(dicomService));
4049
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
50+
51+
var _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
52+
var scope = _serviceScopeFactory.CreateScope();
53+
54+
_storageService = scope.ServiceProvider.GetService<IStorageService>() ?? throw new ArgumentNullException(nameof(IStorageService));
4155
}
4256

4357
public async Task<Payload?> CreateAsync(WorkflowRequestEvent eventPayload)
@@ -70,7 +84,7 @@ public PayloadService(IPayloadRepsitory payloadRepsitory, IDicomService dicomSer
7084
PatientDetails = patientDetails
7185
};
7286

73-
if (await _payloadRepsitory.CreateAsync(payload))
87+
if (await _payloadRepository.CreateAsync(payload))
7488
{
7589
_logger.PayloadCreated(payload.Id);
7690
return payload;
@@ -92,23 +106,67 @@ public async Task<Payload> GetByIdAsync(string payloadId)
92106
{
93107
Guard.Against.NullOrWhiteSpace(payloadId);
94108

95-
return await _payloadRepsitory.GetByIdAsync(payloadId);
109+
return await _payloadRepository.GetByIdAsync(payloadId);
96110
}
97111

98112
public async Task<IList<Payload>> GetAllAsync(int? skip = null,
99113
int? limit = null,
100114
string? patientId = "",
101115
string? patientName = "")
102-
=> await _payloadRepsitory.GetAllAsync(skip, limit, patientId, patientName);
116+
=> await _payloadRepository.GetAllAsync(skip, limit, patientId, patientName);
103117

104118
public async Task<IList<Payload>> GetAllAsync(int? skip = null, int? limit = null)
105-
=> await _payloadRepsitory.GetAllAsync(skip, limit);
119+
=> await _payloadRepository.GetAllAsync(skip, limit);
120+
121+
public async Task<long> CountAsync() => await _payloadRepository.CountAsync();
122+
123+
public async Task<bool> DeletePayloadFromStorageAsync(string payloadId)
124+
{
125+
Guard.Against.NullOrWhiteSpace(payloadId);
126+
127+
var payload = await GetByIdAsync(payloadId);
128+
129+
if (payload is null)
130+
{
131+
throw new MonaiNotFoundException($"Payload with ID: {payloadId} not found");
132+
}
133+
134+
if (payload.PayloadDeleted == PayloadDeleted.InProgress)
135+
{
136+
throw new MonaiBadRequestException($"Deletion of files for payload ID: {payloadId} already in progress");
137+
}
138+
139+
// update the payload to in progress before we request deletion form MinIO
140+
payload.PayloadDeleted = PayloadDeleted.InProgress;
141+
await _payloadRepository.UpdateAsync(payload);
142+
143+
// run deletion in alternative thread so the user isn't held up
144+
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
145+
Task.Run(async () =>
146+
{
147+
try
148+
{
149+
await _storageService.RemoveObjectsAsync(payload.Bucket, payload.Files.Select(f => f.Path));
150+
payload.PayloadDeleted = PayloadDeleted.Yes;
151+
}
152+
catch
153+
{
154+
_logger.PayloadUpdateFailed(payloadId);
155+
payload.PayloadDeleted = PayloadDeleted.Failed;
156+
}
157+
finally
158+
{
159+
await _payloadRepository.UpdateAsync(payload);
160+
}
161+
});
162+
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
106163

107-
public async Task<long> CountAsync() => await _payloadRepsitory.CountAsync();
164+
return true;
165+
}
108166

109167
public async Task<bool> UpdateWorkflowInstanceIdsAsync(string payloadId, IEnumerable<string> workflowInstances)
110168
{
111-
if (await _payloadRepsitory.UpdateAssociatedWorkflowInstancesAsync(payloadId, workflowInstances))
169+
if (await _payloadRepository.UpdateAssociatedWorkflowInstancesAsync(payloadId, workflowInstances))
112170
{
113171
_logger.PayloadUpdated(payloadId);
114172
return true;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
using System;
17+
using Monai.Deploy.WorkflowManager.Contracts.Models;
18+
using Mongo.Migration.Migrations.Document;
19+
using MongoDB.Bson;
20+
21+
namespace Monai.Deploy.WorkflowManager.Contracts.Migrations
22+
{
23+
public class M002_Payload_addPayloadDeleted : DocumentMigration<Payload>
24+
{
25+
public M002_Payload_addPayloadDeleted() : base("1.0.1") { }
26+
27+
public override void Up(BsonDocument document)
28+
{
29+
document.Add("PayloadDeleted", PayloadDeleted.No);
30+
}
31+
32+
public override void Down(BsonDocument document)
33+
{
34+
try
35+
{
36+
document.Remove("PayloadDeleted");
37+
}
38+
catch (Exception)
39+
{
40+
}
41+
}
42+
}
43+
}

src/WorkflowManager/Contracts/Models/Payload.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626

2727
namespace Monai.Deploy.WorkflowManager.Contracts.Models
2828
{
29-
[CollectionLocation("Payloads"), RuntimeVersion("1.0.0")]
29+
[CollectionLocation("Payloads"), RuntimeVersion("1.0.1")]
3030
public class Payload : IDocument
3131
{
3232
[JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))]
33-
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 0);
33+
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 1);
3434

3535
[JsonProperty(PropertyName = "id")]
3636
public string Id { get; set; } = string.Empty;
@@ -62,10 +62,21 @@ public class Payload : IDocument
6262
[JsonProperty(PropertyName = "timestamp")]
6363
public DateTime Timestamp { get; set; }
6464

65+
[JsonProperty(PropertyName = "payload_deleted")]
66+
public PayloadDeleted PayloadDeleted { get; set; } = PayloadDeleted.No;
67+
6568
[JsonProperty(PropertyName = "files")]
6669
public IList<BlockStorageInfo> Files { get; set; } = new List<BlockStorageInfo>();
6770

6871
[JsonProperty(PropertyName = "patient_details")]
6972
public PatientDetails PatientDetails { get; set; } = new PatientDetails();
7073
}
74+
75+
public enum PayloadDeleted
76+
{
77+
No,
78+
Yes,
79+
InProgress,
80+
Failed
81+
}
7182
}

src/WorkflowManager/Database/Interfaces/IPayloadRepsitory.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ public interface IPayloadRepsitory
4545
/// <returns>Count of objects.</returns>
4646
Task<long> CountAsync();
4747

48+
/// <summary>
49+
/// Updates a payload in the database.
50+
/// </summary>
51+
/// <param name="payload">The payload to update.</param>
52+
/// <returns>The updated payload.</returns>
53+
Task<bool> UpdateAsync(Payload payload);
54+
4855
/// Updates a payload in the database.
4956
/// </summary>
5057
/// <param name="payloadId"></param>

src/WorkflowManager/Database/Repositories/PayloadRepository.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,25 @@ public async Task<Payload> GetByIdAsync(string payloadId)
100100
return payload;
101101
}
102102

103+
public async Task<bool> UpdateAsync(Payload payload)
104+
{
105+
Guard.Against.Null(payload, nameof(payload));
106+
107+
try
108+
{
109+
var filter = Builders<Payload>.Filter.Eq(p => p.PayloadId, payload.PayloadId);
110+
var update = Builders<Payload>.Update.Set(p => p, payload);
111+
await _payloadCollection.UpdateOneAsync(filter, update);
112+
113+
return true;
114+
}
115+
catch (Exception ex)
116+
{
117+
_logger.DbUpdatePayloadError(payload.PayloadId, ex);
118+
return false;
119+
}
120+
}
121+
103122
public async Task<bool> UpdateAssociatedWorkflowInstancesAsync(string payloadId, IEnumerable<string> workflowInstances)
104123
{
105124
Guard.Against.NullOrEmpty(workflowInstances);

src/WorkflowManager/Logging/Log.800000.Database.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,8 @@ public static partial class Log
6060

6161
[LoggerMessage(EventId = 800013, Level = LogLevel.Error, Message = "Database call failed in {methodName}.")]
6262
public static partial void DatabaseException(this ILogger logger, string methodName, Exception ex);
63+
64+
[LoggerMessage(EventId = 800014, Level = LogLevel.Error, Message = "Failed to update payload: '{payloadId}'.")]
65+
public static partial void DbUpdatePayloadError(this ILogger logger, string payloadId, Exception ex);
6366
}
6467
}

src/WorkflowManager/WorkflowManager/Controllers/PayloadsController.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using Microsoft.AspNetCore.Mvc;
2323
using Microsoft.Extensions.Logging;
2424
using Microsoft.Extensions.Options;
25+
using Monai.Deploy.WorkflowManager.Common.Exceptions;
2526
using Monai.Deploy.WorkflowManager.Common.Interfaces;
2627
using Monai.Deploy.WorkflowManager.Configuration;
2728
using Monai.Deploy.WorkflowManager.Contracts.Models;
@@ -140,5 +141,46 @@ public async Task<IActionResult> GetAsync([FromRoute] string id)
140141
return Problem($"Unexpected error occurred: {e.Message}", $"/payload/{nameof(id)}", InternalServerError);
141142
}
142143
}
144+
145+
/// <summary>
146+
/// Delete a payload by ID.
147+
/// </summary>
148+
/// <param name="id">The payload id.</param>
149+
/// <returns>Boolean status of the success of the delete request.</returns>
150+
[Route("{id}")]
151+
[HttpDelete]
152+
[ProducesResponseType(typeof(bool), StatusCodes.Status202Accepted)]
153+
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
154+
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
155+
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status500InternalServerError)]
156+
public async Task<IActionResult> DeleteAsync([FromRoute] string id)
157+
{
158+
if (string.IsNullOrWhiteSpace(id) || !Guid.TryParse(id, out _))
159+
{
160+
_logger.LogDebug($"{nameof(DeleteAsync)} - Failed to validate {nameof(id)}");
161+
162+
return Problem($"Failed to validate {nameof(id)}, not a valid guid", $"/payload/{id}", BadRequest);
163+
}
164+
165+
try
166+
{
167+
return Accepted(await _payloadService.DeletePayloadFromStorageAsync(id));
168+
}
169+
catch (MonaiBadRequestException e)
170+
{
171+
_logger.PayloadGetAsyncError(id, e);
172+
return Problem(e.Message, $"/payload/{nameof(id)}", BadRequest);
173+
}
174+
catch (MonaiNotFoundException e)
175+
{
176+
_logger.PayloadGetAsyncError(id, e);
177+
return Problem(e.Message, $"/payload/{nameof(id)}", NotFound);
178+
}
179+
catch (Exception e)
180+
{
181+
_logger.PayloadGetAsyncError(id, e);
182+
return Problem($"Unexpected error occurred: {e.Message}", $"/payload/{nameof(id)}", InternalServerError);
183+
}
184+
}
143185
}
144186
}

src/WorkflowManager/WorkflowManager/Program.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,6 @@ private static void ConfigureServices(HostBuilderContext hostContext, IServiceCo
153153
services.AddMonaiDeployMessageBrokerPublisherService(hostContext.Configuration.GetSection("WorkflowManager:messaging:publisherServiceAssemblyName").Value);
154154
services.AddMonaiDeployMessageBrokerSubscriberService(hostContext.Configuration.GetSection("WorkflowManager:messaging:subscriberServiceAssemblyName").Value);
155155

156-
services.AddHostedService(p => p.GetService<DataRetentionService>());
157-
158156
services.AddWorkflowExecutor(hostContext);
159157

160158
services.AddHttpContextAccessor();

tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/WorkflowExecutorStartup.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ private static IHostBuilder CreateHostBuilder() =>
9595

9696
services.AddSingleton<DataRetentionService>();
9797

98-
#pragma warning disable CS8603 // Possible null reference return.
99-
services.AddHostedService<DataRetentionService>(p => p.GetService<DataRetentionService>());
100-
#pragma warning restore CS8603 // Possible null reference return.
98+
services.AddHostedService(p => p.GetService<DataRetentionService>());
10199

102100
// Services
103101
services.AddTransient<IFileSystem, FileSystem>();
@@ -107,16 +105,16 @@ private static IHostBuilder CreateHostBuilder() =>
107105
services.Configure<WorkloadManagerDatabaseSettings>(hostContext.Configuration.GetSection("WorkloadManagerDatabase"));
108106
services.Configure<ExecutionStatsDatabaseSettings>(hostContext.Configuration.GetSection("WorkloadManagerDatabase"));
109107
services.AddSingleton<IMongoClient, MongoClient>(s => new MongoClient(hostContext.Configuration["WorkloadManagerDatabase:ConnectionString"]));
110-
services.AddMigration(new MongoMigrationSettings
111-
{
112-
ConnectionString = hostContext.Configuration.GetSection("WorkloadManagerDatabase:ConnectionString").Value,
113-
Database = hostContext.Configuration.GetSection("WorkloadManagerDatabase:DatabaseName").Value,
114-
});
115108
services.AddTransient<IWorkflowRepository, WorkflowRepository>();
116109
services.AddTransient<IWorkflowInstanceRepository, WorkflowInstanceRepository>();
117110
services.AddTransient<IPayloadRepsitory, PayloadRepository>();
118111
services.AddTransient<ITasksRepository, TasksRepository>();
119112
services.AddTransient<ITaskExecutionStatsRepository, TaskExecutionStatsRepository>();
113+
services.AddMigration(new MongoMigrationSettings
114+
{
115+
ConnectionString = hostContext.Configuration.GetSection("WorkloadManagerDatabase:ConnectionString").Value,
116+
Database = hostContext.Configuration.GetSection("WorkloadManagerDatabase:DatabaseName").Value,
117+
});
120118

121119
// StorageService - Since mc.exe is unavailable during e2e, skip admin check
122120
services.AddMonaiDeployStorageService(hostContext.Configuration.GetSection("WorkflowManager:storage:serviceAssemblyName").Value, HealthCheckOptions.ServiceHealthCheck);

0 commit comments

Comments
 (0)