Skip to content

Commit 86f1ca4

Browse files
committed
played by rules with durable functions adding orchstrators and activity
1 parent 862c55c commit 86f1ca4

3 files changed

Lines changed: 87 additions & 40 deletions

File tree

AsyncJob.cs

Lines changed: 82 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,20 @@
44
using Microsoft.Azure.Functions.Worker.Http;
55
using Microsoft.DurableTask;
66
using Microsoft.DurableTask.Client;
7+
using Microsoft.DurableTask.Entities;
78
using Microsoft.Extensions.Logging;
89

910

1011
namespace gbelenky.AsyncJob;
1112

13+
// Define the state of the JobEntity
14+
public record JobEntityState(string ThirdPartyJobId, string JobName);
15+
16+
1217
public static class AsyncJob
1318
{
14-
[Function(nameof(AsyncJobOrchestrator))]
15-
public static async Task<List<string>> AsyncJobOrchestrator(
19+
[Function(nameof(JobStartOrchestrator))] // Renamed
20+
public static async Task<List<string>> JobStartOrchestrator( // Renamed
1621
[OrchestrationTrigger] TaskOrchestrationContext context)
1722
{
1823
ILogger logger = context.CreateReplaySafeLogger(nameof(AsyncJob));
@@ -46,55 +51,95 @@ public static async Task<List<string>> AsyncJobOrchestrator(
4651
return outputs;
4752
}
4853

54+
public record StatusPayload(string? JobStatus);
55+
56+
[Function(nameof(GetJobStatusActivity))]
57+
public static async Task<string> GetJobStatusActivity(
58+
[ActivityTrigger] string targetInstanceId,
59+
[DurableClient] DurableTaskClient durableTaskClient, // Inject DurableTaskClient
60+
FunctionContext executionContext)
61+
{
62+
ILogger logger = executionContext.GetLogger(nameof(GetJobStatusActivity));
63+
logger.LogInformation("Activity: Attempting to fetch status for instance ID: {TargetInstanceId}", targetInstanceId);
64+
65+
OrchestrationMetadata? instanceMetadata = await durableTaskClient.GetInstanceAsync(targetInstanceId, getInputsAndOutputs: true);
66+
67+
if (instanceMetadata == null)
68+
{
69+
logger.LogWarning("Activity: Instance not found: {TargetInstanceId}", targetInstanceId);
70+
return "NotFound";
71+
}
72+
73+
// The custom status is a JToken, deserialize it to string.
74+
// Corrected access to custom status via SerializedCustomStatus and ensuring it's not null before deserializing.
75+
string? customStatus = null;
76+
if (instanceMetadata.SerializedCustomStatus != null)
77+
{
78+
customStatus = System.Text.Json.JsonSerializer.Deserialize<string>(instanceMetadata.SerializedCustomStatus);
79+
}
80+
81+
logger.LogInformation("Activity: Successfully fetched status for {TargetInstanceId}. Status: '{CustomStatus}'", targetInstanceId, customStatus ?? "null");
82+
return customStatus;
83+
}
4984

5085

86+
[Function(nameof(JobStatusOrchestrator))]
87+
public static async Task<string> JobStatusOrchestrator(
88+
[OrchestrationTrigger] TaskOrchestrationContext context)
89+
{
90+
string instanceId = context.GetInput<string>() ?? string.Empty;
91+
var customStatus = await context.CallActivityAsync<string>(nameof(GetJobStatusActivity), instanceId);
92+
return customStatus ?? "Unknown";
93+
}
5194

5295
// add a function to retuen the status of the job
53-
[Function(nameof(AsyncJobStatus))]
54-
public static async Task<HttpResponseData> AsyncJobStatus(
96+
[Function(nameof(JobStatusTrigger))] // Renamed
97+
public static async Task<HttpResponseData> JobStatusTrigger( // Renamed
5598
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "job-status/{jobName}")] HttpRequestData req,
56-
[DurableClient] DurableTaskClient client,
57-
FunctionContext executionContext, string jobName)
99+
[DurableClient] DurableTaskClient client, // This is the DurableTaskClient
100+
FunctionContext executionContext,
101+
string jobName)
58102
{
59-
ILogger logger = executionContext.GetLogger("AsyncJobStatus");
60-
string instanceId = $"job-{jobName}";
103+
ILogger logger = executionContext.GetLogger(nameof(JobStatusTrigger));
104+
string targetJobInstanceId = $"job-{jobName}"; // Instance ID of the job we want to get status for
61105

62-
var host = req.Url.Host;
106+
logger.LogInformation("Trigger: Received request to get status for job: {JobName} (Instance ID: {TargetJobInstanceId})", jobName, targetJobInstanceId);
63107

64-
// Build the Durable Functions status URL correctly using string interpolation
65-
string statusUrl = $"{req.Url.Scheme}://{host}:{req.Url.Port}/runtime/webhooks/durabletask/instances/{instanceId}";
66-
HttpClient httpClient = new HttpClient();
67-
var response = await httpClient.GetAsync(statusUrl);
68-
var responseContent = await response.Content.ReadAsStringAsync();
69-
string? customStatus = null;
70-
try
71-
{
72-
using var doc = System.Text.Json.JsonDocument.Parse(responseContent);
73-
if (doc.RootElement.TryGetProperty("customStatus", out var statusProp))
74-
{
75-
customStatus = statusProp.GetString();
76-
}
77-
}
78-
catch
108+
// Start the orchestrator that will fetch the status.
109+
// The input to JobStatusOrchestrator is the instance ID of the job whose status is being queried.
110+
string statusQueryOrchestrationId = await client.ScheduleNewOrchestrationInstanceAsync(
111+
nameof(JobStatusOrchestrator),
112+
targetJobInstanceId);
113+
114+
logger.LogInformation("Trigger: Started status query orchestration with ID = '{StatusQueryOrchestrationId}' to get status for job '{JobName}'.", statusQueryOrchestrationId, jobName);
115+
116+
// Wait for the orchestration to complete (timeout after 30 seconds)
117+
var timeout = TimeSpan.FromSeconds(30);
118+
using var cts = new CancellationTokenSource(timeout);
119+
var orchestrationStatus = await client.WaitForInstanceCompletionAsync(statusQueryOrchestrationId, true, cts.Token);
120+
121+
var response = req.CreateResponse(HttpStatusCode.OK);
122+
if (orchestrationStatus == null)
79123
{
80-
// Optionally log or handle JSON parse errors
124+
await response.WriteAsJsonAsync(new { status = "Timeout while retrieving job status" });
125+
return response;
81126
}
82-
var httpResponse = req.CreateResponse(HttpStatusCode.OK);
83-
string json = System.Text.Json.JsonSerializer.Serialize(new { jobStatus = customStatus });
84-
logger.LogInformation("Job {JobId} status: {Status}", instanceId, json);
85-
await httpResponse.WriteStringAsync(json);
86-
return httpResponse;
87-
}
88127

128+
var statusPayload = orchestrationStatus.ReadOutputAs<string>();
129+
await response.WriteAsJsonAsync(new {
130+
status = statusPayload ?? "Unknown"
131+
});
132+
return response;
133+
}
89134

