Skip to content

Commit de0e12f

Browse files
committed
wip -- fix things
1 parent eea2567 commit de0e12f

File tree

4 files changed

+33
-16
lines changed

4 files changed

+33
-16
lines changed

python/cudf_polars/cudf_polars/dsl/ir.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ def __init__(
460460
schema,
461461
typ,
462462
reader_options,
463+
cloud_options,
463464
paths,
464465
with_columns,
465466
skip_rows,
@@ -928,7 +929,14 @@ def __init__(
928929
self.options = options
929930
self.cloud_options = cloud_options
930931
self.children = (df,)
931-
self._non_child_args = (schema, kind, path, parquet_options, options)
932+
self._non_child_args = (
933+
schema,
934+
kind,
935+
path,
936+
parquet_options,
937+
options,
938+
cloud_options,
939+
)
932940
if self.cloud_options is not None and any(
933941
self.cloud_options.get(k) is not None
934942
for k in ("config", "credential_provider")
@@ -1209,7 +1217,7 @@ def __init__(self, schema: Schema, key: int, refcount: int | None, value: IR):
12091217
self.key = key
12101218
self.refcount = refcount
12111219
self.children = (value,)
1212-
self._non_child_args = (key, refcount)
1220+
self._non_child_args = (schema, key, refcount)
12131221

12141222
def get_hashable(self) -> Hashable: # noqa: D102
12151223
# Polars arranges that the keys are unique across all cache
@@ -1355,7 +1363,7 @@ def __init__(
13551363
self.exprs = tuple(exprs)
13561364
self.should_broadcast = should_broadcast
13571365
self.children = (df,)
1358-
self._non_child_args = (self.exprs, should_broadcast)
1366+
self._non_child_args = (schema, self.exprs, should_broadcast)
13591367
if (
13601368
Select._is_len_expr(self.exprs)
13611369
and isinstance(df, Scan)
@@ -1465,7 +1473,7 @@ def __init__(
14651473
self.schema = schema
14661474
self.exprs = tuple(exprs)
14671475
self.children = (df,)
1468-
self._non_child_args = (self.exprs,)
1476+
self._non_child_args = (schema, self.exprs)
14691477

14701478
@classmethod
14711479
@log_do_evaluate
@@ -2009,7 +2017,7 @@ def __init__(
20092017
assert not nulls_equal
20102018
assert not coalesce
20112019
assert maintain_order == "none"
2012-
self._non_child_args = (predicate_wrapper, options)
2020+
self._non_child_args = (schema, predicate_wrapper, options)
20132021

20142022
@classmethod
20152023
@log_do_evaluate
@@ -2129,7 +2137,7 @@ def __init__(
21292137
self.right_on = tuple(right_on)
21302138
self.options = options
21312139
self.children = (left, right)
2132-
self._non_child_args = (self.left_on, self.right_on, self.options)
2140+
self._non_child_args = (schema, self.left_on, self.right_on, self.options)
21332141

21342142
@staticmethod
21352143
@cache
@@ -2500,7 +2508,7 @@ def __init__(
25002508
self.schema = schema
25012509
self.columns = tuple(columns)
25022510
self.should_broadcast = should_broadcast
2503-
self._non_child_args = (self.columns, self.should_broadcast)
2511+
self._non_child_args = (schema, self.columns, self.should_broadcast)
25042512
self.children = (df,)
25052513

25062514
@classmethod
@@ -2564,7 +2572,7 @@ def __init__(
25642572
self.subset = subset
25652573
self.zlice = zlice
25662574
self.stable = stable
2567-
self._non_child_args = (keep, subset, zlice, stable)
2575+
self._non_child_args = (schema, keep, subset, zlice, stable)
25682576
self.children = (df,)
25692577

25702578
_KEEP_MAP: ClassVar[dict[str, plc.stream_compaction.DuplicateKeepOption]] = {
@@ -2663,6 +2671,7 @@ def __init__(
26632671
self.stable = stable
26642672
self.zlice = zlice
26652673
self._non_child_args = (
2674+
schema,
26662675
self.by,
26672676
self.order,
26682677
self.null_order,
@@ -2727,7 +2736,7 @@ def __init__(self, schema: Schema, offset: int, length: int | None, df: IR):
27272736
self.schema = schema
27282737
self.offset = offset
27292738
self.length = length
2730-
self._non_child_args = (offset, length)
2739+
self._non_child_args = (schema, offset, length)
27312740
self.children = (df,)
27322741

27332742
@classmethod
@@ -2757,7 +2766,7 @@ class Filter(IR):
27572766
def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR):
27582767
self.schema = schema
27592768
self.mask = mask
2760-
self._non_child_args = (mask,)
2769+
self._non_child_args = (schema, mask)
27612770
self.children = (df,)
27622771

27632772
@classmethod
@@ -2825,7 +2834,7 @@ def __init__(self, schema: Schema, key: str, left: IR, right: IR):
28252834
self.schema = schema
28262835
self.key = key
28272836
self.children = (left, right)
2828-
self._non_child_args = (key,)
2837+
self._non_child_args = (schema, key)
28292838

28302839
@classmethod
28312840
@log_do_evaluate
@@ -3075,7 +3084,7 @@ class Union(IR):
30753084
def __init__(self, schema: Schema, zlice: Zlice | None, *children: IR):
30763085
self.schema = schema
30773086
self.zlice = zlice
3078-
self._non_child_args = (zlice,)
3087+
self._non_child_args = (schema, zlice)
30793088
self.children = children
30803089
schema = self.children[0].schema
30813090

@@ -3126,7 +3135,7 @@ def __init__(
31263135
):
31273136
self.schema = schema
31283137
self.should_broadcast = should_broadcast
3129-
self._non_child_args = (should_broadcast,)
3138+
self._non_child_args = (schema, should_broadcast)
31303139
self.children = children
31313140

31323141
@staticmethod

python/cudf_polars/cudf_polars/experimental/io.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,13 +417,14 @@ def _sink_to_directory(
417417
path: str,
418418
parquet_options: ParquetOptions,
419419
options: dict[str, Any],
420+
cloud_options: dict[str, Any] | None,
420421
df: DataFrame,
421422
ready: None,
422423
context: IRExecutionContext,
423424
) -> DataFrame:
424425
"""Sink a partition to a new file."""
425426
return Sink.do_evaluate(
426-
schema, kind, path, parquet_options, options, df, context=context
427+
schema, kind, path, parquet_options, options, cloud_options, df, context=context
427428
)
428429

429430

@@ -566,6 +567,7 @@ def _directory_sink_graph(
566567
f"{sink.path}/part.{str(i).zfill(width)}.{suffix}",
567568
sink.parquet_options,
568569
sink.options,
570+
sink.cloud_options,
569571
(child_name, i),
570572
setup_name,
571573
context,

python/cudf_polars/cudf_polars/experimental/join.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def _make_bcast_join(
144144
left: IR,
145145
right: IR,
146146
shuffle_method: ShuffleMethod,
147+
*,
147148
streaming_runtime: str,
148149
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
149150
if ir.options[0] != "Inner":
@@ -290,7 +291,7 @@ def _(
290291
left,
291292
right,
292293
config_options.executor.shuffle_method,
293-
config_options.executor.runtime,
294+
streaming_runtime=config_options.executor.runtime,
294295
)
295296
else:
296297
# Create a hash join
@@ -391,6 +392,7 @@ def _(
391392
inter_key = (inter_name, part_out, j)
392393
graph[(inter_name, part_out, j)] = (
393394
partial(ir.do_evaluate, context=context),
395+
ir.schema,
394396
ir.left_on,
395397
ir.right_on,
396398
ir.options,

python/cudf_polars/cudf_polars/experimental/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@
2828

2929
def _concat(*dfs: DataFrame, context: IRExecutionContext) -> DataFrame:
3030
# Concatenate a sequence of DataFrames vertically
31-
return dfs[0] if len(dfs) == 1 else Union.do_evaluate(None, *dfs, context=context)
31+
return (
32+
dfs[0]
33+
if len(dfs) == 1
34+
else Union.do_evaluate(None, None, *dfs, context=context)
35+
)
3236

3337

3438
def _fallback_inform(msg: str, config_options: ConfigOptions) -> None:

0 commit comments

Comments (0)