Skip to content

Commit 62a3dc2

Browse files
authored
Merge pull request #767 from Project-MONAI/AC-2127
adding stats update for timeout(canceled) tasks
2 parents 9afe61c + 0191191 commit 62a3dc2

19 files changed

+341
-120
lines changed

src/Shared/Shared/Wrappers/StatsPagedResponse.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class StatsPagedResponse<T> : PagedResponse<T>
2121
public DateTime PeriodEnd { get; set; }
2222
public long TotalExecutions { get; set; }
2323
public long TotalFailures { get; set; }
24+
public long TotalInprogress { get; set; }
2425
public double AverageTotalExecutionSeconds { get; set; }
2526
public double AverageArgoExecutionSeconds { get; set; }
2627

src/TaskManager/API/Models/TaskExecutionStats.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,5 +136,15 @@ public TaskExecutionStats(TaskUpdateEvent taskUpdateEvent)
136136
TaskId = taskUpdateEvent.TaskId;
137137
Status = taskUpdateEvent.Status.ToString();
138138
}
139+
140+
public TaskExecutionStats(TaskCancellationEvent taskCanceledEvent, string correlationId)
141+
{
142+
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
143+
CorrelationId = correlationId;
144+
WorkflowInstanceId = taskCanceledEvent.WorkflowInstanceId;
145+
ExecutionId = taskCanceledEvent.ExecutionId;
146+
TaskId = taskCanceledEvent.TaskId;
147+
Status = TaskExecutionStatus.Failed.ToString();
148+
}
139149
}
140150
}

src/TaskManager/Database/ITaskExecutionStatsRepository.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,29 @@ public interface ITaskExecutionStatsRepository
2525
/// Creates a task dispatch event in the database.
2626
/// </summary>
2727
/// <param name="taskDispatchEvent">A TaskDispatchEvent to create.</param>
28-
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
28+
/// <returns></returns>
2929
Task CreateAsync(TaskDispatchEventInfo taskDispatchEventInfo);
3030

3131
/// <summary>
32-
/// Updates user accounts of a task dispatch event in the database.
32+
/// Updates status of a task dispatch event in the database.
3333
/// </summary>
3434
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
35-
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
35+
/// <returns></returns>
3636
Task UpdateExecutionStatsAsync(TaskUpdateEvent taskUpdateEvent);
3737

38+
/// <summary>
39+
/// Updates the status of a task now that it has been canceled.
40+
/// </summary>
41+
/// <param name="taskCanceledEvent">The TaskCancellationEvent for the canceled task.</param>
42+
/// <returns></returns>
43+
Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId);
44+
3845
/// <summary>
3946
/// Returns paged entries between the two given dates.
4047
/// </summary>
4148
/// <param name="startTime">start of the range.</param>
4249
/// <param name="endTime">end of the range.</param>
43-
/// <returns>a paged view of entried in range</returns>
50+
/// <returns>a collection of stats</returns>
4451
Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "");
4552

4653
/// <summary>
@@ -49,14 +56,22 @@ public interface ITaskExecutionStatsRepository
4956
/// <param name="startTime">start of the range.</param>
5057
/// <param name="endTime">end of the range.</param>
5158
/// <returns>The count of all records in range</returns>
52-
Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
59+
//Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
5360

61+
/// <summary>
62+
/// Return the count of the entries with this status, or all if no status given
63+
/// </summary>
64+
/// <param name="start">start of the range.</param>
65+
/// <param name="endTime">end of the range.</param>
66+
/// <param name="status">the status to get count of, or string.empty</param>
67+
/// <returns>The count of all records in range</returns>
68+
Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "");
5469
/// <summary>
5570
/// Returns all stats in Failed or PartialFail status.
5671
/// </summary>
5772
/// <param name="startTime">start of the range.</param>
5873
/// <param name="endTime">end of the range.</param>
59-
/// <returns>All stats NOT of that status</returns>
74+
/// <returns>All stats that failed or partially failed</returns>
6075
Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
6176

6277
/// <summary>

src/TaskManager/Database/TaskExecutionStatsRepository.cs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,29 @@ await _taskExecutionStatsCollection.UpdateOneAsync(o =>
117117
}
118118
}
119119