90-
[Function("AsyncJobTrigger")]
91-
public static async Task<HttpResponseData> AsyncJobTrigger(
135+
[Function("JobStartTrigger")] // Renamed from AsyncJobTrigger
136+
public static async Task<HttpResponseData> JobStartTrigger( // Renamed from AsyncJobTrigger
92137
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "job-start/{jobName}")] HttpRequestData req,
93138
[DurableClient] DurableTaskClient client,
94139
FunctionContext executionContext, string jobName)
95140
{
96141
// The function input comes from the request content.
97-
ILogger logger = executionContext.GetLogger("AsyncJobTrigger");
142+
ILogger logger = executionContext.GetLogger("JobStartTrigger"); // Updated logger category
98143
// extract ASYNC_JOB_QUEUED_DURATION_SEC from the environment variable
99144
double queuedDuration = double.TryParse(
100145
Environment.GetEnvironmentVariable("ASYNC_JOB_QUEUED_DURATION_SEC"),
@@ -119,7 +164,7 @@ public static async Task<HttpResponseData> AsyncJobTrigger(
119164

120165
// Start new orchestration instance, specifying the desired instance ID via options
121166
await client.ScheduleNewOrchestrationInstanceAsync(
122-
nameof(AsyncJobOrchestrator),
167+
nameof(JobStartOrchestrator), // Updated to new name
123168
orchParams,
124169
new StartOrchestrationOptions { InstanceId = instanceId });
125170
logger.LogInformation("Started orchestration with ID = '{InstanceId}' for job '{JobName}'.", instanceId, jobName);
@@ -139,4 +184,6 @@ private record OrchParams(
139184
string ThirdPartyJobId,
140185
double QueuedDuration,
141186
double InProgressDuration);
187+
188+
142189
}

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ lightweight async job implementation
1212
2. **Durable Task Framework**: The project employs the Durable Task framework for managing long-running workflows and state persistence.
1313
3. **Customizable Durations**: Job states such as `Queued` and `InProgress` can be customized using environment variables.
1414
4. **State Management**: Provides detailed logging and state transitions for each job instance.
15-
5. **HTTP Trigger Support**: The `AsyncJobTrigger` function allows initiating jobs via HTTP requests.
15+
5. **HTTP Trigger Support**: The `JobStartTrigger` function allows initiating jobs via HTTP requests.
1616

1717
### Environment Variables:
1818
The project uses the following environment variables for configuration:
@@ -37,11 +37,11 @@ You can query the current status of a job instance using the AsyncJobStatus HTTP
3737
/job-status/{jobName}
3838
```
3939

40-
This endpoint returns the current status and details of the specified job instance. The response includes the job's name, instance ID, runtime status, input parameters, custom status, output, and timestamps. Example response:
40+
This endpoint returns the current status of the specified job instance. Example response:
4141

4242
```json
4343
{
44-
"jobStatus": "Queued"
44+
"status": "Queued"
4545
}
4646
```
4747

test.http

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
GET http://localhost:7071/api/job-start/gb4722 HTTP/1.1
1+
GET http://localhost:7071/api/job-start/gb4730 HTTP/1.1
22

33
###
4-
GET http://localhost:7071/api/job-status/gb4722 HTTP/1.1
4+
GET http://localhost:7071/api/job-status/gb4730 HTTP/1.1
55

66
###
77
GET http://localhost:7071/api/delay HTTP/1.1

0 commit comments

Comments
 (0)