Apache Airflow version
3.1.5
If "Other Airflow 3 version" selected, which one?
No response
What happened?
When using the SQLExecuteQueryTrigger through a paginated GenericTransfer whose query returns a non-serializable result, the triggerer crashes:
2026-01-05T14:11:58.319612Z [info ] Triggerer's async thread was blocked for 1.34 seconds, likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on overrunning coroutines. [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:736
--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
File "/usr/local/lib/python3.13/site-packages/airflow/sdk/execution_time/supervisor.py", line 388, in _fork_main
target()
~~~~~~^^
File "/usr/local/lib/python3.13/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in run_in_process
TriggerRunner().run()
~~~~~~~~~~~~~~~~~~~^^
File "/usr/local/lib/python3.13/site-packages/airflow/jobs/triggerer_job_runner.py", line 849, in run
asyncio.run(self.arun())
~~~~~~~~~~~^^^^^^^^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 195, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/usr/lib64/python3.13/asyncio/base_events.py", line 725, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/usr/local/lib/python3.13/site-packages/airflow/jobs/triggerer_job_runner.py", line 873, in arun
await self.sync_state_to_supervisor(finished_ids)
File "/usr/local/lib/python3.13/site-packages/airflow/jobs/triggerer_job_runner.py", line 1062, in sync_state_to_supervisor
resp = await self.comms_decoder.asend(msg)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.13/site-packages/airflow/jobs/triggerer_job_runner.py", line 794, in asend
bytes = frame.as_bytes()
File "/usr/local/lib/python3.13/site-packages/airflow/sdk/execution_time/comms.py", line 148, in as_bytes
self.req_encoder.encode_into(self, buffer, 4)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.13/site-packages/airflow/sdk/execution_time/comms.py", line 125, in _msgpack_enc_hook
raise NotImplementedError(f"Objects of type {type(obj)} are not supported")
NotImplementedError: Objects of type <class 'oracledb.LOB'> are not supported
2026-01-05T14:12:11.661360Z [warning ] Process exited abnormally [airflow.jobs.triggerer_job_runner] exit_code=126 loc=supervisor.py:823
2026-01-05T14:12:11.674547Z [error ] Trigger runner process has died! Exiting. [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:524
2026-01-05T14:12:11.674924Z [info ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:177
2026-01-05T14:12:11.675155Z [info ] Exited trigger loop [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:182
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [8]
What you think should happen instead?
The error should be logged and the offending trigger cancelled, but the triggerer process should keep running instead of stopping.
Looking at the sync_state_to_supervisor method, only asyncio.IncompleteReadError is caught; any other exception propagates and stops the triggerer process. In this case a NotImplementedError is raised, which is not handled:
async def sync_state_to_supervisor(self, finished_ids: list[int]):
    # Copy out of our dequeues in threadsafe manner to sync state with parent
    events_to_send = []
    while self.events:
        data = self.events.popleft()
        events_to_send.append(data)

    failures_to_send = []
    while self.failed_triggers:
        id, exc = self.failed_triggers.popleft()
        tb = format_exception(type(exc), exc, exc.__traceback__) if exc else None
        failures_to_send.append((id, tb))

    msg = messages.TriggerStateChanges(
        events=events_to_send, finished=finished_ids, failures=failures_to_send
    )
    if not events_to_send:
        msg.events = None
    if not failures_to_send:
        msg.failures = None
    if not finished_ids:
        msg.finished = None

    # Tell the monitor that we've finished triggers so it can update things
    try:
        resp = await self.comms_decoder.asend(msg)
    except asyncio.IncompleteReadError:
        if task := asyncio.current_task():
            task.cancel("EOF - shutting down")
            return
        raise

    if not isinstance(resp, messages.TriggerStateSync):
        raise RuntimeError(f"Expected to get a TriggerStateSync message, instead we got {type(msg)}")

    self.to_create.extend(resp.to_create)
    self.to_cancel.extend(resp.to_cancel)
Just catching the exception isn't enough: at that point we don't know which trigger produced the non-serializable payload, so we would also have to detect which trigger caused the error so that only that trigger gets cancelled. Here the exception could have been caused by any of the events in the events_to_send list.
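A rough sketch of one possible approach (this is not the actual Airflow code, and the names and the (trigger_id, event) shape of the queue entries are assumptions): probe each event with a msgpack encoder before building the TriggerStateChanges message, and turn any non-serializable event into a failure for its own trigger id. The real encoder in airflow/sdk/execution_time/comms.py has a custom enc_hook that raises NotImplementedError, whereas a plain msgspec encoder raises TypeError, so the sketch catches both:

import logging
from traceback import format_exception

import msgspec

log = logging.getLogger(__name__)

# Plain msgpack encoder used only to probe whether an event payload can be
# serialized; the real comms encoder additionally has a custom enc_hook.
_probe_encoder = msgspec.msgpack.Encoder()


def split_serializable_events(events):
    """Split (trigger_id, event) pairs into serializable events and failures.

    Events whose payload cannot be msgpack-encoded become
    (trigger_id, formatted_traceback) failures, so only the offending trigger
    is failed instead of the whole triggerer process crashing.
    """
    ok, failures = [], []
    for trigger_id, event in events:
        try:
            _probe_encoder.encode((trigger_id, event))
        except (NotImplementedError, TypeError) as exc:
            log.exception("Event from trigger %s is not serializable", trigger_id)
            failures.append((trigger_id, format_exception(type(exc), exc, exc.__traceback__)))
        else:
            ok.append((trigger_id, event))
    return ok, failures

With something along these lines, sync_state_to_supervisor could extend failures_to_send with the returned failures and only send the events that survived the probe, instead of letting the frame encoding take down the whole process.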
How to reproduce
Use Oracle-specific functions in the SQL query (anything that makes oracledb return LOB objects) with GenericTransfer in pagination mode; a minimal sketch follows.
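A minimal DAG sketch, assuming an Oracle source connection oracle_src and a destination connection dst_db (both placeholders), and assuming page_size is the parameter that switches GenericTransfer from the common.sql provider into paginated/deferrable mode; the TO_CLOB() call makes oracledb hand back LOB objects like the one in the traceback above:

from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer
from airflow.sdk import DAG

with DAG(dag_id="oracle_lob_transfer_repro", schedule=None) as dag:
    GenericTransfer(
        task_id="transfer_with_lob",
        source_conn_id="oracle_src",    # placeholder Oracle connection
        destination_conn_id="dst_db",   # placeholder destination connection
        sql="SELECT id, TO_CLOB(payload) AS payload FROM source_table",
        destination_table="target_table",
        page_size=1000,  # paginated mode: the transfer defers to SQLExecuteQueryTrigger
    )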
Operating System
Redhat Linux
Versions of Apache Airflow Providers
Using the latest provider versions from the constraints-3-1 branch
Deployment
Other 3rd-party Helm chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct