Skip to content

Commit 09cf0b6

Browse files
authored
Merge pull request #2964 from mabel-dev/#2963
Iceberg Fixes #2963, #2962, #2961
2 parents b8c903d + f1f131c commit 09cf0b6

File tree

8 files changed

+658
-15
lines changed

8 files changed

+658
-15
lines changed

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 1965
4+
__build__ = 1966
55
__author__ = "@joocer"
6-
__version__ = "0.26.2-beta.1965"
6+
__version__ = "0.26.2-beta.1966"
77

88
# Store the version here so:
99
# 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/joins/inner_join.pyx

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ from libc.stddef cimport size_t
1515
from libcpp.vector cimport vector
1616

1717
from time import perf_counter_ns
18+
cimport cython
1819

1920
from opteryx.third_party.abseil.containers cimport (
2021
FlatHashMap,
@@ -75,15 +76,16 @@ cpdef tuple inner_join(object right_relation, list join_columns, FlatHashMap lef
7576
last_hash_time_ns = t_after_hash - t_start
7677

7778
with nogil:
78-
inner_join_probe(
79-
&left_hash_table._map,
80-
&non_null_indices[0],
81-
<size_t>candidate_count,
82-
&row_hashes[0],
83-
<size_t>num_rows,
84-
left_indexes.c_buffer,
85-
right_indexes.c_buffer,
86-
)
79+
with cython.boundscheck(False):
80+
inner_join_probe(
81+
&left_hash_table._map,
82+
&non_null_indices[0],
83+
<size_t>candidate_count,
84+
&row_hashes[0],
85+
<size_t>num_rows,
86+
left_indexes.c_buffer,
87+
right_indexes.c_buffer,
88+
)
8789
cdef long long t_after_probe = perf_counter_ns()
8890
last_probe_time_ns = t_after_probe - t_after_hash
8991
last_rows_hashed = num_rows

opteryx/connectors/capabilities/statistics.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ def prune_blobs(self, blob_names: list[str], query_statistics, selection) -> lis
6868
and cond.right.schema_column.type
6969
not in (OrsoTypes.DATE, OrsoTypes.TIME, OrsoTypes.TIMESTAMP)
7070
]
71+
# Handle AnyOp* comparisons where left is a literal and right is an identifier
72+
valid_conditions_any = [
73+
cond
74+
for cond in selection
75+
if cond.value == "AnyOpEq"
76+
and cond.left.node_type == NodeType.LITERAL
77+
and cond.right.node_type == NodeType.IDENTIFIER
78+
and cond.right.schema_column.type
79+
not in (OrsoTypes.DATE, OrsoTypes.TIME, OrsoTypes.TIMESTAMP)
80+
and cond.left.schema_column.type
81+
not in (OrsoTypes.DATE, OrsoTypes.TIME, OrsoTypes.TIMESTAMP)
82+
]
7183

7284
for condition in valid_conditions:
7385
column_name = condition.left.source_column.encode()
@@ -87,6 +99,35 @@ def prune_blobs(self, blob_names: list[str], query_statistics, selection) -> lis
8799
skip_blob = True
88100
break
89101

102+
# Evaluate AnyOp* conditions (literal = ANY(column)) safely using element min/max
103+
for condition in valid_conditions_any:
104+
column_name = condition.right.source_column.encode()
105+
literal_value = condition.left.value
106+
# Skip NULL literals — unsafe to prune
107+
if literal_value is None:
108+
continue
109+
if type(literal_value) is numpy.datetime64:
110+
# convert to python datetime for consistent to_int conversion
111+
literal_value = literal_value.astype("M8[ms]").astype("O")
112+
if hasattr(literal_value, "item"):
113+
literal_value = literal_value.item()
114+
literal_value = to_int(literal_value)
115+
# Skip NULL_FLAG values (NaN/unconvertible) that appear as the NULL sentinel
116+
NULL_FLAG = -(1 << 63)
117+
if literal_value == NULL_FLAG:
118+
continue
119+
max_value = cached_stats.upper_bounds.get(column_name)
120+
min_value = cached_stats.lower_bounds.get(column_name)
121+
122+
if max_value is not None and min_value is not None:
123+
# convert AnyOpEq -> Eq, AnyOpGt -> Gt, etc.
124+
op_name = condition.value[5:]
125+
prune = handlers.get(op_name)
126+
if prune and prune(literal_value, min_value, max_value):
127+
query_statistics.blobs_pruned += 1
128+
skip_blob = True
129+
break
130+
90131
if not skip_blob:
91132
new_blob_names.append(blob_name)
92133

opteryx/connectors/iceberg_connector.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ def read_dataset(
276276
)
277277
)
278278

279+
# Short-cut COUNT(*) handling
280+
if selected_columns == []:
281+
table = pyarrow.Table.from_arrays(
282+
[[self.relation_statistics.record_count]], names=["$COUNT(*)"]
283+
)
284+
yield table
285+
return
286+
279287
reader = self.table.scan(
280288
row_filter=pushed_filters,
281289
selected_fields=selected_columns,
@@ -356,7 +364,7 @@ def decode_iceberg_value(
356364
elif data_type_class == pyiceberg.types.DoubleType:
357365
# IEEE 754 encoded floats are typically decoded directly
358366
return struct.unpack("<d", value)[0] # 8-byte IEEE 754 double
359-
elif data_type_class == pyiceberg.types.TimestampType:
367+
elif data_type_class in (pyiceberg.types.TimestampType, pyiceberg.types.TimestamptzType):
360368
# Iceberg stores timestamps as microseconds since epoch
361369
interval = int.from_bytes(value, "little", signed=True)
362370
if interval < 0:
@@ -378,5 +386,5 @@ def decode_iceberg_value(
378386
return Decimal(int_value) / (10**data_type.scale)
379387
elif data_type_class == pyiceberg.types.BooleanType:
380388
return bool(value)
381-
else:
382-
raise ValueError(f"Unsupported data type: {data_type}, {str(data_type)}")
389+
390+
ValueError(f"Unsupported data type: {data_type}, {str(data_type)}")

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "opteryx"
3-
version = "0.26.2-beta.1965"
3+
version = "0.26.2-beta.1966"
44
description = "Query your data, where it lives"
55
requires-python = '>=3.11'
66
readme = {file = "README.md", content-type = "text/markdown"}

tests/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,15 @@ def _ensure_two_snapshots(identifier):
794794
with contextlib.suppress(NamespaceAlreadyExistsError):
795795
catalog.create_namespace("opteryx")
796796

797+
_epoch_schema = pyarrow.schema([
798+
pyarrow.field("epoch", pyarrow.timestamp("ms", tz="UTC"))
799+
])
800+
_epoch_table = pyarrow.Table.from_arrays(
801+
[pyarrow.array([datetime.datetime.now(datetime.timezone.utc)], type=_epoch_schema.field("epoch").type)],
802+
schema=_epoch_schema
803+
)
804+
catalog.create_table("opteryx.epoch", schema=_epoch_schema).append(_epoch_table)
805+
797806
data = opteryx.query_to_arrow("SELECT tweet_id, text, timestamp, user_id, user_verified, user_name, hash_tags, followers, following, tweets_by_user, is_quoting, is_reply_to, is_retweeting FROM testdata.flat.formats.parquet")
798807
table = catalog.create_table("opteryx.tweets", schema=data.schema)
799808
table.append(data.slice(0, 50000))
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import pyarrow as pa

from opteryx.compiled.joins.join_definitions import (
    build_side_hash_map,
)
from opteryx.compiled.table_ops.hash_ops import compute_hashes
from opteryx.compiled.table_ops.null_avoidant_ops import non_null_indices


def test_build_side_hash_map_basic_values():
    """
    Verify that build_side_hash_map produces a FlatHashMap containing a mapping for
    every non-null row hash computed from a small VALUES table.
    """
    table = pa.table({"x": [1, 2, 3]})

    # Build the Cython-side hash map
    ht = build_side_hash_map(table, ["x"])

    # Compute the row hashes and non-null indices via the compiled helper
    row_hashes = compute_hashes(table, ["x"])  # returns array.array('Q')
    non_nulls = non_null_indices(table, ["x"])  # typed memoryview

    # For every non-null row, ensure the hash map returns a non-empty list
    # containing that row's index — i.e. the build side indexed every row.
    for i in range(non_nulls.shape[0]):
        row_idx = int(non_nulls[i])
        key = int(row_hashes[row_idx])
        found = ht.get(key)
        assert found is not None and len(found) >= 1, f"Missing mapping for hash {key} (row {row_idx})"
        assert row_idx in found, f"Row idx {row_idx} not present in mapping for hash {key}: {found}"
33+

0 commit comments

Comments
 (0)