Skip to content

[DRAFT][PYTHON] Improve Python UDF Arrow Serializer Performance #51225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 40 commits into
base: master
Choose a base branch
from

Conversation

asl3
Copy link
Contributor

@asl3 asl3 commented Jun 19, 2025

What changes were proposed in this pull request?

This PR removes pandas <> Arrow <> pandas conversion in Arrow-optimized Python UDF by directly using PyArrow.

Why are the changes needed?

Python UDF arrow serializer has a lot of overhead from converting arrow batches into pandas series and converting UDF results back to a pandas dataframe.

We can instead convert Python object directly into arrow to avoid the expensive pandas conversion.

Does this PR introduce any user-facing change?

Legacy type coercion (arrow batch eval)

# +-----------------------------+--------------+------------------+--------------------+------+--------------------+-----------------------------+------------+----------------------+------------------+------------------+----------------------------+--------------------+--------------+  # noqa
    # |SQL Type \ Python Value(Type)|None(NoneType)|        True(bool)|              1(int)|a(str)|    1970-01-01(date)|1970-01-01 00:00:00(datetime)|  1.0(float)|array('i', [1])(array)|         [1](list)|       (1,)(tuple)|bytearray(b'ABC')(bytearray)|          1(Decimal)|{'a': 1}(dict)|  # noqa
    # +-----------------------------+--------------+------------------+--------------------+------+--------------------+-----------------------------+------------+----------------------+------------------+------------------+----------------------------+--------------------+--------------+  # noqa
    # |                      boolean|          None|              True|                True|     X|                   X|                            X|        True|                     X|                 X|                 X|                           X|                   X|             X|  # noqa
    # |                      tinyint|          None|                 1|                   1|     X|                   X|                            X|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                     smallint|          None|                 1|                   1|     X|                   X|                            X|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                          int|          None|                 1|                   1|     X|                   0|                            X|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                       bigint|          None|                 1|                   1|     X|                   X|                            0|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                       string|          None|            'True'|                 '1'|   'a'|        '1970-01-01'|         '1970-01-01 00:00...|       '1.0'|     "array('i', [1])"|             '[1]'|            '(1,)'|         "bytearray(b'ABC')"|                 '1'|    "{'a': 1}"|  # noqa
    # |                         date|          None|                 X|                   X|     X|datetime.date(197...|         datetime.date(197...|           X|                     X|                 X|                 X|                           X|datetime.date(197...|             X|  # noqa
    # |                    timestamp|          None|                 X|datetime.datetime...|     X|                   X|         datetime.datetime...|           X|                     X|                 X|                 X|                           X|datetime.datetime...|             X|  # noqa
    # |                        float|          None|               1.0|                 1.0|     X|                   X|                            X|         1.0|                     X|                 X|                 X|                           X|                 1.0|             X|  # noqa
    # |                       double|          None|               1.0|                 1.0|     X|                   X|                            X|         1.0|                     X|                 X|                 X|                           X|                 1.0|             X|  # noqa
    # |                       binary|          None|bytearray(b'\x00')|  bytearray(b'\x00')|     X|                   X|                            X|           X|  bytearray(b'\x01\...|bytearray(b'\x01')|bytearray(b'\x01')|           bytearray(b'ABC')|                   X|             X|  # noqa
    # |                decimal(10,0)|          None|                 X|                   X|     X|                   X|                            X|Decimal('1')|                     X|                 X|                 X|                           X|        Decimal('1')|             X|  # noqa
    # +-----------------------------+--------------+------------------+--------------------+------+--------------------+-----------------------------+------------+----------------------+------------------+------------------+----------------------------+--------------------+--------------+  # noqa

New type coercion (arrow batch eval):

