Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public TaskOptions(TaskOptions options)
Check.NotNull(options);
this.Retry = options.Retry;
this.Tags = options.Tags;
this.CancellationToken = options.CancellationToken;
}

/// <summary>
Expand All @@ -51,6 +52,80 @@ public TaskOptions(TaskOptions options)
/// </summary>
public IDictionary<string, string>? Tags { get; init; }

/// <summary>
/// Gets the cancellation token that can be used to cancel the task.
/// </summary>
/// <remarks>
/// <para>
/// The cancellation token provides cooperative cancellation for activities, sub-orchestrators, and retry logic.
/// Due to the durable orchestrator execution model, cancellation only occurs at specific points when the
/// orchestrator code is executing.
/// </para>
/// <para>
/// <strong>Cancellation behavior:</strong>
/// </para>
/// <para>
/// 1. <strong>Pre-scheduling check:</strong> If the token is cancelled before calling
/// <c>CallActivityAsync</c> or <c>CallSubOrchestratorAsync</c>, a <see cref="TaskCanceledException"/> is thrown
/// immediately without scheduling the task.
/// </para>
/// <para>
/// 2. <strong>Retry handlers:</strong> The cancellation token is passed to custom retry handlers via
/// <see cref="RetryContext"/>, allowing them to check for cancellation and stop retrying between attempts.
/// </para>
/// <para>
/// <strong>Important limitation:</strong> Once an activity or sub-orchestrator is scheduled, the orchestrator
/// yields execution and waits for the task to complete. During this yield period, the orchestrator code is not
/// running, so it cannot respond to cancellation requests. Cancelling the token while waiting will not wake up
/// the orchestrator or cancel the waiting task. This is a fundamental limitation of the durable orchestrator
/// execution model.
/// </para>
/// <para>
/// Note: Cancelling a parent orchestrator's token does not terminate sub-orchestrator instances that have
/// already been scheduled.
/// </para>
/// <example>
/// Example of pre-scheduling cancellation:
/// <code>
/// using CancellationTokenSource cts = new CancellationTokenSource();
/// cts.Cancel(); // Cancel before scheduling
///
/// TaskOptions options = new TaskOptions { CancellationToken = cts.Token };
///
/// try
/// {
/// // This will throw TaskCanceledException without scheduling the activity
/// string result = await context.CallActivityAsync&lt;string&gt;("MyActivity", "input", options);
/// }
/// catch (TaskCanceledException)
/// {
/// // Handle cancellation
/// }
/// </code>
/// </example>
/// <example>
/// Example of using cancellation with retry logic:
/// <code>
/// using CancellationTokenSource cts = new CancellationTokenSource();
/// TaskOptions options = new TaskOptions
/// {
/// Retry = TaskRetryOptions.FromRetryHandler(retryContext =>
/// {
/// if (retryContext.CancellationToken.IsCancellationRequested)
/// {
/// return false; // Stop retrying
/// }
/// return retryContext.LastAttemptNumber &lt; 3;
/// }),
/// CancellationToken = cts.Token
/// };
///
/// await context.CallActivityAsync("MyActivity", "input", options);
/// </code>
/// </example>
/// </remarks>
public CancellationToken CancellationToken { get; init; }

/// <summary>
/// Returns a new <see cref="TaskOptions" /> from the provided <see cref="RetryPolicy" />.
/// </summary>
Expand Down
32 changes: 26 additions & 6 deletions src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,25 @@ public override async Task<T> CallActivityAsync<T>(
try
{
IDictionary<string, string> tags = ImmutableDictionary<string, string>.Empty;
CancellationToken cancellationToken = default;
if (options is TaskOptions callActivityOptions)
{
if (callActivityOptions.Tags is not null)
{
tags = callActivityOptions.Tags;
}

cancellationToken = callActivityOptions.CancellationToken;
}

// If cancellation was requested before starting, throw immediately
// Note: Once the activity is scheduled, the orchestrator yields and cannot respond to
// cancellation until it resumes, so this pre-check is the only cancellation point.
if (cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException("The task was cancelled before it could be scheduled.");
}

// TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7)
#pragma warning disable 0618
if (options?.Retry?.Policy is RetryPolicy policy)
{
Expand All @@ -176,7 +186,7 @@ public override async Task<T> CallActivityAsync<T>(
parameters: input),
name.Name,
handler,
default);
cancellationToken);
}
else
{
Expand Down Expand Up @@ -217,6 +227,16 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
throw new InvalidOperationException(errorMsg);
}

CancellationToken cancellationToken = options?.CancellationToken ?? default;

// If cancellation was requested before starting, throw immediately
// Note: Once the sub-orchestrator is scheduled, the orchestrator yields and cannot respond to
// cancellation until it resumes, so this pre-check is the only cancellation point.
if (cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException("The sub-orchestrator was cancelled before it could be scheduled.");
}

try
{
if (options?.Retry?.Policy is RetryPolicy policy)
Expand All @@ -226,7 +246,7 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
version,
instanceId,
policy.ToDurableTaskCoreRetryOptions(),
input,
input,
options.Tags);
}
else if (options?.Retry?.Handler is AsyncRetryHandler handler)
Expand All @@ -236,19 +256,19 @@ public override async Task<TResult> CallSubOrchestratorAsync<TResult>(
orchestratorName.Name,
version,
instanceId,
input,
input,
options?.Tags),
orchestratorName.Name,
handler,
default);
cancellationToken);
}
else
{
return await this.innerContext.CreateSubOrchestrationInstance<TResult>(
orchestratorName.Name,
version,
instanceId,
input,
input,
options?.Tags);
}
}
Expand Down
Loading
Loading