Skip to content

Commit 1477e4e

Browse files
committed
merge in develop
Signed-off-by: Neil South <[email protected]>
2 parents 4e7f6a6 + d819b68 commit 1477e4e

File tree

9 files changed

+135
-91
lines changed

9 files changed

+135
-91
lines changed

src/TaskManager/Plug-ins/Argo/Controllers/TemplateController.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public async Task<ActionResult<WorkflowTemplate>> CreateArgoTemplate()
5656

5757
if (string.IsNullOrWhiteSpace(value2))
5858
{
59-
return BadRequest("No file recieved");
59+
return BadRequest("No file received");
6060
}
6161
WorkflowTemplate? workflowTemplate = null;
6262
try

src/WorkflowManager/Common/Services/WorkflowService.cs

-13
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ public async Task<string> CreateAsync(Workflow workflow)
5454
{
5555
Guard.Against.Null(workflow);
5656

57-
foreach (var task in workflow.Tasks)
58-
{
59-
task.Args["workflow_name"] = workflow.Name;
60-
}
61-
6257
var id = await _workflowRepository.CreateAsync(workflow);
6358
_logger.WorkflowCreated(id, workflow.Name);
6459
return id;
@@ -76,14 +71,6 @@ public async Task<string> CreateAsync(Workflow workflow)
7671
return null;
7772
}
7873

79-
if (isUpdateToWorkflowName)
80-
{
81-
foreach (var task in workflow.Tasks)
82-
{
83-
task.Args["workflow_name"] = workflow.Name;
84-
}
85-
}
86-
8774
var result = await _workflowRepository.UpdateAsync(workflow, existingWorkflow);
8875
_logger.WorkflowUpdated(id, workflow.Name);
8976
return result;

src/WorkflowManager/Database/Interfaces/ITaskExecutionStatsRepository.cs

+51-17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
using System;
1818
using System.Collections.Generic;
19+
using System.Linq.Expressions;
1920
using System.Threading.Tasks;
2021
using Monai.Deploy.Messaging.Events;
2122
using Monai.Deploy.WorkflowManager.Contracts.Models;
@@ -27,54 +28,60 @@ public interface ITaskExecutionStatsRepository
2728
/// <summary>
2829
/// Creates a task dispatch event in the database.
2930
/// </summary>
30-
/// <param name="taskDispatchEvent">A TaskDispatchEvent to create.</param>
31+
/// <param name="TaskExecutionInfo"></param>
32+
/// <param name="workflowId">workflow id.</param>
33+
/// <param name="correlationId">task id.</param>
3134
/// <returns></returns>
3235
Task CreateAsync(TaskExecution TaskExecutionInfo, string workflowId, string correlationId);
3336

3437
/// <summary>
3538
/// Updates status of a task dispatch event in the database.
3639
/// </summary>
37-
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
40+
/// <param name="taskUpdateEvent"></param>
41+
/// <param name="workflowId">workflow id.</param>
42+
/// <param name="status">task id.</param>
3843
/// <returns></returns>
3944
Task UpdateExecutionStatsAsync(TaskExecution taskUpdateEvent, string workflowId, TaskExecutionStatus? status = null);
4045

4146
/// <summary>
4247
/// Updates status of a task now its been canceled.
4348
/// </summary>
44-
/// <param name="TaskCanceledException">A TaskCanceledException to update.</param>
45-
/// <returns></returns
49+
/// <param name="taskCanceledEvent">A TaskCanceledException to update.</param>
50+
/// <param name="workflowId">workflow id.</param>
51+
/// <param name="correlationId">task id.</param>
52+
/// <returns></returns>
4653
Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string workflowId, string correlationId);
4754

4855
/// <summary>
49-
/// Returns paged entries between the two given dates.
56+
/// Returns paged entries between the two given dates
5057
/// </summary>
5158
/// <param name="startTime">start of the range.</param>
5259
/// <param name="endTime">end of the range.</param>
60+
/// <param name="PageSize"></param>
61+
/// <param name="PageNumber"></param>
62+
/// <param name="workflowId">optional workflow id.</param>
63+
/// <param name="taskId">optional task id.</param>
5364
/// <returns>a collections of stats</returns>
5465
Task<IEnumerable<ExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowId = "", string taskId = "");
5566

5667
/// <summary>
57-
/// Return the total number of stats between the dates
68+
/// Return the count of the entries with this status, or all if no status given.
5869
/// </summary>
5970
/// <param name="startTime">start of the range.</param>
6071
/// <param name="endTime">end of the range.</param>
61-
/// <returns>The count of all records in range</returns>
62-
//Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");
63-
64-
/// <summary>
65-
/// Return the count of the entries with this status, or all if no status given
66-
/// </summary>
67-
/// <param name="start">start of the range.</param>
68-
/// <param name="endTime">end of the range.</param>
6972
/// <param name="status">the status to get count of, or string.empty</param>
73+
/// <param name="workflowId">optional workflow id.</param>
74+
/// <param name="taskId">optional task id.</param>
7075
/// <returns>The count of all records in range</returns>
71-
Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowId = "", string taskId = "");
76+
Task<long> GetStatsStatusCountAsync(DateTime startTime, DateTime endTime, string status = "", string workflowId = "", string taskId = "");
7277

7378
/// <summary>
7479
/// Returns all stats in Succeeded status.
7580
/// </summary>
7681
/// <param name="startTime">start of the range.</param>
7782
/// <param name="endTime">end of the range.</param>
83+
/// <param name="workflowId">optional workflow id.</param>
84+
/// <param name="taskId">optional task id.</param>
7885
/// <returns>All stats that succeeded</returns>
7986
Task<long> GetStatsStatusSucceededCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");
8087

@@ -83,16 +90,43 @@ public interface ITaskExecutionStatsRepository
8390
/// </summary>
8491
/// <param name="startTime">start of the range.</param>
8592
/// <param name="endTime">end of the range.</param>
93+
/// <param name="workflowId">optional workflow id.</param>
94+
/// <param name="taskId">optional task id.</param>
8695
/// <returns>All stats that failed or partially failed</returns>
8796
Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");
8897

8998
/// <summary>
90-
/// Calculates the average exection time for the given range
99+
/// Returns total ran executions status that have ran to completion. (not dispatched, created, accepted)
100+
/// </summary>
101+
/// <param name="startTime">start of the range.</param>
102+
/// <param name="endTime">end of the range.</param>
103+
/// <param name="workflowId">optional workflow id.</param>
104+
/// <param name="taskId">optional task id.</param>
105+
/// <returns>All stats that failed or partially failed</returns>
106+
Task<long> GetStatsTotalCompleteExecutionsCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");
107+
108+
109+
/// <summary>
110+
/// Calculates the average execution time for the given range
91111
/// </summary>
92112
/// <param name="startTime">start of the range.</param>
93113
/// <param name="endTime">end of the range.</param>
94-
/// <returns>the average exection times in the time range</returns>
114+
/// <param name="workflowId">optional workflow id.</param>
115+
/// <param name="taskId">optional task id.</param>
116+
/// <returns>the average execution times in the time range</returns>
95117
Task<(double avgTotalExecution, double avgArgoExecution)> GetAverageStats(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");
96118

119+
120+
/// <summary>
121+
/// Return the total number of stats between the dates with optional status filter.
122+
/// </summary>
123+
/// <param name="startTime">start of the range.</param>
124+
/// <param name="endTime">end of the range.</param>
125+
/// <param name="statusFilter"></param>
126+
/// <param name="workflowId">optional workflow id.</param>
127+
/// <param name="taskId">optional task id.</param>
128+
/// <returns>The count of all records in range</returns>
129+
/// <summary>
130+
Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, Expression<Func<ExecutionStats, bool>>? statusFilter = null, string workflowId = "", string taskId = "");
97131
}
98132
}

src/WorkflowManager/Database/Repositories/TaskExecutionStatsRepository.cs

