@dianfu
Contributor

@dianfu dianfu commented Oct 24, 2025

What is the purpose of the change

  • This pull request adds basic support for async functions in the Python DataStream API. Only unordered mode is supported for now.

Brief change log

    • Introduce basic structures, e.g. UnorderedStreamElementQueue, Emitter, AsyncFunctionRunner
    • Introduce AsyncOperation, which is responsible for async function execution

Verifying this change

    • Added tests in test_async_function.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not documented: will add documentation in a separate pull request)

@flinkbot
Collaborator

flinkbot commented Oct 24, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@dianfu dianfu changed the title from "[FLINK-38190][python] Support unordered mode of async function in Python DataStream API" to "[FLINK-38559][python] Support unordered mode of async function in Python DataStream API" Oct 24, 2025
def _validate(data_stream: DataStream, async_function: AsyncFunction):
    if not inspect.iscoroutinefunction(async_function.async_invoke):
        raise Exception("Method 'async_invoke' of class '%s' should be declared as 'async def'."
                        % type(async_function))
Contributor

Suggested change
- % type(async_function))
+ % type(async_function).__name__)

I think type(async_function) will produce a verbose result; with this change the message would show just the class name.

Contributor Author

@dianfu dianfu Oct 27, 2025

Formatting type(async_function) prints the full path of the class, while type(async_function).__name__ prints just the class name. My intention here is to print the full path, which I think will be more helpful.
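The difference between the two forms can be checked with a small, self-contained sketch. MyAsyncFunction and validate below are hypothetical stand-ins rather than the PR's classes; only inspect.iscoroutinefunction and the %-formatting of a type object are standard Python behavior.

```python
import inspect


class MyAsyncFunction:
    """Hypothetical stand-in for a user-defined AsyncFunction subclass."""

    async def async_invoke(self, value, result_future):
        # A real implementation would complete result_future; omitted here.
        return None


def validate(async_function) -> None:
    # Same check as in the PR snippet: async_invoke must be declared 'async def'.
    if not inspect.iscoroutinefunction(async_function.async_invoke):
        raise Exception("Method 'async_invoke' of class '%s' should be declared as 'async def'."
                        % type(async_function))


func = MyAsyncFunction()
validate(func)  # passes, because async_invoke is a coroutine function

# Module-qualified form, e.g. "<class 'some_module.MyAsyncFunction'>".
full_form = "%s" % type(func)
# Bare class name only.
short_form = type(func).__name__
```

The full form includes the module path, which helps when several modules define classes with the same name; the short form is easier to read in a log line.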

j_python_data_stream_function_operator))

@staticmethod
def _validate(data_stream: DataStream, async_function: AsyncFunction):
Contributor

Suggested change
- def _validate(data_stream: DataStream, async_function: AsyncFunction):
+ def _validate(data_stream: DataStream, async_function: AsyncFunction) -> None:

return False


class ResultHandler(ResultFuture, Generic[OUT]):
Contributor

Do I understand correctly that retry is not supported?

Contributor Author

Yes, you are right. It's planned to be supported in a separate ticket https://issues.apache.org/jira/browse/FLINK-38561


class ResultHandler(ResultFuture, Generic[OUT]):

    def __init__(self, classname, timeout_func, exception_handler, record,
Contributor

It would be nice to add type hints here too.

            self._async_function_runner.stop()
            self._async_function_runner = None

    def process_element(self, windowed_value, element):
Contributor

I wonder what will happen if the element is a watermark record?

Contributor Author

Currently, the watermark and the element are sent together. The structure of element is as follows:

CURRENT_TIMESTAMP, CURRENT_WATERMARK, NORMAL_DATA
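
As a minimal sketch of that layout, the snippet below unpacks one such element by position. UnpackedElement and unpack_element are hypothetical helper names introduced only for illustration; the index positions follow the structure described above.

```python
from typing import Any, NamedTuple, Sequence


class UnpackedElement(NamedTuple):
    """Hypothetical container naming the three positions of an element."""
    timestamp: Any
    watermark: Any
    record: Any


def unpack_element(element: Sequence[Any]) -> UnpackedElement:
    # Index 0: current timestamp, index 1: current watermark, index 2: the data.
    return UnpackedElement(element[0], element[1], element[2])


# Illustrative values only; real elements are produced by the runtime.
unpacked = unpack_element((1000, 900, "payload"))
```

Because every element carries the current watermark, there is no separate watermark-only record to handle in process_element.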

watermark = element[1]
record = element[2]

self._queue.advance_watermark(watermark)
Contributor

For my own curiosity, what is the reason for advancing the watermark before adding the entry (the line below)?

Contributor Author

Because the data is behind the watermark. You can refer to ExternalPythonProcessOperator for more details.

return

self._exception_handler(
    Exception("Could not complete the element:" + str(self._record), error))
Contributor

I wonder if it is possible to have a test case that triggers this exception?

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 25, 2025
@dianfu
Contributor Author

dianfu commented Oct 27, 2025

@charlesdong1991 Thanks for the review! Have updated the PR accordingly ~

Contributor

@bgeng777 bgeng777 left a comment

Thanks for the PR! I left some comments.


def __init__(self, capacity):
    self._incomplete_elements = set()
    self._complete_elements = collections.deque(maxlen=capacity)
Contributor

Maybe what we mean here is "completed" (i.e. something is finished) instead of "complete" (i.e. entire).

if not isinstance(result, Iterable):
    raise RuntimeError("The 'result_future' of AsyncFunction should be completed with "
                       "data of list type, please check the methods 'async_invoke' and "
                       "'timeout' of class '%s'." % self._classname)
Contributor

It looks like we can move the RuntimeError into the previous "else" branch instead of checking if not isinstance(result, Iterable) again, like:

        if not isinstance(result, Iterable):
            # complete with empty result, so that we remove timer and move ahead processing
            self._process_results([])
            raise RuntimeError("The 'result_future' of AsyncFunction should be completed with "
                               "data of list type, please check the methods 'async_invoke' and "
                               "'timeout' of class '%s'." % self._classname)
        else:
            self._process_results(result)
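
A runnable sketch of this single-check pattern follows. SketchResultHandler and its emitted list are hypothetical stand-ins for the PR's ResultHandler, whose full implementation is not shown in this thread.

```python
from collections.abc import Iterable


class SketchResultHandler:
    """Hypothetical, simplified stand-in for ResultHandler."""

    def __init__(self, classname: str):
        self._classname = classname
        self.emitted = []  # collects whatever was passed on for output

    def _process_results(self, results) -> None:
        # Assumption: the real method also removes the timer; here we only record.
        self.emitted.extend(results)

    def complete(self, result) -> None:
        if not isinstance(result, Iterable):
            # Complete with an empty result first, then report the misuse.
            self._process_results([])
            raise RuntimeError("The 'result_future' of AsyncFunction should be completed with "
                               "data of list type, please check the methods 'async_invoke' and "
                               "'timeout' of class '%s'." % self._classname)
        else:
            self._process_results(result)


handler = SketchResultHandler("MyAsyncFunction")
handler.complete([1, 2])  # valid: results are passed on
```

Note that isinstance(result, Iterable) also accepts strings and dicts, so the check is looser than the "list type" wording in the error message.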


def stop(self):
    if self._loop is not None:
        self._loop.stop()
Contributor

Should we do self._loop.close() to free underlying resources?

Suggested change
- self._loop.stop()
+ self._loop.stop()
+ self._loop.close()

Contributor Author

Added self._loop.close() in method run.
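
One way to read this: asyncio raises RuntimeError if close() is called on a loop that is still running, so closing on the loop's own thread after run_forever() returns is a safe place for it. The sketch below illustrates that arrangement. LoopThreadSketch is a hypothetical class, not the PR's AsyncFunctionRunner, and the use of call_soon_threadsafe in stop() is a choice made for this sketch, not something the thread confirms about the PR.

```python
import asyncio
import threading


class LoopThreadSketch:
    """Hypothetical runner that owns an event loop on a dedicated thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def run(self) -> None:
        try:
            self._loop.run_forever()
        finally:
            # run_forever() has returned, so the loop is no longer running
            # and can be closed without raising RuntimeError.
            self._loop.close()

    def submit(self, coro):
        # Thread-safe way to schedule a coroutine from another thread.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self) -> None:
        # Loop methods are not thread-safe, so schedule stop() on the loop itself.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()


async def double(x):
    return x * 2


runner = LoopThreadSketch()
runner.start()
result = runner.submit(double(21)).result(timeout=5)
runner.stop()
```

After stop() returns, the thread has exited and the loop has been closed by run().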


def run_async(self, async_function, *arg):
    wrapped_function = self.exception_handler_wrapper(async_function, *arg)
    asyncio.run_coroutine_threadsafe(wrapped_function, self._loop)
Contributor

I want to make sure I understand the error handling here correctly, so let me summarize the process with a simple case:

  1. There are 3 records to be processed by AsyncOperation's process_element.
  2. self._async_function_runner.run_async is called for element1 in the main thread.
  3. self._async_function_runner.run_async is called for element2 in the main thread, but the task has not yet been submitted to the loop thread (i.e. line 172 has not been executed).
  4. The loop thread processes element1, an exception occurs, and AsyncOperation records the exception.
  5. AsyncOperation in the main thread submits element2 to the loop thread.
  6. AsyncOperation in the main thread finds the exception when processing element3 and raises it. After some propagation, the main thread calls AsyncOperation's close() and stops the loop thread.

According to the asyncio documentation [image]: if the loop thread has not yet started processing element2's task, the thread just stops and element2 is never processed. If the loop thread is already processing element2, it will finish the processing. But since AsyncOperation has been closed, is it possible that the loop thread runs into a bad case (e.g. the async function uses resources that were set up in advance and have already been freed)?
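
The flow summarized above can be sketched roughly as follows. RunnerSketch and OperationSketch are hypothetical simplifications; the body of the exception wrapper and the way the exception is recorded are assumptions, since the thread only shows the run_async call site and the name _raise_exception_if_exists.

```python
import asyncio
import threading


class RunnerSketch:
    """Hypothetical runner: executes coroutines on a separate loop thread."""

    def __init__(self, exception_handler):
        self._loop = asyncio.new_event_loop()
        self._exception_handler = exception_handler
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    async def _exception_handler_wrapper(self, async_function, *args):
        # Assumed behavior: report failures instead of letting them vanish.
        try:
            await async_function(*args)
        except Exception as e:
            self._exception_handler(e)

    def run_async(self, async_function, *args):
        coro = self._exception_handler_wrapper(async_function, *args)
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


class OperationSketch:
    """Hypothetical operation: records the error, raises it on the next element."""

    def __init__(self):
        self._exception = None
        self._runner = RunnerSketch(self._mark_exception)

    def _mark_exception(self, e):
        self._exception = e

    def _raise_exception_if_exists(self):
        if self._exception is not None:
            raise self._exception

    def process_element(self, async_function, record):
        self._raise_exception_if_exists()  # runs on the main thread
        return self._runner.run_async(async_function, record)

    def close(self):
        self._runner.stop()


async def failing(record):
    raise ValueError("could not process %s" % record)


op = OperationSketch()
op.process_element(failing, 1).result(timeout=5)  # wrapper records the error
try:
    op.process_element(failing, 2)
    raised_on_next_element = False
except ValueError:
    raised_on_next_element = True
op.close()
```

In this sketch the failure of element 1 only surfaces in the main thread when element 2 arrives, which matches steps 4 to 6 of the summary.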

Contributor Author

I think your understanding is correct. I made a minor update to address this: self.close_func() is now called at the end of the close method.

if output_mode == AsyncFunctionDescriptor.OutputMode.UNORDERED:
    self._queue = UnorderedStreamElementQueue(capacity, self._raise_exception_if_exists)
else:
    raise NotImplementedError()
Contributor

Suggested change
- raise NotImplementedError()
+ raise NotImplementedError("ORDERED mode is not supported.")


if self._async_function_runner is not None:
    self._async_function_runner.stop()
    self._async_function_runner = None
Contributor

Do we need self._exception = None here?
In other words, when failover happens, will we reuse the AsyncOperation object or just create a new one?

@dianfu
Contributor Author

dianfu commented Oct 28, 2025

@bgeng777 Thanks for the review. Updated the PR~

4 participants