What are you really trying to do?
I'm building an AI agent workflow using Temporal and the OpenAI Agents SDK. Users can start long-running agent conversations and need the ability to cancel them mid-execution using Temporal's built-in .cancel() method. When a user requests cancellation, I want to:
- Stop the agent execution immediately (or as soon as possible)
- Execute my handle_cancellation() method to perform cleanup (save interrupt tag, notify frontend, clear session)
- Have the cancellation handler run reliably every time
The agent execution happens via await Runner.run() from the OpenAI Agents SDK, which can take several minutes to complete.
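The Temporal Python SDK README says workflow cancellation is implemented with asyncio's Task.cancel(). If that holds here, "as soon as possible" in practice means "at the next await the workflow coroutine is suspended on". The sketch below uses plain asyncio with no Temporal involved. `agent_turns()` is a hypothetical stand-in for Runner.run(). It shows that synchronous code between awaits always runs to completion, and the cancel lands at the following await.

```python
import asyncio


async def agent_turns(log: list[str], turns: int = 30) -> None:
    # Hypothetical stand-in for Runner.run(): each turn awaits I/O
    # (an activity in the real workflow). A cancel can only land at an await.
    for turn in range(turns):
        log.append(f"turn {turn} started")
        await asyncio.sleep(0.01)
        log.append(f"turn {turn} finished")


async def cancel_mid_run() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(agent_turns(log))
    await asyncio.sleep(0.025)  # let a couple of turns run first
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task ended with a cancellation
    return log


if __name__ == "__main__":
    # The last entry is always a "started" turn: the task is only ever
    # suspended inside asyncio.sleep(), right after logging "started".
    print(asyncio.run(cancel_mid_run())[-1])
```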
Describe the bug
When a user requests workflow cancellation via Temporal's .cancel() method, there's a race condition where the workflow gets marked as "Cancelled" by Temporal before my exception handler can execute handle_cancellation().
Roughly 10% of the time, no CancelledError is raised during workflow task execution, so my except block never runs and handle_cancellation() is never called. Instead, the workflow is marked as cancelled after the workflow task completes successfully.
For the workflow result I get:
```json
{
  "type": "workflowExecutionCanceledEventAttributes",
  "workflowTaskCompletedEventId": "41"
}
```
Expected behavior:
The except block should catch the cancellation and execute handle_cancellation():
```python
except Exception as e:
    if is_cancelled_exception(e) or (e.__cause__ and is_cancelled_exception(e.__cause__)):
        workflow.logger.info("AgentWorkflow cancelled by user")
        return await self.handle_cancellation(topic, session, payload)
```
Actual behavior:
The workflow is cancelled by Temporal before the exception handler runs, so handle_cancellation() never executes and:
- Event log shows "Workflow cancelled" instead of showing cleanup activities
- No interrupt tag is saved to OpenSearch
- Frontend never receives cancellation notification
- Session is not cleared
- Workflow status shows "Cancelled" instead of "Completed"
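One possibility worth ruling out: since Python 3.8, asyncio.CancelledError subclasses BaseException, not Exception. So `except Exception` only sees a cancellation when it arrives wrapped in another error. The handler's own `e.__cause__` check reflects one such wrapped form, a cancelled activity reported as an ActivityError whose cause is a cancellation error. According to the SDK README, however, cancelling the workflow itself calls Task.cancel() on the main workflow task, which raises a bare asyncio.CancelledError at whatever await the coroutine is suspended on.

The plain-asyncio model below does not use Temporal. `ActivityError` is a hypothetical stand-in class, and in the real SDK the cause is temporalio.exceptions.CancelledError rather than asyncio's. The model shows the same handler catching one form and missing the other.

```python
import asyncio
from collections.abc import Awaitable, Callable


class ActivityError(Exception):
    """Hypothetical stand-in for temporalio.exceptions.ActivityError."""


async def activity_call() -> str:
    # Model of awaiting an activity: a cancel landing here is re-raised
    # as an ActivityError whose __cause__ is the cancellation.
    try:
        await asyncio.sleep(10)
        return "activity result"
    except asyncio.CancelledError as e:
        raise ActivityError("activity cancelled") from e


async def local_step() -> str:
    # Model of an await that is not an activity: the cancel surfaces
    # as a bare asyncio.CancelledError.
    await asyncio.sleep(10)
    return "local result"


async def workflow_body(step: Callable[[], Awaitable[str]]) -> str:
    try:
        return await step()
    except Exception as e:
        # Mirrors the issue's handler: only Exception subclasses land here.
        if isinstance(e.__cause__, asyncio.CancelledError):
            return "cleanup ran"
        raise


async def outcome(step: Callable[[], Awaitable[str]]) -> str:
    task = asyncio.create_task(workflow_body(step))
    await asyncio.sleep(0)  # let the body reach its await
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled, no cleanup"


if __name__ == "__main__":
    print(asyncio.run(outcome(activity_call)))  # cleanup ran
    print(asyncio.run(outcome(local_step)))  # cancelled, no cleanup
```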
Minimal Reproduction
```python
from temporalio import workflow
from temporalio.exceptions import is_cancelled_exception
from agents import Runner

# Project-specific names used below (AgentSession, OpenSearchActivities,
# SaveAgentMessageHistory, MessageAgentFinalResponseToFrontend, notify_frontend,
# DEFAULT_START_TO_CLOSE_TIMEOUT, DEFAULT_RETRY_POLICY, agent, user_input)
# are defined elsewhere in the application and omitted here.


@workflow.defn
class AgentWorkflow:
    async def handle_cancellation(self, topic: str, session, payload):
        """
        Critical cleanup:
        - Save interrupt tag to message history
        - Notify frontend
        - Clear session
        """
        workflow.logger.info("Cleanup executed")
        interrupt_tag = "<interrupt />"
        # Save interrupt tag to OpenSearch
        await workflow.execute_activity(
            activity=OpenSearchActivities.save_agent_message_history,
            args=(SaveAgentMessageHistory(
                workflow_id=workflow.info().workflow_id,
                session_id=session.session_id,
                messages=[{"content": interrupt_tag, "role": "assistant", "type": "message"}],
            ),),
            start_to_close_timeout=DEFAULT_START_TO_CLOSE_TIMEOUT,
            retry_policy=DEFAULT_RETRY_POLICY,
        )
        # Notify frontend
        await notify_frontend(
            topic=topic,
            message=MessageAgentFinalResponseToFrontend(value=interrupt_tag, session_id=session.session_id),
        )
        return interrupt_tag

    @workflow.run
    async def run(self, payload) -> str | None:
        # Set up before `try` so the except block can always reference these names
        topic = f"agent_{payload.uid}_{payload.mcp_session_id}"
        info = workflow.info()
        session = AgentSession(info.workflow_id, session_id=str(workflow.uuid4()))
        try:
            # Long-running agent execution with internal activities
            result = await Runner.run(
                starting_agent=agent,
                input=user_input,
                session=session,
                max_turns=30,
            )
            return result.final_output
        except Exception as e:
            if is_cancelled_exception(e) or (e.__cause__ and is_cancelled_exception(e.__cause__)):
                # This never executes when cancellation arrives
                # between activity completion and workflow task start
                return await self.handle_cancellation(topic, session, payload)
            raise
```
Steps to reproduce:
- Start the workflow with a long-running Runner.run() call
- Wait for Runner.run() to complete an internal activity
- Call workflow_handle.cancel() immediately after the activity completes (within a ~13 ms window)
- Observe that the workflow gets cancelled by Temporal before handle_cancellation() runs
Environment/Versions
- OS and processor: M3 Mac (Apple Silicon)
- Temporal SDK: Python SDK version 1.8.0 / 1.19.0
- OpenAI Agents SDK: 0.0.11
Additional context
The handle_cancellation() method performs critical cleanup:
- Saves interrupt tag to OpenSearch (so conversation history shows the cancellation)
- Sends notifications to the frontend via Lightstreamer
- Clears the agent session
- Updates Langfuse trace with cancellation info
When it doesn't run, the system is left in an inconsistent state and users see incomplete or confusing behavior.
Questions:
- Is there a way to ensure CancelledError is raised during the workflow task when cancellation is pending?
- Should I be using a different pattern (e.g., CancellationScope, signals, or polling) for handling cancellation in workflows with long-running operations like Runner.run()?
- Is there a way to guarantee cleanup code runs even when the workflow is cancelled at a task boundary?