+65-51
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using System;
1818
using System.Collections.Generic;
1919
using System.Linq;
20+
using System.Linq.Expressions;
2021
using System.Threading.Tasks;
2122
using Ardalis.GuardClauses;
2223
using Microsoft.Extensions.Logging;
@@ -159,14 +160,7 @@ public async Task<IEnumerable<ExecutionStats>> GetStatsAsync(DateTime startTime,
159160
T.StartedUTC >= startTime &&
160161
T.StartedUTC <= endTime.ToUniversalTime() &&
161162
(workflowNull || T.WorkflowId == workflowId) &&
162-
(taskIdNull || T.TaskId == taskId)
163-
//&&
164-
//(
165-
// T.Status == TaskExecutionStatus.Succeeded.ToString()
166-
// || T.Status == TaskExecutionStatus.Failed.ToString()
167-
// || T.Status == TaskExecutionStatus.PartialFail.ToString()
168-
// )
169-
)
163+
(taskIdNull || T.TaskId == taskId))
170164
.Limit(PageSize)
171165
.Skip((PageNumber - 1) * PageSize)
172166
.ToListAsync();
@@ -187,66 +181,86 @@ private static ExecutionStats ExposeExecutionStats(ExecutionStats taskExecutionS
187181
var statKeys = taskUpdateEvent.ExecutionStats.Keys.Where(v => v.StartsWith("podStartTime") || v.StartsWith("podFinishTime"));
188182
if (statKeys.Any())
189183
{
190-
var start = DateTime.Now;
191-
var end = new DateTime();
192-
foreach (var statKey in statKeys)
193-
{
194-
if (statKey.Contains("StartTime") && DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var startTime))
195-
{
196-
start = (startTime < start ? startTime : start);
197-
}
198-
else if (DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var endTime))
199-
{
200-
end = (endTime > end ? endTime : start);
201-
}
202-
}
203-
taskExecutionStats.ExecutionTimeSeconds = (end - start).TotalMilliseconds / 1000;
184+
CalculatePodExecutionTime(taskExecutionStats, taskUpdateEvent, statKeys);
204185
}
205186
}
206187
return taskExecutionStats;
207188
}
208189

209-
public async Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowId = "", string taskId = "")
190+
/// <summary>
191+
/// Calculates and sets ExecutionStats ExecutionTimeSeconds
192+
/// </summary>
193+
/// <param name="taskExecutionStats"></param>
194+
/// <param name="taskUpdateEvent"></param>
195+
/// <param name="statKeys"></param>
196+
private static void CalculatePodExecutionTime(ExecutionStats taskExecutionStats, TaskExecution taskUpdateEvent, IEnumerable<string> statKeys)
210197
{
211-
var statusNull = string.IsNullOrWhiteSpace(status);
212-
var workflowNull = string.IsNullOrWhiteSpace(workflowId);
213-
var taskIdNull = string.IsNullOrWhiteSpace(taskId);
198+
var start = DateTime.Now;
199+
var end = new DateTime();
200+
foreach (var statKey in statKeys)
201+
{
202+
if (statKey.Contains("StartTime") && DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var startTime))
203+
{
204+
start = (startTime < start ? startTime : start);
205+
}
206+
else if (DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var endTime))
207+
{
208+
end = (endTime > end ? endTime : start);
209+
}
210+
}
211+
taskExecutionStats.ExecutionTimeSeconds = (end - start).TotalMilliseconds / 1000;
212+
}
214213

215-
return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
216-
T.StartedUTC >= start.ToUniversalTime() &&
217-
T.StartedUTC <= endTime.ToUniversalTime() &&
218-
(workflowNull || T.WorkflowId == workflowId) &&
219-
(taskIdNull || T.TaskId == taskId) &&
220-
(statusNull || T.Status == status));
214+
public async Task<long> GetStatsStatusCountAsync(DateTime startTime, DateTime endTime, string status = "", string workflowId = "", string taskId = "")
215+
{
216+
Expression<Func<ExecutionStats, bool>>? statusFilter = null;
217+
if (!string.IsNullOrWhiteSpace(status))
218+
{
219+
statusFilter = t => t.Status == status;
220+
}
221+
return await GetStatsCountAsync(startTime, endTime, statusFilter, workflowId, taskId);
221222
}
222223

