Skip to content

Commit 9456033

Browse files
authored
Implement a custom SELECT with SAMPLE BY support (#27)
* Implement a custom SELECT with SAMPLE BY support TODO: - tests, including both sqlalchemy 1.4 and 2.0 and in combination with other clauses (GROUP BY etc) - documentation Example usage: ```python from sqlalchemy import create_engine, MetaData, Table, Column from questdb_connect import ( Timestamp, Double, Symbol, ) from sqlalchemy import func from questdb_connect import select engine = create_engine('questdb://admin:quest@localhost:8812/main') metadata = MetaData() # Define a table for sensor readings sensors = Table( 'sensors', metadata, Column('ts', Timestamp), Column('temperature', Double), Column('humidity', Double), Column('location', Symbol), ) def main(): metadata.create_all(engine) location_samples = select( sensors.c.ts, func.avg(sensors.c.temperature).label('avg_temp'), func.min(sensors.c.temperature).label('min_temp'), func.max(sensors.c.temperature).label('max_temp') ).where( sensors.c.location == 'warehouse' ).sample_by(1, 'd'); with engine.connect() as conn: for row in conn.execute(location_samples).fetchall(): print(f"Time: {row.ts}, Average Temp: {row.avg_temp}, Minimal Temp: {row.min_temp}, Maximal Temp: {row.max_temp}") if __name__ == '__main__': main() ``` * better SAMPLE BY injection this impl no longer depends on SQL Text post-processing and uses clause rendering instead. this makes it more robust. * style fixes * happy path tests * test with GROUP BY * test SAMPLE BY with subqueries * SAMPLE BY options * implement FROM-TO * forgotten doc * styles * fix core api with plain old select * cannot combine GROUP BY and SAMPLE BY
1 parent 781b7a5 commit 9456033

5 files changed

Lines changed: 617 additions & 0 deletions

File tree

src/questdb_connect/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
create_engine,
1212
create_superset_engine,
1313
)
14+
from questdb_connect.dml import QDBSelect, select
1415
from questdb_connect.identifier_preparer import QDBIdentifierPreparer
1516
from questdb_connect.inspector import QDBInspector
1617
from questdb_connect.keywords_functions import get_functions_list, get_keywords_list
@@ -51,6 +52,11 @@
5152
threadsafety = 2
5253
paramstyle = "pyformat"
5354

55+
# Names exposed by a star-import of this module.
__all__ = ("select", "QDBSelect")
59+
5460

5561
class Error(Exception):
    """Exception type defined by this module; adds nothing to ``Exception``."""

src/questdb_connect/compilers.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import abc
22

33
import sqlalchemy
4+
from sqlalchemy.sql.base import elements
45

56
from .common import quote_identifier, remove_public_schema
67
from .types import QDBTypeMixin
@@ -33,6 +34,72 @@ class QDBSQLCompiler(sqlalchemy.sql.compiler.SQLCompiler, abc.ABC):
3334
# Maximum value for 64-bit signed integer (2^63 - 1)
3435
BIGINT_MAX = 9223372036854775807
3536

37+
def visit_sample_by(self, sample_by, **kw):
    """Render a QuestDB ``SAMPLE BY`` clause from a sample-by element.

    The pieces are emitted in a fixed order, each only when the matching
    attribute is set: interval (value plus optional unit), ``FROM`` / ``TO``
    bounds, ``FILL``, ``ALIGN TO`` (always emitted), ``TIME ZONE`` and
    ``WITH OFFSET``.

    :param sample_by: object exposing ``value``, ``unit``, ``from_timestamp``,
        ``to_timestamp``, ``fill``, ``align_to``, ``timezone`` and ``offset``.
    :param kw: compiler keyword arguments; accepted for visitor-signature
        compatibility and not used here.
    :return: the clause text without leading or trailing whitespace.
    """
    # NOTE(review): unit, fill strings, timezone and offset are interpolated
    # verbatim into the SQL text; callers must not pass untrusted input.
    parts = [f"SAMPLE BY {sample_by.value}{sample_by.unit or ''}"]

    if sample_by.from_timestamp:
        # Timestamps are rendered as quoted ISO-8601 literals.
        parts.append(f"FROM '{sample_by.from_timestamp.isoformat()}'")
    if sample_by.to_timestamp:
        parts.append(f"TO '{sample_by.to_timestamp.isoformat()}'")

    fill = sample_by.fill
    if fill is not None:
        if isinstance(fill, str):
            rendered_fill = fill
        else:
            # "g" gives the compact form (100.0 -> "100") but rounds to six
            # significant digits; fall back to the full representation
            # whenever that rounding would change the constant.
            rendered_fill = format(fill, "g")
            if float(rendered_fill) != fill:
                rendered_fill = str(fill)
        parts.append(f"FILL({rendered_fill})")

    parts.append(f"ALIGN TO {sample_by.align_to}")

    if sample_by.timezone:
        parts.append(f"TIME ZONE '{sample_by.timezone}'")
    if sample_by.offset:
        parts.append(f"WITH OFFSET '{sample_by.offset}'")

    return " ".join(parts)