120+
public async Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId)
121+
{
122+
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
123+
124+
try
125+
{
126+
var updateMe = new TaskExecutionStats(taskCanceledEvent, correlationId);
127+
var duration = updateMe.CompletedAtUTC == default ? 0 : (updateMe.CompletedAtUTC - updateMe.StartedUTC).TotalMilliseconds / 1000;
128+
await _taskExecutionStatsCollection.UpdateOneAsync(o =>
129+
o.ExecutionId == updateMe.ExecutionId,
130+
Builders<TaskExecutionStats>.Update
131+
.Set(w => w.Status, updateMe.Status)
132+
.Set(w => w.LastUpdatedUTC, DateTime.UtcNow)
133+
.Set(w => w.CompletedAtUTC, updateMe.CompletedAtUTC)
134+
.Set(w => w.DurationSeconds, duration)
135+
136+
, new UpdateOptions { IsUpsert = true }).ConfigureAwait(false);
137+
}
138+
catch (Exception e)
139+
{
140+
_logger.DatabaseException(nameof(CreateAsync), e);
141+
}
142+
}
120143
public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "")
121144
{
122145
startTime = startTime.ToUniversalTime();
@@ -128,12 +151,13 @@ public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startT
128151
T.StartedUTC >= startTime &&
129152
T.StartedUTC <= endTime.ToUniversalTime() &&
130153
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
131-
(taskIdNull || T.TaskId == taskId) &&
132-
(
133-
T.Status == TaskExecutionStatus.Succeeded.ToString()
134-
|| T.Status == TaskExecutionStatus.Failed.ToString()
135-
|| T.Status == TaskExecutionStatus.PartialFail.ToString()
136-
)
154+
(taskIdNull || T.TaskId == taskId)
155+
//&&
156+
//(
157+
// T.Status == TaskExecutionStatus.Succeeded.ToString()
158+
// || T.Status == TaskExecutionStatus.Failed.ToString()
159+
// || T.Status == TaskExecutionStatus.PartialFail.ToString()
160+
// )
137161
)
138162
.Limit(PageSize)
139163
.Skip((PageNumber - 1) * PageSize)
@@ -173,24 +197,19 @@ private static TaskExecutionStats ExposeExecutionStats(TaskExecutionStats taskEx
173197
}
174198
return taskExecutionStats;
175199
}
176-
177-
public async Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "")
200+
public async Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "")
178201
{
202+
var statusNull = string.IsNullOrWhiteSpace(status);
179203
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);
180204
var taskIdNull = string.IsNullOrWhiteSpace(taskId);
181205

182206
return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
183-
T.StartedUTC >= startTime.ToUniversalTime() &&
207+
T.StartedUTC >= start.ToUniversalTime() &&
184208
T.StartedUTC <= endTime.ToUniversalTime() &&
185209
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
186210
(taskIdNull || T.TaskId == taskId) &&
187-
(
188-
T.Status == TaskExecutionStatus.Succeeded.ToString() ||
189-
T.Status == TaskExecutionStatus.Failed.ToString() ||
190-
T.Status == TaskExecutionStatus.PartialFail.ToString())
191-
);
211+
(statusNull || T.Status == status));
192212
}
193-
194213
public async Task<long> GetStatsStatusFailedCountAsync(DateTime start, DateTime endTime, string workflowInstanceId = "", string taskId = "")
195214
{
196215
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);