223-
public async Task<long> GetStatsStatusSucceededCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
224+
public async Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, Expression<Func<ExecutionStats, bool>>? statusFilter = null, string workflowId = "", string taskId = "")
224225
{
225226
var workflowNull = string.IsNullOrWhiteSpace(workflowId);
226227
var taskIdNull = string.IsNullOrWhiteSpace(taskId);
227228

228-
return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
229-
T.StartedUTC >= startTime.ToUniversalTime() &&
230-
T.StartedUTC <= endTime.ToUniversalTime() &&
231-
(workflowNull || T.WorkflowId == workflowId) &&
232-
(taskIdNull || T.TaskId == taskId) &&
233-
T.Status == TaskExecutionStatus.Succeeded.ToString());
229+
var builder = Builders<ExecutionStats>.Filter;
230+
var filter = builder.Empty;
231+
232+
filter &= builder.Where(t => t.StartedUTC >= startTime.ToUniversalTime());
233+
filter &= builder.Where(t => t.StartedUTC <= endTime.ToUniversalTime());
234+
filter &= builder.Where(t => workflowNull || t.WorkflowId == workflowId);
235+
filter &= builder.Where(t => taskIdNull || t.TaskId == taskId);
236+
if (statusFilter is not null)
237+
{
238+
filter &= builder.Where(statusFilter);
239+
}
240+
241+
return await _taskExecutionStatsCollection.CountDocumentsAsync(filter);
234242
}
235243

236-
public async Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
244+
public async Task<long> GetStatsTotalCompleteExecutionsCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
237245
{
238-
var workflowNull = string.IsNullOrWhiteSpace(workflowId);
239-
var taskIdNull = string.IsNullOrWhiteSpace(taskId);
246+
var dispatched = TaskExecutionStatus.Dispatched.ToString();
247+
var created = TaskExecutionStatus.Created.ToString();
248+
var accepted = TaskExecutionStatus.Accepted.ToString();
249+
Expression<Func<ExecutionStats, bool>> statusFilter = t => t.Status != dispatched && t.Status != created && t.Status != accepted;
250+
251+
return await GetStatsCountAsync(startTime, endTime, statusFilter, workflowId, taskId);
252+
}
240253

241-
return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
242-
T.StartedUTC >= startTime.ToUniversalTime() &&
243-
T.StartedUTC <= endTime.ToUniversalTime() &&
244-
(workflowNull || T.WorkflowId == workflowId) &&
245-
(taskIdNull || T.TaskId == taskId) &&
246-
(
247-
T.Status == TaskExecutionStatus.Failed.ToString() ||
248-
T.Status == TaskExecutionStatus.PartialFail.ToString()
249-
));
254+
public async Task<long> GetStatsStatusSucceededCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
255+
{
256+
Expression<Func<ExecutionStats, bool>> statusFilter = t => t.Status == TaskExecutionStatus.Succeeded.ToString();
257+
return await GetStatsCountAsync(startTime, endTime, statusFilter, workflowId, taskId);
258+
}
259+
260+
public async Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
261+
{
262+
Expression<Func<ExecutionStats, bool>> statusFilter = t => t.Status == TaskExecutionStatus.Failed.ToString() || t.Status == TaskExecutionStatus.PartialFail.ToString();
263+
return await GetStatsCountAsync(startTime, endTime, statusFilter, workflowId, taskId);
250264
}
251265

252266
public async Task<(double avgTotalExecution, double avgArgoExecution)> GetAverageStats(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")

src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs

+1
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,7 @@ private async Task<bool> DispatchTask(WorkflowInstance workflowInstance, Workflo
743743
AttachPatientMetaData(taskExec, payload.PatientDetails);
744744
}
745745

746+
taskExec.TaskPluginArguments["workflow_name"] = workflow!.Workflow!.Name;
746747
_logger.LogGeneralTaskDispatchInformation(workflowInstance.PayloadId, taskExec.TaskId, workflowInstance.Id, workflow?.Id, JsonConvert.SerializeObject(pathOutputArtifacts));
747748
var taskDispatchEvent = EventMapper.ToTaskDispatchEvent(taskExec, workflowInstance, pathOutputArtifacts, correlationId, _storageConfiguration);
748749
var jsonMesssage = new JsonMessage<TaskDispatchEvent>(taskDispatchEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, taskDispatchEvent.CorrelationId, Guid.NewGuid().ToString());

0 commit comments

Comments
 (0)