Skip to content

Commit 75060b4

Browse files
committed
chore: use execution mixin
1 parent b78b854 commit 75060b4

File tree

15 files changed

+132
-57
lines changed

15 files changed

+132
-57
lines changed

python/xorq/backends/__init__.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,85 @@
11
import functools
22
import importlib.metadata
3+
from abc import ABC
4+
from typing import Any, Mapping
5+
6+
from xorq.vendor.ibis import BaseBackend
7+
from xorq.vendor.ibis.expr import types as ir
8+
9+
10+
class ExecutionBackend(BaseBackend, ABC):
    """Mixin that runs expressions through xorq's expression transform.

    Place this class *before* the concrete backend in a subclass's bases so
    that ``execute``, ``to_pyarrow`` and ``to_pyarrow_batches`` defined here
    take precedence and the ``super()`` calls below reach the concrete
    backend's implementation.

    Every entry point treats ``FlightExpr`` / ``FlightUDXF`` nodes specially:
    their result is read straight from ``node.to_rbr()``. Any other
    expression is first rewritten by ``_transform_expr``, which also returns
    a mapping of the temporary relations it created.
    """

    @staticmethod
    def _drop_created(created) -> None:
        """Drop the temporary relations recorded by ``_transform_expr``.

        Parameters
        ----------
        created
            Mapping of relation name to the connection that holds it.
        """
        for table_name, conn in created.items():
            try:
                conn.drop_table(table_name, force=True)
            except Exception:
                # Best-effort fallback kept from the original clean-up: a
                # relation that cannot be dropped as a table is dropped as a
                # view instead.
                conn.drop_view(table_name)

    def _pandas_execute(self, expr: ir.Expr, **kwargs):
        """Execute ``expr`` for the pandas backend.

        The pandas backend is executed through the parent ``execute`` rather
        than through a record batch reader.
        """
        from xorq.expr.api import _transform_expr
        from xorq.expr.relations import FlightExpr, FlightUDXF

        node = expr.op()
        if isinstance(node, (FlightExpr, FlightUDXF)):
            df = node.to_rbr().read_pandas(timestamp_as_object=True)
            return expr.__pandas_result__(df)

        (expr, created) = _transform_expr(expr)
        try:
            return super().execute(expr, **kwargs)
        finally:
            # Previously ``created`` was discarded here, leaving behind the
            # temporaries that ``to_pyarrow_batches`` drops on its own path.
            self._drop_created(created)

    def execute(self, expr, **kwargs) -> Any:
        """Execute ``expr`` and return ``expr.__pandas_result__`` of the data.

        For every backend except pandas the data is read from
        ``to_pyarrow_batches`` and converted with ``read_pandas``.
        """
        if self.name == "pandas":
            return self._pandas_execute(expr, **kwargs)

        batch_reader = self.to_pyarrow_batches(expr, **kwargs)
        df = batch_reader.read_pandas(timestamp_as_object=True)

        return expr.__pandas_result__(df)

    def to_pyarrow_batches(
        self,
        expr: ir.Expr,
        *,
        chunk_size: int = 1_000_000,
        **kwargs: Any,
    ):
        """Return a record batch reader over the result of ``expr``.

        Parameters
        ----------
        expr
            Expression to execute.
        chunk_size
            Forwarded to the parent backend's ``to_pyarrow_batches``.
        **kwargs
            Forwarded unchanged to the parent backend.

        Returns
        -------
        The node's own reader for Flight expressions; otherwise the parent
        backend's reader passed through ``rbr_wrapper`` together with a
        clean-up callback.
        """
        from xorq.common.utils.defer_utils import rbr_wrapper
        from xorq.expr.api import _transform_expr
        from xorq.expr.relations import FlightExpr, FlightUDXF

        node = expr.op()
        if isinstance(node, (FlightExpr, FlightUDXF)):
            return node.to_rbr()

        (expr, created) = _transform_expr(expr)
        reader = super().to_pyarrow_batches(expr, chunk_size=chunk_size, **kwargs)

        def clean_up():
            self._drop_created(created)

        # NOTE(review): presumably rbr_wrapper calls clean_up once the reader
        # has been consumed -- confirm in xorq.common.utils.defer_utils.
        return rbr_wrapper(reader, clean_up)

    def _pandas_to_pyarrow(self, expr, **kwargs):
        """Materialise ``expr`` as a pyarrow result for the pandas backend."""
        from xorq.expr.api import _transform_expr
        from xorq.expr.relations import FlightExpr, FlightUDXF

        node = expr.op()
        if isinstance(node, (FlightExpr, FlightUDXF)):
            # ``__pyarrow_result__`` is given an Arrow table everywhere else
            # in this class; the old code passed it a pandas DataFrame.
            arrow_table = node.to_rbr().read_all()
            return expr.__pyarrow_result__(arrow_table)

        (expr, created) = _transform_expr(expr)
        try:
            return super().to_pyarrow(expr, **kwargs)
        finally:
            self._drop_created(created)

    def to_pyarrow(
        self,
        expr: ir.Expr,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        limit: int | str | None = None,
        **kwargs: Any,
    ):
        """Execute ``expr`` and return ``expr.__pyarrow_result__`` of the data.

        Parameters
        ----------
        expr
            Expression to execute.
        params
            Scalar parameter values; forwarded when not ``None``.
        limit
            Row limit; forwarded when not ``None``.
        **kwargs
            Forwarded unchanged to the underlying execution call.
        """
        # ``params`` and ``limit`` used to be accepted and then dropped.
        # They are forwarded only when supplied, so calls that omit them
        # reach the underlying backend exactly as before.
        forwarded = dict(kwargs)
        if params is not None:
            forwarded["params"] = params
        if limit is not None:
            forwarded["limit"] = limit

        if self.name == "pandas":
            return self._pandas_to_pyarrow(expr, **forwarded)

        batch_reader = self.to_pyarrow_batches(expr, **forwarded)
        arrow_table = batch_reader.read_all()
        return expr.__pyarrow_result__(arrow_table)
383

484

585
@functools.cache

python/xorq/backends/datafusion/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import xorq.vendor.ibis.expr.operations as ops
1212
import xorq.vendor.ibis.expr.schema as sch
1313
import xorq.vendor.ibis.expr.types as ir
14+
from xorq.backends import ExecutionBackend
1415
from xorq.vendor import ibis
1516
from xorq.vendor.ibis.backends.datafusion import Backend as IbisDatafusionBackend
1617
from xorq.vendor.ibis.common.dispatch import lazy_singledispatch
@@ -21,7 +22,7 @@
2122
import pandas as pd
2223

2324

24-
class Backend(IbisDatafusionBackend):
25+
class Backend(ExecutionBackend, IbisDatafusionBackend):
2526
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
2627
self.con.from_arrow(op.data.to_pyarrow(op.schema), op.name)
2728

python/xorq/backends/duckdb/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
import pyarrow as pa
44

5+
from xorq.backends import ExecutionBackend
56
from xorq.vendor.ibis.backends.duckdb import Backend as IbisDuckDBBackend
67
from xorq.vendor.ibis.expr import types as ir
78
from xorq.vendor.ibis.util import gen_name
89

910

10-
class Backend(IbisDuckDBBackend):
11+
class BaseExecutionBackend(IbisDuckDBBackend):
1112
def execute(
1213
self,
1314
expr: ir.Expr,
@@ -37,3 +38,7 @@ def to_pyarrow_batches(
3738
return self._to_duckdb_relation(
3839
expr, params=params, limit=limit
3940
).fetch_arrow_reader(chunk_size)
41+
42+
43+
class Backend(ExecutionBackend, BaseExecutionBackend):
    """DuckDB backend with ``ExecutionBackend`` layered over the base class.

    ``ExecutionBackend`` comes first in the bases, so its methods take
    precedence and its ``super()`` calls resolve to ``BaseExecutionBackend``.
    """

python/xorq/backends/let/__init__.py

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pyarrow_hotfix # noqa: F401
99
from sqlglot import exp, parse_one
1010

11+
from xorq.backends import ExecutionBackend
1112
from xorq.backends.let.datafusion import Backend as DataFusionBackend
1213
from xorq.common.collections import SourceDict
1314
from xorq.internal import SessionConfig, WindowUDF
@@ -35,7 +36,7 @@ def _get_datafusion_dataframe(con, expr, **kwargs):
3536
return con.con.sql(raw_sql)
3637

3738

38-
class Backend(DataFusionBackend):
39+
class Backend(ExecutionBackend, DataFusionBackend):
3940
name = "let"
4041

4142
def __init__(self, *args, **kwargs):
@@ -158,42 +159,6 @@ def create_table(
158159
self._sources[registered_table.op()] = registered_table.op()
159160
return registered_table
160161

161-
def execute(self, expr: ir.Expr, **kwargs: Any):
162-
batch_reader = self.to_pyarrow_batches(expr, **kwargs)
163-
return expr.__pandas_result__(
164-
batch_reader.read_pandas(timestamp_as_object=True)
165-
)
166-
167-
def to_pyarrow(self, expr: ir.Expr, **kwargs: Any) -> pa.Table:
168-
batch_reader = self.to_pyarrow_batches(expr, **kwargs)
169-
arrow_table = batch_reader.read_all()
170-
return expr.__pyarrow_result__(arrow_table)
171-
172-
def to_pyarrow_batches(
173-
self,
174-
expr: ir.Expr,
175-
*,
176-
chunk_size: int = 1_000_000,
177-
**kwargs: Any,
178-
) -> pa.ipc.RecordBatchReader:
179-
return super().to_pyarrow_batches(expr, chunk_size=chunk_size, **kwargs)
180-
181-
def do_connect(self, config: SessionConfig | None = None) -> None:
182-
"""Creates a connection.
183-
184-
Parameters
185-
----------
186-
config
187-
Mapping of table names to files.
188-
189-
Examples
190-
--------
191-
>>> import xorq.api as xo
192-
>>> con = xo.connect()
193-
194-
"""
195-
super().do_connect(config=config)
196-
197162
def _to_sqlglot(
198163
self, expr: ir.Expr, *, limit: str | None = None, params=None, **_: Any
199164
):

python/xorq/backends/let/datafusion/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,7 @@ def to_pyarrow(self, expr: ir.Expr, **kwargs: Any) -> pa.Table:
759759
return expr.__pyarrow_result__(arrow_table)
760760

761761
def execute(self, expr: ir.Expr, **kwargs: Any):
    """Execute ``expr`` and return it via ``expr.__pandas_result__``.

    Batches come from ``self._to_pyarrow_batches`` and are converted with
    ``read_pandas(timestamp_as_object=True)``. ``**kwargs`` is forwarded
    unchanged to ``_to_pyarrow_batches``.
    """
    # The committed ``breakpoint()`` was removed: it dropped every caller
    # into the debugger before any work was done.
    batch_reader = self._to_pyarrow_batches(expr, **kwargs)
    return expr.__pandas_result__(
        batch_reader.read_pandas(timestamp_as_object=True)
    )

python/xorq/backends/pandas/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import xorq.vendor.ibis.expr.operations as ops
1313
import xorq.vendor.ibis.expr.schema as sch
1414
import xorq.vendor.ibis.expr.types as ir
15+
from xorq.backends import ExecutionBackend
1516
from xorq.vendor import ibis
1617
from xorq.vendor.ibis import util
1718
from xorq.vendor.ibis.backends import BaseBackend, NoUrl
@@ -302,7 +303,7 @@ def to_pyarrow_batches(
302303
)
303304

304305

305-
class Backend(BasePandasBackend):
306+
class BaseExecutionBackend(BasePandasBackend):
306307
name = "pandas"
307308

308309
def execute(self, query, params=None, limit="default", **kwargs):
@@ -354,6 +355,10 @@ def read_record_batches(
354355
return self.table(table_name)
355356

356357

358+
class Backend(ExecutionBackend, BaseExecutionBackend):
    """Pandas backend with ``ExecutionBackend`` layered over the base class.

    ``ExecutionBackend`` comes first in the bases, so its methods take
    precedence and its ``super()`` calls resolve to ``BaseExecutionBackend``.
    """

    name = "pandas"
360+
361+
357362
@lazy_singledispatch
358363
def _convert_object(obj: Any, _conn):
359364
raise com.BackendConversionError(

python/xorq/backends/postgres/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import toolz
99

1010
import xorq.vendor.ibis.expr.schema as sch
11+
from xorq.backends import ExecutionBackend
1112
from xorq.backends.postgres.compiler import compiler
1213
from xorq.common.utils.defer_utils import (
1314
read_csv_rbr,
@@ -21,7 +22,7 @@
2122
)
2223

2324

24-
class Backend(IbisPostgresBackend):
25+
class Backend(ExecutionBackend, IbisPostgresBackend):
2526
_top_level_methods = ("connect_examples", "connect_env")
2627
compiler = compiler
2728

python/xorq/backends/pyiceberg/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pyiceberg.table import Table as IcebergTable
99

1010
import xorq.vendor.ibis.expr.operations as ops
11+
from xorq.backends import ExecutionBackend
1112
from xorq.backends.postgres.compiler import compiler as postgres_compiler
1213
from xorq.backends.pyiceberg.compiler import PyIceberg, translate
1314
from xorq.backends.pyiceberg.relations import PyIcebergTable
@@ -50,7 +51,7 @@ def _overwrite_table_data(iceberg_table: IcebergTable, data: pa.Table):
5051
tx.commit_transaction()
5152

5253

53-
class Backend(SQLBackend):
54+
class BaseExecutionBackend(SQLBackend):
5455
name = "pyiceberg"
5556
dialect = PyIceberg
5657
compiler = postgres_compiler
@@ -317,3 +318,7 @@ def list_snapshots(self, database=None) -> dict[str, int]:
317318
)
318319

319320
return snapshots
321+
322+
323+
class Backend(ExecutionBackend, BaseExecutionBackend):
    """PyIceberg backend with ``ExecutionBackend`` layered over the base class.

    ``ExecutionBackend`` comes first in the bases, so its methods take
    precedence and its ``super()`` calls resolve to ``BaseExecutionBackend``.
    """

python/xorq/backends/snowflake/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import xorq.vendor.ibis.expr.api as api
1212
import xorq.vendor.ibis.expr.schema as sch
1313
import xorq.vendor.ibis.expr.types as ir
14+
from xorq.backends import ExecutionBackend
1415
from xorq.common.utils.logging_utils import get_logger
1516
from xorq.expr.relations import replace_cache_table
1617
from xorq.vendor.ibis.backends.snowflake import _SNOWFLAKE_MAP_UDFS
@@ -23,7 +24,7 @@
2324
logger = get_logger(__name__)
2425

2526

26-
class Backend(IbisSnowflakeBackend):
27+
class Backend(ExecutionBackend, IbisSnowflakeBackend):
2728
_top_level_methods = ("connect_env",)
2829

2930
@classmethod

python/xorq/backends/sqlite/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import sqlglot as sg
77
import sqlglot.expressions as sge
88

9+
from xorq.backends import ExecutionBackend
910
from xorq.expr.api import read_csv, read_parquet
1011
from xorq.vendor.ibis import Schema, util
1112
from xorq.vendor.ibis.backends.sqlite import Backend as IbisSQLiteBackend
@@ -18,7 +19,7 @@
1819
import pyarrow as pa
1920

2021

21-
class Backend(IbisSQLiteBackend):
22+
class Backend(ExecutionBackend, IbisSQLiteBackend):
2223
def read_record_batches(
2324
self,
2425
record_batches: pa.RecordBatchReader,

0 commit comments

Comments
 (0)