72+
73+
def group_by_clause(self, select, **kw):
    """Render SAMPLE BY (when the statement has one) ahead of GROUP BY.

    :param select: the SELECT construct being compiled.
    :param kw: compiler keyword arguments, forwarded unchanged.
    :return: the SAMPLE BY text, prefixed with a space, followed by whatever
        the inherited GROUP BY rendering produces; either part may be empty.
    """
    if _has_sample_by(select):
        sample_by_sql = " " + self.process(select._sample_by_clause, **kw)
    else:
        sample_by_sql = ""

    # The inherited implementation renders the regular GROUP BY text.
    inherited_sql = super().group_by_clause(select, **kw)
    return sample_by_sql + (inherited_sql or "")
87+
88+
def visit_select(self, select, **kw):
    """Compile a SELECT, making sure a SAMPLE BY clause gets rendered.

    SAMPLE BY is emitted from ``group_by_clause``. When the statement has a
    SAMPLE BY but no GROUP BY entries, a copy of the statement is given a
    single empty text clause as its GROUP BY list so that the GROUP BY
    rendering step is triggered; the caller's statement is left untouched.

    :param select: the SELECT construct being compiled.
    :param kw: compiler keyword arguments, forwarded unchanged.
    :return: the SQL text produced by the inherited ``visit_select``.
    """
    needs_placeholder = _has_sample_by(select) and not select._group_by_clauses
    if needs_placeholder:
        # Work on a clone so the placeholder never leaks into user objects.
        select = select._clone()
        select._group_by_clauses = [elements.TextClause("")]

    return super().visit_select(select, **kw)
102+
36103
def _is_safe_for_fast_insert_values_helper(self):
37104
return True
38105

@@ -69,3 +136,6 @@ def limit_clause(self, select, **kw):
69136
text += f"{self.process(offset, **kw)},{self.BIGINT_MAX}"
70137

71138
return text
139+
140+
def _has_sample_by(select):
141+
return hasattr(select, '_sample_by_clause') and select._sample_by_clause is not None

src/questdb_connect/dml.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, Any, Optional, Sequence, Union
4+
5+
from sqlalchemy import select as sa_select
6+
from sqlalchemy.sql import ClauseElement
7+
from sqlalchemy.sql import Select as StandardSelect
8+
9+
if TYPE_CHECKING:
10+
from datetime import date, datetime
11+
12+
from sqlalchemy.sql.visitors import Visitable
13+
14+
15+
class SampleByClause(ClauseElement):
    """Clause element holding the settings of a QuestDB SAMPLE BY clause.

    The element only stores its settings; the SQL text is produced by the
    compiler's ``visit_sample_by`` hook, selected through ``__visit_name__``.
    """

    __visit_name__ = "sample_by"
    stringify_dialect = "questdb"

    def __init__(
        self,
        value: Union[int, float],
        unit: Optional[str] = None,
        fill: Optional[Union[str, float]] = None,
        align_to: str = "CALENDAR",  # default per docs
        timezone: Optional[str] = None,
        offset: Optional[str] = None,
        from_timestamp: Optional[Union[datetime, date]] = None,
        to_timestamp: Optional[Union[datetime, date]] = None,
    ):
        """Store the SAMPLE BY settings.

        :param value: interval size.
        :param unit: interval unit letter; ``None`` or empty means no unit.
            Upper-case ``M``, ``T`` and ``U`` are kept as given, every other
            unit is lower-cased.
        :param fill: fill strategy name or a numeric constant.
        :param align_to: alignment mode; stored upper-cased.
        :param timezone: optional time zone for the alignment.
        :param offset: optional offset for the alignment.
        :param from_timestamp: optional lower bound of the sampled range.
        :param to_timestamp: optional upper bound of the sampled range.
        """
        self.value = value
        self.unit = self._normalize_unit(unit)
        self.fill = fill
        self.align_to = align_to.upper()
        self.timezone = timezone
        self.offset = offset
        self.from_timestamp = from_timestamp
        self.to_timestamp = to_timestamp

    @staticmethod
    def _normalize_unit(unit: Optional[str]) -> Optional[str]:
        """Return *unit* in the spelling QuestDB expects, or None if unset."""
        if not unit:
            return None
        # QuestDB unit letters are case-sensitive: per its SAMPLE BY docs,
        # "M" is month (vs "m" minute), "T" millisecond and "U" microsecond.
        # Lower-casing those would change or invalidate the unit.
        if unit in ("M", "T", "U"):
            return unit
        return unit.lower()

    def __str__(self) -> str:
        """Return the basic ``SAMPLE BY <value><unit>`` text, without options."""
        if self.unit:
            return f"SAMPLE BY {self.value}{self.unit}"
        return f"SAMPLE BY {self.value}"

    def get_children(self, **kwargs: Any) -> Sequence[Visitable]:
        """Return an empty list: this clause has no child elements."""
        return []
