Python wrapper classes for all user interfaces #750

Merged
merged 55 commits into from
Jul 18, 2024
d00c00a
Expose missing functions to python
timsaucer Jul 9, 2024
27e4f30
Initial commit for creating wrapper classes and functions for all use…
timsaucer Jul 9, 2024
a3429ab
Remove extra level of python path that is no longer required
timsaucer Jul 9, 2024
7937963
Move import to only happen for type checking for hints
timsaucer Jul 9, 2024
1f4c829
Comment out classes from __all__ in the top level that are not curren…
timsaucer Jul 9, 2024
d7f5f68
Add license comments
timsaucer Jul 9, 2024
79bb196
Add missing import
timsaucer Jul 9, 2024
685a257
Functions now only has one level of depth
timsaucer Jul 9, 2024
45ee5ab
Applying google docstring formatting
timsaucer Jul 9, 2024
b8239e7
Addressing PR request to add google formatted docstrings
timsaucer Jul 10, 2024
4c8073e
Small docstring for ruff
timsaucer Jul 10, 2024
411c91c
Linting
timsaucer Jul 10, 2024
610adda
Add docstring format checking to pre-commit stage
timsaucer Jul 10, 2024
265aeb7
Set explicit return types on UDFs
timsaucer Jul 11, 2024
02564de
Add options of passing either a path or a string
timsaucer Jul 12, 2024
e0e55a8
Switch to google docstring style
timsaucer Jul 12, 2024
dcd5211
Update unit tests to include registering via path or string
timsaucer Jul 12, 2024
1063cff
Add py.typed file
timsaucer Jul 12, 2024
5ba2017
Resolve deprecation warnings in unit tests
timsaucer Jul 13, 2024
438afa0
Add path to unit test
timsaucer Jul 13, 2024
837e3b2
Expose an option in write_csv to include header and add unit test
timsaucer Jul 13, 2024
6e75eee
Update write_parquet unit test to include paths or strings
timsaucer Jul 13, 2024
2ebe2e5
Add unit test for write_json
timsaucer Jul 13, 2024
dad0d26
Add unit test for substrait serialization to a file
timsaucer Jul 13, 2024
ae569ff
Add unit tests for runtime config
timsaucer Jul 13, 2024
4f973af
Setting return type to typing_extensions.Self per PR recommendation
timsaucer Jul 13, 2024
f2ed822
Correcting __next__ to not return None since it will raise an excepti…
timsaucer Jul 13, 2024
c2ee65d
Add optional parameter of decimal places to round and add unit test
timsaucer Jul 13, 2024
835e374
Improve docstrings
timsaucer Jul 13, 2024
08b83ac
Set default to None instead of empty dict
timsaucer Jul 13, 2024
2ccd5ad
User request to allow passing multiple arguments to filter()
timsaucer Jul 13, 2024
13be857
Enhance Expr comparison operators to accept any python value and atte…
timsaucer Jul 13, 2024
8f1bb65
Expose overlay and add unit test
timsaucer Jul 13, 2024
75e129a
Allow select() to take either str for column names or a full expr
timsaucer Jul 13, 2024
f2b15e0
Update comments on regexp and add unit tests
timsaucer Jul 13, 2024
b76d105
Remove TODO markings no longer applicable
timsaucer Jul 13, 2024
6e87d73
Update udf documentation
timsaucer Jul 14, 2024
39f18cb
Docstring formatting
timsaucer Jul 14, 2024
94650b5
Updating docstring formatting
timsaucer Jul 14, 2024
95a4688
Updating docstring formatting
timsaucer Jul 14, 2024
39d9c00
Updating docstring formatting
timsaucer Jul 14, 2024
671d508
Updating docstring formatting
timsaucer Jul 15, 2024
49efdd0
Updating docstring formatting
timsaucer Jul 15, 2024
3c7a811
Cleaning up docstring line lengths
timsaucer Jul 15, 2024
fbf3f46
Add pre-commit check of docstring line length
timsaucer Jul 15, 2024
d6c6598
Do not emit doc entry for __init__ of some classes
timsaucer Jul 16, 2024
cccf305
Correct errors on code blocks generating in sphinx
timsaucer Jul 16, 2024
6579ac5
Resolve conflict with
timsaucer Jul 16, 2024
62197bc
Add license info to py.typed
timsaucer Jul 16, 2024
2821183
Clean up some docstring too long errors in CI
timsaucer Jul 16, 2024
c1df7db
Correct ruff complaint in unit tests
timsaucer Jul 16, 2024
461e7b5
Temporarily install google test to get clippy to pass
timsaucer Jul 16, 2024
4af541e
Adding gmock to build step due to upstream error
timsaucer Jul 16, 2024
5588f28
Add type_extensions to conda meta file
timsaucer Jul 16, 2024
39f01fb
Small comment suggestions from PR
timsaucer Jul 17, 2024
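
Several commits above let file-based methods accept either a path or a string. A minimal sketch of one way such an argument could be normalized before it is handed to the compiled layer. The helper name `as_path_str` is hypothetical and is not taken from this PR:

```python
from __future__ import annotations

import os
import pathlib


def as_path_str(path: str | pathlib.Path) -> str:
    """Return a plain string for either a str or a pathlib.Path.

    Hypothetical helper for illustration only; the PR may convert
    paths differently.
    """
    # os.fspath returns a str unchanged and calls __fspath__ on Path objects.
    return os.fspath(path)


# Both call styles produce the same string.
from_str = as_path_str(os.path.join("data", "example.csv"))
from_path = as_path_str(pathlib.Path("data") / "example.csv")
```

Normalizing at the wrapper boundary keeps the internal call signature unchanged while callers are free to pass whichever type they already have.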
15 changes: 6 additions & 9 deletions examples/substrait.py
@@ -18,16 +18,15 @@
from datafusion import SessionContext
from datafusion import substrait as ss

# TODO add user changing interface note to PR that datafusion.substrait.substrait is simplified to datafusion.substrait

# Create a DataFusion context
ctx = SessionContext()

# Register table with context
ctx.register_csv("aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv")

substrait_plan = ss.substrait.serde.serialize_to_plan(
"SELECT * FROM aggregate_test_data", ctx
)
substrait_plan = ss.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>

# Encode it to bytes
@@ -38,17 +37,15 @@
# Alternative serialization approaches
# type(substrait_bytes) -> <class 'bytes'>, at this point the bytes can be distributed to file, network, etc safely
# where they could subsequently be deserialized on the receiving end.
substrait_bytes = ss.substrait.serde.serialize_bytes(
"SELECT * FROM aggregate_test_data", ctx
)
substrait_bytes = ss.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)

# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
substrait_plan = ss.serde.deserialize_bytes(substrait_bytes)

# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan)
df_logical_plan = ss.consumer.from_substrait_plan(ctx, substrait_plan)

# Back to Substrait Plan just for demonstration purposes
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
substrait_plan = ss.producer.to_substrait_plan(df_logical_plan)
194 changes: 97 additions & 97 deletions python/datafusion/__init__.py
@@ -25,64 +25,67 @@

import pyarrow as pa

from ._internal import (
AggregateUDF,
Config,
DataFrame,
from .context import (
SessionContext,
SessionConfig,
RuntimeConfig,
ScalarUDF,
SQLOptions,
)

# The following imports are okay to remain as opaque to the user.
from ._internal import Config

from .udf import ScalarUDF, AggregateUDF

from .common import (
DFSchema,
)

from .dataframe import DataFrame

from .expr import (
Alias,
Analyze,
# Alias,
# Analyze,
Expr,
Filter,
Limit,
Like,
ILike,
Projection,
SimilarTo,
ScalarVariable,
Sort,
TableScan,
Not,
IsNotNull,
IsTrue,
IsFalse,
IsUnknown,
IsNotTrue,
IsNotFalse,
IsNotUnknown,
Negative,
InList,
Exists,
Subquery,
InSubquery,
ScalarSubquery,
GroupingSet,
Placeholder,
Case,
Cast,
TryCast,
Between,
Explain,
CreateMemoryTable,
SubqueryAlias,
Extension,
CreateView,
Distinct,
DropTable,
Repartition,
Partitioning,
Window,
# Filter,
# Limit,
# Like,
# ILike,
# Projection,
# SimilarTo,
# ScalarVariable,
# Sort,
# TableScan,
# Not,
# IsNotNull,
# IsTrue,
# IsFalse,
# IsUnknown,
# IsNotTrue,
# IsNotFalse,
# IsNotUnknown,
# Negative,
# InList,
# Exists,
# Subquery,
# InSubquery,
# ScalarSubquery,
# GroupingSet,
# Placeholder,
# Case,
# Cast,
# TryCast,
# Between,
# Explain,
# CreateMemoryTable,
# SubqueryAlias,
# Extension,
# CreateView,
# Distinct,
# DropTable,
# Repartition,
# Partitioning,
# Window,
WindowFrame,
)

@@ -96,56 +99,55 @@
"SQLOptions",
"RuntimeConfig",
"Expr",
"AggregateUDF",
"ScalarUDF",
"Window",
# "Window",
"WindowFrame",
"column",
"literal",
"TableScan",
"Projection",
# "TableScan",
# "Projection",
"DFSchema",
"DFField",
"Analyze",
"Sort",
"Limit",
"Filter",
"Like",
"ILike",
"SimilarTo",
"ScalarVariable",
"Alias",
"Not",
"IsNotNull",
"IsTrue",
"IsFalse",
"IsUnknown",
"IsNotTrue",
"IsNotFalse",
"IsNotUnknown",
"Negative",
"ScalarFunction",
"BuiltinScalarFunction",
"InList",
"Exists",
"Subquery",
"InSubquery",
"ScalarSubquery",
"GroupingSet",
"Placeholder",
"Case",
"Cast",
"TryCast",
"Between",
"Explain",
"SubqueryAlias",
"Extension",
"CreateMemoryTable",
"CreateView",
"Distinct",
"DropTable",
"Repartition",
"Partitioning",
# "DFField",
# "Analyze",
# "Sort",
# "Limit",
# "Filter",
# "Like",
# "ILike",
# "SimilarTo",
# "ScalarVariable",
# "Alias",
# "Not",
# "IsNotNull",
# "IsTrue",
# "IsFalse",
# "IsUnknown",
# "IsNotTrue",
# "IsNotFalse",
# "IsNotUnknown",
# "Negative",
# "ScalarFunction",
# "BuiltinScalarFunction",
# "InList",
# "Exists",
# "Subquery",
# "InSubquery",
# "ScalarSubquery",
# "GroupingSet",
# "Placeholder",
# "Case",
# "Cast",
# "TryCast",
# "Between",
# "Explain",
# "SubqueryAlias",
# "Extension",
# "CreateMemoryTable",
# "CreateView",
# "Distinct",
# "DropTable",
# "Repartition",
# "Partitioning",
]


@@ -175,8 +177,6 @@ def column(value):


def literal(value):
if not isinstance(value, pa.Scalar):
value = pa.scalar(value)
return Expr.literal(value)


@@ -200,20 +200,20 @@ def udf(func, input_types, return_type, volatility, name=None):
)


def udaf(accum, input_type, return_type, state_type, volatility, name=None):
def udaf(accum, input_types, return_type, state_type, volatility, name=None):
"""
Create a new User Defined Aggregate Function
"""
if not issubclass(accum, Accumulator):
raise TypeError("`accum` must implement the abstract base class Accumulator")
if name is None:
name = accum.__qualname__.lower()
if isinstance(input_type, pa.lib.DataType):
input_type = [input_type]
if isinstance(input_types, pa.lib.DataType):
input_types = [input_types]
return AggregateUDF(
name=name,
accumulator=accum,
input_type=input_type,
input_types=input_types,
return_type=return_type,
state_type=state_type,
volatility=volatility,
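
The `udaf` hunk renames `input_type` to `input_types` and keeps the step that wraps a single data type in a list. A minimal sketch of that normalization in isolation, assuming a stand-in `DataType` class in place of `pyarrow.DataType` so it runs without pyarrow. The function name `normalize_input_types` is hypothetical:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class DataType:
    """Stand-in for pyarrow.DataType, used only for this illustration."""

    name: str


def normalize_input_types(input_types: DataType | list[DataType]) -> list[DataType]:
    """Accept one data type or a list of them and always return a list."""
    if isinstance(input_types, DataType):
        return [input_types]
    # Copy so the caller's list is not shared with the UDF definition.
    return list(input_types)


single = normalize_input_types(DataType("float64"))
several = normalize_input_types([DataType("float64"), DataType("int64")])
```

Accepting both forms lets the common single-argument case stay short while the plural parameter name signals that several argument types are allowed.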
59 changes: 59 additions & 0 deletions python/datafusion/catalog.py
@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import datafusion._internal as df_internal

from typing import TYPE_CHECKING

if TYPE_CHECKING:
import pyarrow


class Catalog:
def __init__(self, catalog: df_internal.Catalog) -> None:
self.catalog = catalog

def names(self) -> list[str]:
return self.catalog.names()

def database(self, name: str = "public") -> Database:
return Database(self.catalog.database(name))


class Database:
def __init__(self, db: df_internal.Database) -> None:
self.db = db

def names(self) -> set[str]:
return self.db.names()

def table(self, name: str) -> Table:
return Table(self.db.table(name))


class Table:
def __init__(self, table: df_internal.Table) -> None:
self.table = table

def schema(self) -> pyarrow.Schema:
return self.table.schema()

@property
def kind(self) -> str:
return self.table.kind()
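
The new `catalog.py` follows a thin-wrapper pattern: each Python class holds the compiled object and forwards calls to it, while `pyarrow` is imported only under `TYPE_CHECKING`. The sketch below reproduces that shape with a hypothetical `_InternalTable` stand-in for the compiled class, so it runs without `datafusion` or `pyarrow` installed:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen by type checkers only; this import never runs at runtime.
    import pyarrow


class _InternalTable:
    """Hypothetical stand-in for the compiled table object."""

    def __init__(self, kind: str, schema: list[str]) -> None:
        self._kind = kind
        self._schema = schema

    def kind(self) -> str:
        return self._kind

    def schema(self) -> list[str]:
        return self._schema


class Table:
    """Thin wrapper that delegates every call to the internal object."""

    def __init__(self, table: _InternalTable) -> None:
        self.table = table

    def schema(self) -> pyarrow.Schema:
        # The annotation stays a string because of the __future__ import,
        # so pyarrow does not need to be importable here.
        return self.table.schema()

    @property
    def kind(self) -> str:
        return self.table.kind()


wrapped = Table(_InternalTable("physical", ["a", "b"]))
```

Keeping the wrapper this thin gives docstrings and type hints a home in pure Python without duplicating any logic from the compiled layer.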