src/TaskManager/TaskManager/Controllers/TaskStatsController.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using Microsoft.AspNetCore.Mvc;
1919
using Microsoft.Extensions.Logging;
2020
using Microsoft.Extensions.Options;
21+
using Monai.Deploy.Messaging.Events;
2122
using Monai.Deploy.WorkflowManager.Configuration;
2223
using Monai.Deploy.WorkflowManager.ControllersShared;
2324
using Monai.Deploy.WorkflowManager.Shared.Filter;
@@ -78,16 +79,18 @@ public async Task<IActionResult> GetOverviewAsync([FromQuery] DateTime startTime
7879
try
7980
{
8081
var fails = _repository.GetStatsStatusFailedCountAsync(startTime, endTime);
81-
var rangeCount = _repository.GetStatsCountAsync(startTime, endTime);
82+
var running = _repository.GetStatsStatusCountAsync(startTime, endTime, TaskExecutionStatus.Accepted.ToString());
83+
var rangeCount = _repository.GetStatsStatusCountAsync(startTime, endTime);
8284
var stats = _repository.GetAverageStats(startTime, endTime);
8385

84-
await Task.WhenAll(fails, rangeCount, stats);
86+
await Task.WhenAll(fails, rangeCount, stats, running);
8587
return Ok(new
8688
{
8789
PeriodStart = startTime,
8890
PeriodEnd = endTime,
8991
TotalExecutions = (int)rangeCount.Result,
9092
TotalFailures = (int)fails.Result,
93+
TotalInprogress = running.Result,
9194
AverageTotalExecutionSeconds = Math.Round(stats.Result.avgTotalExecution, 2),
9295
AverageArgoExecutionSeconds = Math.Round(stats.Result.avgArgoExecution, 2),
9396
});
@@ -102,7 +105,7 @@ public async Task<IActionResult> GetOverviewAsync([FromQuery] DateTime startTime
102105
[ProducesResponseType(typeof(StatsPagedResponse<List<ExecutionStatDTO>>), StatusCodes.Status200OK)]
103106
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status500InternalServerError)]
104107
[HttpGet("stats")]
105-
public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, string workflowId, string taskId)
108+
public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, string? workflowId = "", string? taskId = "")
106109
{
107110

108111
if ((string.IsNullOrWhiteSpace(workflowId) && string.IsNullOrWhiteSpace(taskId)) is false
@@ -130,12 +133,13 @@ public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, st
130133

131134
try
132135
{
133-
var allStats = _repository.GetStatsAsync(filter.StartTime, filter.EndTime, pageSize, filter.PageNumber, workflowId, taskId);
134-
var fails = _repository.GetStatsStatusFailedCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);
135-
var rangeCount = _repository.GetStatsCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);
136-
var stats = _repository.GetAverageStats(filter.StartTime, filter.EndTime, workflowId, taskId);
136+
var allStats = _repository.GetStatsAsync(filter.StartTime, filter.EndTime, pageSize, filter.PageNumber, workflowId ?? string.Empty, taskId ?? string.Empty);
137+
var fails = _repository.GetStatsStatusFailedCountAsync(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
138+
var rangeCount = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, string.Empty, workflowId ?? string.Empty, taskId ?? string.Empty);
139+
var stats = _repository.GetAverageStats(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
140+
var running = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, TaskExecutionStatus.Accepted.ToString());
137141

138-
await Task.WhenAll(allStats, fails, rangeCount, stats);
142+
await Task.WhenAll(allStats, fails, rangeCount, stats, running);
139143

140144
ExecutionStatDTO[] statsDto;
141145

@@ -150,6 +154,7 @@ public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, st
150154
res.PeriodEnd = filter.EndTime;
151155
res.TotalExecutions = rangeCount.Result;
152156
res.TotalFailures = fails.Result;
157+
res.TotalInprogress = running.Result;
153158
res.AverageTotalExecutionSeconds = Math.Round(stats.Result.avgTotalExecution, 2);
154159
res.AverageArgoExecutionSeconds = Math.Round(stats.Result.avgArgoExecution, 2);
155160
return Ok(res);

src/TaskManager/TaskManager/TaskManager.cs

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,6 @@ public Task StartAsync(CancellationToken cancellationToken)
101101
return Task.CompletedTask;
102102
}
103103

104-
private void SubscribeToEvents()
105-
{
106-
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskDispatchRequest, _options.Value.Messaging.Topics.TaskDispatchRequest, TaskDispatchEventReceivedCallback);
107-
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCallbackRequest, _options.Value.Messaging.Topics.TaskCallbackRequest, TaskCallbackEventReceivedCallback);
108-
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCancellationRequest, _options.Value.Messaging.Topics.TaskCancellationRequest, TaskCancelationEventCallback);
109-
}
110-
111104
public Task StopAsync(CancellationToken cancellationToken)
112105
{
113106
_logger.ServiceStopping(ServiceName);
@@ -121,6 +114,39 @@ public Task StopAsync(CancellationToken cancellationToken)
121114
return Task.CompletedTask;
122115
}
123116

117+
private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(
118+
JsonMessage<T> message,
119+
string executionId,
120+
string workflowInstanceId,
121+
string taskId,
122+
ExecutionStatus executionStatus,
123+
List<Messaging.Common.Storage>? outputs = null)
124+
{
125+
Guard.Against.Null(message, nameof(message));
126+
Guard.Against.Null(executionStatus, nameof(executionStatus));
127+
128+
var body = new TaskUpdateEvent
129+
{
130+
CorrelationId = message.CorrelationId,
131+
ExecutionId = executionId,
132+
Reason = executionStatus.FailureReason,
133+
Status = executionStatus.Status,
134+
ExecutionStats = executionStatus.Stats,
135+
WorkflowInstanceId = workflowInstanceId,
136+
TaskId = taskId,
137+
Message = executionStatus.Errors,
138+
Outputs = outputs ?? new List<Messaging.Common.Storage>(),
139+
};
140+
return new JsonMessage<TaskUpdateEvent>(body, TaskManagerApplicationId, message.CorrelationId);
141+
}
142+
143+
private void SubscribeToEvents()
144+
{
145+
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskDispatchRequest, _options.Value.Messaging.Topics.TaskDispatchRequest, TaskDispatchEventReceivedCallback);
146+
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCallbackRequest, _options.Value.Messaging.Topics.TaskCallbackRequest, TaskCallbackEventReceivedCallback);
147+
_messageBrokerSubscriberService.SubscribeAsync(_options.Value.Messaging.Topics.TaskCancellationRequest, _options.Value.Messaging.Topics.TaskCancellationRequest, TaskCancelationEventCallback);
148+
}
149+
124150
private async Task TaskCallbackEventReceivedCallback(MessageReceivedEventArgs args)
125151
{
126152
await TaskCallBackGeneric<TaskCallbackEvent>(args, HandleTaskCallback);
@@ -200,8 +226,12 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
200226
{
201227
throw new InvalidOperationException("Task Event data not found.");
202228
}
229+
203230
var taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
204231
await taskRunner.HandleTimeout(message.Body.Identity);
232+
233+
await _taskExecutionStatsRepository.UpdateExecutionStatsAsync(message.Body, message.CorrelationId);
234+
AcknowledgeMessage(message);
205235
}
206236
catch (Exception ex)
207237
{
@@ -515,26 +545,6 @@ private void AcknowledgeMessage<T>(JsonMessage<T> message)
515545
}
516546
}
517547

518-
private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(JsonMessage<T> message, string executionId, string WorkflowInstanceId, string taskId, ExecutionStatus executionStatus, List<Messaging.Common.Storage> outputs = null)
519-
{
520-
Guard.Against.Null(message, nameof(message));
521-
Guard.Against.Null(executionStatus, nameof(executionStatus));
522-
523-
var body = new TaskUpdateEvent
524-
{
525-
CorrelationId = message.CorrelationId,
526-
ExecutionId = executionId,
527-
Reason = executionStatus.FailureReason,
528-
Status = executionStatus.Status,
529-
ExecutionStats = executionStatus.Stats,
530-
WorkflowInstanceId = WorkflowInstanceId,
531-
TaskId = taskId,
532-
Message = executionStatus.Errors,
533-
Outputs = outputs ?? new List<Messaging.Common.Storage>(),
534-
};
535-
return new JsonMessage<TaskUpdateEvent>(body, TaskManagerApplicationId, message.CorrelationId);
536-
}
537-
538548
//TODO: gh-100 implement retry logic
539549
private async Task SendUpdateEvent(JsonMessage<TaskUpdateEvent> message)
540550
{

src/TaskManager/TaskManager/appsettings.Local.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"WorkloadManagerDatabase": {
3-
"ConnectionString": "mongodb://root:rootpassword@localhost:30017",
3+
"ConnectionString": "mongodb://root:rootpassword@localhost:27017",
44
"DatabaseName": "WorkloadManager"
55
},
66
"WorkflowManager": {
@@ -43,7 +43,7 @@
4343
"endpoint": "localhost",
4444
"username": "admin",
4545
"password": "admin",
46-
"port": "30072",
46+
"port": "5672",
4747
"virtualHost": "monaideploy",
4848
"exchange": "monaideploy",
4949
"deadLetterExchange": "monaideploy-dead-letter",
@@ -55,7 +55,7 @@
5555
"endpoint": "localhost",
5656
"username": "admin",
5757
"password": "admin",
58-
"port": "30072",
58+
"port": "5672",
5959
"virtualHost": "monaideploy",
6060
"exchange": "monaideploy",
6161
"exportRequestQueue": "export_tasks"

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616

1717
using System.Globalization;
18-
using System.Linq;
19-
using System.Runtime.CompilerServices;
2018
using Ardalis.GuardClauses;
2119
using Microsoft.Extensions.Logging;
2220
using Microsoft.Extensions.Options;

tests/IntegrationTests/TaskManager.IntegrationTests/Features/__snapshots__/ExecutionStatsFeature.ExecutionStatsAreNotReturnedIfWorkflowOrTaskIsNotFound_Workflow_1_Task_3_.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
{
1817
"periodStart": "2023-01-01T00:00:00",
19-
"periodEnd": "2023-04-11T10:13:29.9717784+01:00",
18+
"periodEnd": "2023-04-26T15:01:00.9582693+01:00",
2019
"totalExecutions": 0,
2120
"totalFailures": 0,
21+
"totalInprogress": 1,
2222
"averageTotalExecutionSeconds": 0.0,
2323
"averageArgoExecutionSeconds": 0.0,
2424
"pageNumber": 1,

tests/IntegrationTests/TaskManager.IntegrationTests/Features/__snapshots__/ExecutionStatsFeature.ExecutionStatsAreNotReturnedIfWorkflowOrTaskIsNotFound_Workflow_2_Task_1_.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
{
1817
"periodStart": "2023-01-01T00:00:00",
19-
"periodEnd": "2023-04-11T10:13:26.6294812+01:00",
18+
"periodEnd": "2023-04-26T15:01:00.678874+01:00",
2019
"totalExecutions": 0,
2120
"totalFailures": 0,
21+
"totalInprogress": 1,
2222
"averageTotalExecutionSeconds": 0.0,
2323
"averageArgoExecutionSeconds": 0.0,
2424
"pageNumber": 1,

0 commit comments

Comments
 (0)