48+
49+
50+
class QDBSelect(StandardSelect):
    """SELECT construct extended with QuestDB's SAMPLE BY clause.

    Instances are normally obtained from this module's :func:`select`
    helper. :meth:`sample_by` returns a copy of the statement carrying a
    :class:`SampleByClause`.
    """

    stringify_dialect = "questdb"
    # None until sample_by() has been called on (a copy of) the statement.
    _sample_by_clause: Optional[SampleByClause] = None

    def get_children(self, **kwargs: Any) -> Sequence[Visitable]:
        """Return the inherited children plus the SAMPLE BY clause, if set."""
        inherited = super().get_children(**kwargs)
        clause = self._sample_by_clause
        if clause is None:
            return inherited
        return [*inherited, clause]

    def sample_by(
        self,
        value: Union[int, float],
        unit: Optional[str] = None,
        fill: Optional[Union[str, float]] = None,
        align_to: str = "CALENDAR",
        timezone: Optional[str] = None,
        offset: Optional[str] = None,
        from_timestamp: Optional[Union[datetime, date]] = None,
        to_timestamp: Optional[Union[datetime, date]] = None,
    ) -> QDBSelect:
        """Return a copy of this statement with a SAMPLE BY clause attached.

        :param value: time interval value
        :param unit: 's' for seconds, 'm' for minutes, 'h' for hours, etc.
        :param fill: fill strategy - NONE, NULL, PREV, LINEAR, or constant value
        :param align_to: CALENDAR or FIRST OBSERVATION
        :param timezone: Optional timezone for calendar alignment
        :param offset: Optional offset in format '+/-HH:mm'
        :param from_timestamp: Optional start timestamp for the sample
        :param to_timestamp: Optional end timestamp for the sample
        :return: a new statement of the same class; ``self`` is not modified.
        """
        # Shallow-copy the statement by hand: new instance, copied __dict__.
        cls = self.__class__
        clone = cls.__new__(cls)
        clone.__dict__ = self.__dict__.copy()

        clone._sample_by_clause = SampleByClause(
            value,
            unit,
            fill,
            align_to,
            timezone,
            offset,
            from_timestamp,
            to_timestamp,
        )
        return clone
100+
101+
102+
def select(*entities: Any, **kwargs: Any) -> QDBSelect:
    """Build a :class:`QDBSelect` from the given entities.

    The arguments are passed straight to SQLAlchemy's own ``select``; the
    resulting statement's state is then copied into a fresh
    :class:`QDBSelect`, which adds :meth:`QDBSelect.sample_by` for QuestDB's
    SAMPLE BY clause.

    :param entities: positional arguments forwarded to SQLAlchemy ``select``.
    :param kwargs: keyword arguments forwarded to SQLAlchemy ``select``.
    :return: a :class:`QDBSelect` holding the same state as the plain SELECT.
    """
    plain_stmt = sa_select(*entities, **kwargs)

    # Re-home the plain statement's state on an uninitialised QDBSelect.
    qdb_stmt = QDBSelect.__new__(QDBSelect)
    qdb_stmt.__dict__ = plain_stmt.__dict__.copy()
    return qdb_stmt

tests/conftest.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,32 @@ def collect_select_all(session, expected_rows) -> str:
126126
if rs.rowcount == expected_rows:
127127
return '\n'.join(str(row) for row in rs)
128128

129+
def wait_until_table_is_ready(test_engine, table_name, expected_rows, timeout=10):
    """Poll a table until it holds exactly ``expected_rows`` rows.

    A new connection is opened for every poll and the row count is read
    with ``SELECT count(*)``. Polls are spaced 10 ms apart.

    Args:
        test_engine: SQLAlchemy engine used to open connections.
        table_name: Name of the table to check; interpolated into the query
            text as-is.
        expected_rows: Row count that marks the table as ready.
        timeout: Maximum time to wait in seconds (default: 10 seconds).

    Returns:
        bool: True if the expected row count was observed, False if the
        timeout elapsed first (including a zero or negative timeout).

    Raises:
        Database errors raised while connecting or executing are not caught
        here and propagate to the caller.
    """
    # Use the monotonic clock so that wall-clock adjustments can neither
    # cut the wait short nor stretch it.
    deadline = time.monotonic() + timeout
    count_query = f'SELECT count(*) FROM {table_name}'

    while time.monotonic() < deadline:
        with test_engine.connect() as conn:
            row = conn.execute(text(count_query)).fetchone()
            if row and row[0] == expected_rows:
                return True

        print(f'Waiting for table {table_name} to have {expected_rows} rows, current: {row[0] if row else 0}')
        time.sleep(0.01)  # Wait 10ms between checks
    return False
154+
129155

130156
def wait_until_table_is_ready(test_engine, table_name, expected_rows, timeout=10):
131157
"""

0 commit comments

Comments
 (0)