From 6737967cf0040acbb5f611eb374d8031c986f554 Mon Sep 17 00:00:00 2001
From: Ritinikhil <124866156+Ritinikhil@users.noreply.github.com>
Date: Sun, 9 Mar 2025 23:29:34 +0530
Subject: [PATCH 1/2] Enhance the sql-using-python-udf example

Add comprehensive comments and documentation
Implement multiple data registration methods for API compatibility
Add version information printing for debugging
Improve error handling with informative messages
Add formatted table output for better readability
Include input validation through a PyArrow schema
Add results verification with assertions

Author: Ritinikhil
---
 examples/sql-using-python-udf.py | 112 ++++++++++++++++++++++++------
 1 file changed, 87 insertions(+), 25 deletions(-)

diff --git a/examples/sql-using-python-udf.py b/examples/sql-using-python-udf.py
index 2f0a0b67d..97a73b090 100644
--- a/examples/sql-using-python-udf.py
+++ b/examples/sql-using-python-udf.py
@@ -16,44 +16,88 @@
 # under the License.
 
 import pyarrow as pa
 from datafusion import SessionContext, udf
 
+# Print version information for debugging
+import datafusion
 
-# Define a user-defined function (UDF)
+print(f"DataFusion version: {datafusion.__version__}")
+print(f"PyArrow version: {pa.__version__}")
+
+
+# Define a user-defined function (UDF) that checks if a value is null
 def is_null(array: pa.Array) -> pa.Array:
+    """
+    Check which elements of a PyArrow array are null.
+
+    Args:
+        array (pa.Array): Input PyArrow array.
+
+    Returns:
+        pa.Array: Boolean array indicating which elements are null.
+    """
     return array.is_null()
 
+# Create the UDF definition
 is_null_arr = udf(
-    is_null,
-    [pa.int64()],
-    pa.bool_(),
-    "stable",
-    # This will be the name of the UDF in SQL
-    # If not specified it will by default the same as Python function name
-    name="is_null",
+    is_null,       # The Python function to wrap
+    [pa.int64()],  # Input type(s); here a single int64 column
+    pa.bool_(),    # Output type: boolean
+    "stable",      # Volatility: "stable" means the same input yields the same output within a query
+    name="is_null",  # Name of the UDF in SQL (defaults to the Python function name)
 )
 
-# Create a context
+# Create a DataFusion session context
 ctx = SessionContext()
-# Create a datafusion DataFrame from a Python dictionary
-ctx.from_pydict({"a": [1, 2, 3], "b": [4, None, 6]}, name="t")
-# Dataframe:
-# +---+---+
-# | a | b |
-# +---+---+
-# | 1 | 4 |
-# | 2 |   |
-# | 3 | 6 |
-# +---+---+
-
-# Register UDF for use in SQL
+try:
+    # Method 1: Create and register the table from a Python dictionary
+    print("\nTrying Method 1: SessionContext.from_pydict")
+    ctx.from_pydict({"a": [1, 2, 3], "b": [4, None, 6]}, name="t")
+except Exception as e:
+    print(f"Method 1 failed: {e}")
+
+    try:
+        # Method 2: Create and register the table from a PyArrow table
+        print("\nTrying Method 2: SessionContext.from_arrow_table")
+        table = pa.table({
+            "a": [1, 2, 3],
+            "b": [4, None, 6]
+        })
+        ctx.from_arrow_table(table, name="t")
+    except Exception as e:
+        print(f"Method 2 failed: {e}")
+
+        # Method 3: Register explicitly created record batches
+        print("\nTrying Method 3: Explicit record batch creation")
+        # Define the schema for our data
+        schema = pa.schema([
+            ('a', pa.int64()),  # Column 'a' is int64
+            ('b', pa.int64()),  # Column 'b' is int64
+        ])
+
+        # Create a record batch with our data
+        batch = pa.record_batch([
+            pa.array([1, 2, 3], type=pa.int64()),     # Data for column 'a'
+            pa.array([4, None, 6], type=pa.int64()),  # Data for column 'b'
+        ], schema=schema)
+
+        # Register the record batch with DataFusion
+        # Note: the double list [[batch]] is required by the API
+        ctx.register_record_batches("t", [[batch]])
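+        # The double list mirrors the expected argument shape: the outer list
+        # holds partitions and each inner list holds that partition's record
+        # batches, e.g. [[batch_a], [batch_b, batch_c]] would register two
+        # partitions (batch_a, batch_b, batch_c being hypothetical names).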
+
+# Register our UDF with the context
 ctx.register_udf(is_null_arr)
 
-# Query the DataFrame using SQL
+print("\nExecuting SQL query...")
+# Execute a SQL query that uses our UDF
 result_df = ctx.sql("select a, is_null(b) as b_is_null from t")
-# Dataframe:
+
+# Expected output:
 # +---+-----------+
 # | a | b_is_null |
 # +---+-----------+
@@ -61,4 +105,22 @@ def is_null(array: pa.Array) -> pa.Array:
 # | 2 | true      |
 # | 3 | false     |
 # +---+-----------+
-assert result_df.to_pydict()["b_is_null"] == [False, True, False]
+
+# Convert the result to a dictionary and display it
+result_dict = result_df.to_pydict()
+print("\nQuery Results:")
+print("Result:", result_dict)
+
+# Verify the results
+assert result_dict["b_is_null"] == [False, True, False], "Unexpected results from UDF"
+print("\nAssertion passed - UDF working as expected!")
+
+# Print a formatted version of the results
+print("\nFormatted Results:")
+print("+---+-----------+")
+print("| a | b_is_null |")
+print("+---+-----------+")
+for i in range(len(result_dict["a"])):
+    print(f"| {result_dict['a'][i]} | {str(result_dict['b_is_null'][i]).lower():9} |")
+print("+---+-----------+")

From 12250bb8bd4446e6730f328c50b07b00df6a0a03 Mon Sep 17 00:00:00 2001
From: Ritinikhil <124866156+Ritinikhil@users.noreply.github.com>
Date: Sun, 9 Mar 2025 23:38:36 +0530
Subject: [PATCH 2/2] Improve CSV file path handling in substrait.py

---
 examples/substrait.py | 37 ++++++++++++++++++------------------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/examples/substrait.py b/examples/substrait.py
index fa6f77912..2bc17b2bd 100644
--- a/examples/substrait.py
+++ b/examples/substrait.py
@@ -15,35 +15,34 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
+
 from datafusion import SessionContext
 from datafusion import substrait as ss
 
+# Get the directory of the current script
+script_dir = os.path.dirname(os.path.abspath(__file__))
+
+# Construct the path to the CSV file relative to this script
+# (os.path.join keeps the path portable across platforms)
+csv_file_path = os.path.join(script_dir, '..', 'testing', 'data', 'csv', 'aggregate_test_100.csv')
+
 # Create a DataFusion context
 ctx = SessionContext()
 
 # Register table with context
-ctx.register_csv("aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv")
-
+try:
+    ctx.register_csv("aggregate_test_data", csv_file_path)
+except Exception as e:
+    print(f"Error registering CSV file: {e}")
+    print(f"Looking for file at: {csv_file_path}")
+    raise
+
+# Create a Substrait plan from a SQL query
 substrait_plan = ss.Serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
 # type(substrait_plan) -> <class 'datafusion.substrait.Plan'>
 
-# Encode it to bytes
-substrait_bytes = substrait_plan.encode()
-# type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
-
 # Alternative serialization approaches
 # type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely
 # where they could subsequently be deserialized on the receiving end.
 substrait_bytes = ss.Serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
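 
+# A minimal sketch of how these bytes could be persisted and read back
+# (the "plan.substrait" filename is purely illustrative):
+#
+#     with open("plan.substrait", "wb") as f:
+#         f.write(substrait_bytes)
+#     with open("plan.substrait", "rb") as f:
+#         substrait_bytes = f.read()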
 
 # Imagine here the bytes would be read from network, file, etc.
 # For brevity in this example that step is omitted and the variable is simply reused
 # type(substrait_plan) -> <class 'datafusion.substrait.Plan'>
 substrait_plan = ss.Serde.deserialize_bytes(substrait_bytes)
 
 # type(df_logical_plan) -> <class 'datafusion.plan.LogicalPlan'>
 df_logical_plan = ss.Consumer.from_substrait_plan(ctx, substrait_plan)
 
 # Back to a Substrait plan, just for demonstration purposes
 # type(substrait_plan) -> <class 'datafusion.substrait.Plan'>
 substrait_plan = ss.Producer.to_substrait_plan(df_logical_plan, ctx)
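+
+# One possible continuation (assuming create_dataframe_from_logical_plan is
+# available in your DataFusion version): execute the round-tripped plan.
+#
+#     df = ctx.create_dataframe_from_logical_plan(df_logical_plan)
+#     df.show()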