# +-----------------------------+--------------+------------------+--------------------+------+--------------------+-----------------------------+------------+----------------------+------------------+------------------+----------------------------+--------------------+--------------+  # noqa
    # |SQL Type \ Python Value(Type)|None(NoneType)|        True(bool)|              1(int)|a(str)|    1970-01-01(date)|1970-01-01 00:00:00(datetime)|  1.0(float)|array('i', [1])(array)|         [1](list)|       (1,)(tuple)|bytearray(b'ABC')(bytearray)|          1(Decimal)|{'a': 1}(dict)|  # noqa
    # +-----------------------------+--------------+------------------+--------------------+------+--------------------+-----------------------------+------------+----------------------+------------------+------------------+----------------------------+--------------------+--------------+  # noqa
    # |                      boolean|          None|              True|                True|     X|                   X|                            X|        True|                     X|                 X|                 X|                           X|                   X|             X|  # noqa
    # |                      tinyint|          None|                 1|                   1|     X|                   X|                            X|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                     smallint|          None|                 1|                   1|     X|                   X|                            X|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                          int|          None|                 1|                   1|     X|                   0|                            X|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                       bigint|          None|                 1|                   1|     X|                   X|                            0|           1|                     X|                 X|                 X|                           X|                   1|             X|  # noqa
    # |                       string|          None|            'True'|                 '1'|   'a'|        '1970-01-01'|         '1970-01-01 00:00...|       '1.0'|     "array('i', [1])"|             '[1]'|            '(1,)'|         "bytearray(b'ABC')"|                 '1'|    "{'a': 1}"|  # noqa
    # |                         date|          None|                 X|                   X|     X|datetime.date(197...|         datetime.date(197...|           X|                     X|                 X|                 X|                           X|datetime.date(197...|             X|  # noqa
    # |                    timestamp|          None|                 X|datetime.datetime...|     X|                   X|         datetime.datetime...|           X|                     X|                 X|                 X|                           X|datetime.datetime...|             X|  # noqa
    # |                        float|          None|               1.0|                 1.0|     X|                   X|                            X|         1.0|                     X|                 X|                 X|                           X|                 1.0|             X|  # noqa
    # |                       double|          None|               1.0|                 1.0|     X|                   X|                            X|         1.0|                     X|                 X|                 X|                           X|                 1.0|             X|  # noqa
    # |                       binary|          None|bytearray(b'\x00')|  bytearray(b'\x00')|     X|                   X|                            X|           X|  bytearray(b'\x01\...|bytearray(b'\x01')|bytearray(b'\x01')|           bytearray(b'ABC')|                   X|             X|  # noqa
    # |                decimal(10,0)|          None|                 X|                   X|     X|                   X|                            X|Decimal('1')|                     X|                 X|                 X|                           X|        Decimal('1')|             X|  # noqa
    # +-----------------------------+--------------+------------------+--------------------+------+--------------------+-----------------------------+------------+----------------------+------------------+------------------+----------------------------+--------------------+--------------+  # noqa

How was this patch tested?

Added tests for both the legacy and new codepath, for arrow-batch eval

Was this patch authored or co-authored using generative AI tooling?

No

@asl3 asl3 changed the title [DRAFT][PYTHON] Improve Python UDTF arrow serializer performance [DRAFT][PYTHON] Improve Python UDF arrow serializer performance Jun 19, 2025
@asl3 asl3 changed the title [DRAFT][PYTHON] Improve Python UDF arrow serializer performance [DRAFT][PYTHON] Improve Python UDF Arrow Serializer Performance Jun 19, 2025
@HyukjinKwon HyukjinKwon marked this pull request as draft June 23, 2025 02:52
Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a cursory look, seems making sense

arrow_return_type = to_arrow_type(
return_type, prefers_large_types=use_large_var_types(runner_conf)
)
def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's get rid of those white spaces tho

def test_complex_input_types(self):
for pandas_conversion in [True, False]:
with self.subTest(pandas_conversion=pandas_conversion), self.sql_conf(
{"spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled": str(pandas_conversion).lower()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we can move the config setting into the setupClass method

@unittest.skipIf(
not have_pandas or not have_pyarrow, pandas_requirement_message or pyarrow_requirement_message
)
class ArrowPythonUDFLegacyTestsMixin(BaseUDFTestsMixin):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: also add parity tests in pyspark.sql.tests.connect.arrow.test_parity_arrow_python_udf

elif isinstance(packed, list):
# multiple array UDFs in a projection
arrs = [self._create_array(t[0], t[1], self._arrow_cast) for t in packed]
elif isinstance(packed, tuple) and len(packed) == 3:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the conditions in arrow-opt UDF is more complicated.
In what case will this branch be chosen?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants