Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions flink-python/pyflink/datastream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@
- :class:`OutputTag`:
Tag with a name and type for identifying side output of an operator
"""
from pyflink.datastream.async_data_stream import AsyncDataStream
from pyflink.datastream.checkpoint_config import CheckpointConfig
from pyflink.datastream.externalized_checkpoint_retention import ExternalizedCheckpointRetention
from pyflink.datastream.checkpointing_mode import CheckpointingMode
Expand All @@ -268,7 +269,8 @@
SinkFunction, CoProcessFunction, KeyedProcessFunction,
KeyedCoProcessFunction, AggregateFunction, WindowFunction,
ProcessWindowFunction, BroadcastProcessFunction,
KeyedBroadcastProcessFunction)
KeyedBroadcastProcessFunction, AsyncFunction,
ResultFuture)
from pyflink.datastream.slot_sharing_group import SlotSharingGroup, MemorySize
from pyflink.datastream.state_backend import (StateBackend, CustomStateBackend,
PredefinedOptions, HashMapStateBackend,
Expand All @@ -292,6 +294,7 @@
'ConnectedStreams',
'BroadcastStream',
'BroadcastConnectedStream',
'AsyncDataStream',
'DataStreamSink',
'MapFunction',
'CoMapFunction',
Expand All @@ -308,6 +311,7 @@
'AggregateFunction',
'BroadcastProcessFunction',
'KeyedBroadcastProcessFunction',
'AsyncFunction',
'RuntimeContext',
'TimerService',
'CheckpointingMode',
Expand Down Expand Up @@ -338,5 +342,6 @@
'SinkFunction',
'SlotSharingGroup',
'MemorySize',
'OutputTag'
'OutputTag',
'ResultFuture'
]
77 changes: 77 additions & 0 deletions flink-python/pyflink/datastream/async_data_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import inspect

from pyflink.common import Time, TypeInformation
from pyflink.datastream.data_stream import DataStream, _get_one_input_stream_operator
from pyflink.datastream.functions import AsyncFunctionDescriptor, AsyncFunction
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import get_j_env_configuration


class AsyncDataStream(object):
"""
A helper class to apply :class:`~AsyncFunction` to a data stream.
"""

@staticmethod
def unordered_wait(
        data_stream: DataStream,
        async_function: AsyncFunction,
        timeout: Time,
        capacity: int = 100,
        output_type: TypeInformation = None) -> 'DataStream':
    """
    Applies an async function to a data stream. Records of the resulting stream may be
    emitted in a different order than the corresponding input records.

    :param data_stream: The input data stream.
    :param async_function: The async function to apply; its ``async_invoke`` must be
                           declared with ``async def``.
    :param timeout: The timeout for the asynchronous operation to complete.
    :param capacity: The max number of async i/o operations that can be triggered.
    :param output_type: The output data type.
    :return: The transformed DataStream.
    :raises Exception: If ``async_invoke`` is not a coroutine function or the Python
                       execution mode is 'thread'.
    """
    # Reject unsupported setups before anything is built.
    AsyncDataStream._validate(data_stream, async_function)

    # Kept function-local and after validation, exactly like the original statement order.
    from pyflink.fn_execution import flink_fn_execution_pb2

    # Bundle the user function with its async settings; the descriptor is handed to the
    # operator factory in place of a plain function.
    descriptor = AsyncFunctionDescriptor(
        async_function, timeout, capacity, AsyncFunctionDescriptor.OutputMode.UNORDERED)
    func_type = flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS  # type: ignore

    j_operator, j_output_type_info = _get_one_input_stream_operator(
        data_stream, descriptor, func_type, output_type)

    j_transformed = data_stream._j_data_stream.transform(
        "async wait operator", j_output_type_info, j_operator)
    return DataStream(j_transformed)

@staticmethod
def _validate(data_stream: DataStream, async_function: AsyncFunction):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _validate(data_stream: DataStream, async_function: AsyncFunction):
def _validate(data_stream: DataStream, async_function: AsyncFunction) -> None:

if not inspect.iscoroutinefunction(async_function.async_invoke):
raise Exception("Method 'async_invoke' of class '%s' should be declared as 'async def'."
% type(async_function))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
% type(async_function))
% type(async_function).__name__)

I think `type(async_function)` will produce a verbose result; this change might print just the actual class name.

Copy link
Contributor Author

@dianfu dianfu Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`type(async_function)` returns the full path of the class, whereas `type(async_function).__name__` returns just the class name. My intention here is to print the full path, which I think will be more helpful.


gateway = get_gateway()
j_conf = get_j_env_configuration(data_stream._j_data_stream.getExecutionEnvironment())
python_execution_mode = (
j_conf.get(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))
if python_execution_mode == 'thread':
raise Exception("AsyncFunction is still not supported for 'thread' mode.")
9 changes: 6 additions & 3 deletions flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
KeyedBroadcastProcessFunction,
InternalSingleValueAllWindowFunction,
PassThroughAllWindowFunction,
InternalSingleValueProcessAllWindowFunction)
InternalSingleValueProcessAllWindowFunction,
AsyncFunctionDescriptor)
from pyflink.datastream.output_tag import OutputTag
from pyflink.datastream.slot_sharing_group import SlotSharingGroup
from pyflink.datastream.state import (ListStateDescriptor, StateDescriptor, ReducingStateDescriptor,
Expand Down Expand Up @@ -2757,7 +2758,8 @@ def _is_keyed_stream(self):
def _get_one_input_stream_operator(data_stream: DataStream,
func: Union[Function,
FunctionWrapper,
WindowOperationDescriptor],
WindowOperationDescriptor,
AsyncFunctionDescriptor],
func_type: int,
output_type: Union[TypeInformation, List] = None):
"""
Expand Down Expand Up @@ -2891,7 +2893,8 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,


def _create_j_data_stream_python_function_info(
func: Union[Function, FunctionWrapper, WindowOperationDescriptor], func_type: int
func: Union[Function, FunctionWrapper, WindowOperationDescriptor, AsyncFunctionDescriptor],
func_type: int
) -> bytes:
gateway = get_gateway()

Expand Down
84 changes: 83 additions & 1 deletion flink-python/pyflink/datastream/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
################################################################################

from abc import ABC, abstractmethod
from enum import Enum

from py4j.java_gateway import JavaObject
from typing import Union, Any, Generic, TypeVar, Iterable
from typing import Union, Any, Generic, TypeVar, Iterable, List

from pyflink.datastream.state import ValueState, ValueStateDescriptor, ListStateDescriptor, \
ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \
Expand Down Expand Up @@ -53,6 +55,9 @@
'BaseBroadcastProcessFunction',
'BroadcastProcessFunction',
'KeyedBroadcastProcessFunction',
'AsyncFunction',
'AsyncFunctionDescriptor',
'ResultFuture'
]


Expand Down Expand Up @@ -897,6 +902,83 @@ def on_timer(self, timestamp: int, ctx: 'KeyedCoProcessFunction.OnTimerContext')
pass


class ResultFuture(ABC, Generic[OUT]):
    """
    Collects data / error in user codes while processing async i/o.

    The class derives from :class:`abc.ABC` so that the ``@abstractmethod`` markers below are
    actually enforced: a subclass has to implement both :func:`complete` and
    :func:`complete_exceptionally` before it can be instantiated.
    """

    @abstractmethod
    def complete(self, result: List[OUT]):
        """
        Completes the result future with a collection of result objects.

        Note that it should be called exactly once in the user code. Calling this function
        multiple times will cause data loss.

        Put all results in a collection and then emit output.

        :param result: A list of results.
        """
        pass

    @abstractmethod
    def complete_exceptionally(self, error: Exception):
        """
        Completes the result future exceptionally with an exception.

        :param error: An Exception object.
        """
        pass


class AsyncFunction(Function, Generic[IN, OUT]):
    """
    A function that triggers an asynchronous I/O operation per stream input.

    Every call of :func:`async_invoke` may start one async i/o operation. When that operation
    has finished, its result is handed over by calling :func:`~ResultFuture.complete`. The
    context of each async operation is stored in the operator right after
    :func:`async_invoke` has been invoked, so stream inputs are not blocked as long as the
    internal buffer is not full.

    The :class:`~ResultFuture` may be passed into callbacks or futures to collect the result
    data. Errors are propagated to the async IO operator through
    :func:`~ResultFuture.complete_exceptionally`.
    """

    @abstractmethod
    async def async_invoke(self, value: IN, result_future: ResultFuture[OUT]):
        """
        Triggers the async operation for one stream input.

        In case of a user code error, an exception can be raised to make the task fail and
        trigger the fail-over process.

        :param value: Input element coming from an upstream task.
        :param result_future: A future to be completed with the result data.
        """
        pass

    def timeout(self, value: IN, result_future: ResultFuture[OUT]):
        """
        Called in case :func:`async_invoke` timed out for the given input. By default, the
        result future is completed exceptionally with a :class:`TimeoutError`.

        :param value: The input element whose async operation timed out.
        :param result_future: The future belonging to that input element.
        """
        message = f"Async function call has timed out for input: {value!s}"
        result_future.complete_exceptionally(TimeoutError(message))


class AsyncFunctionDescriptor(object):
    """
    Plain holder that bundles an async function with the settings it is applied with:
    the timeout, the capacity and the output mode.
    """

    class OutputMode(Enum):
        """
        How results are emitted relative to the input records.
        """
        # NOTE(review): presumably keeps the output in input order -- confirm with the operator.
        ORDERED = 0
        # Output records may be reordered relative to the input.
        UNORDERED = 1

    def __init__(self, async_function, timeout, capacity, output_mode):
        """
        Stores the given values unchanged on the instance; no validation is performed.

        :param async_function: The async function to be applied.
        :param timeout: The timeout for the asynchronous operation to complete.
        :param capacity: The max number of async i/o operations that can be triggered.
        :param output_mode: One of :class:`AsyncFunctionDescriptor.OutputMode`.
        """
        self.async_function, self.timeout = async_function, timeout
        self.capacity, self.output_mode = capacity, output_mode


class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
"""
Base interface for functions that are evaluated over keyed (grouped) windows.
Expand Down
Loading