Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
78ff32f
BREAKING CHANGE: buffer expose as bytes instead of str
kafka1991 Mar 25, 2025
250135d
comment optimize.
kafka1991 Mar 25, 2025
874b90b
rename peek() to __bytes__
kafka1991 Mar 25, 2025
9118a09
resolve conflict.
kafka1991 Apr 6, 2025
75f620d
python client begin.
kafka1991 May 8, 2025
6ecc6da
python f64 array interface.
kafka1991 May 9, 2025
a4c02f1
add python tests
kafka1991 May 9, 2025
48b7517
fix http port
kafka1991 May 9, 2025
cc806dd
fix some python tests.
kafka1991 May 9, 2025
b94ee75
add line protocol version tests, array tests, and dataframe support n…
kafka1991 May 12, 2025
53f929d
update dep.
kafka1991 May 12, 2025
0106e0f
dataframe support ndarray.
kafka1991 May 12, 2025
dc613d5
data frame support ndarray.
kafka1991 May 12, 2025
1f39b9b
Merge remote-tracking branch 'origin/main' into array
kafka1991 May 12, 2025
ccec47f
fix tests and typo.
kafka1991 May 13, 2025
6dd2922
optimize python client.
kafka1991 May 16, 2025
55e965a
enrich python tests
kafka1991 May 19, 2025
36f0fe5
enrich python tests
kafka1991 May 19, 2025
704f47d
remove large array tests
kafka1991 May 19, 2025
5e7231f
fix tests
kafka1991 May 19, 2025
0050c4d
fix fuzz tests
kafka1991 May 19, 2025
060419a
add sleep time in tests.
kafka1991 May 19, 2025
8d8f5fc
change host to 127.0.0.1 from localhost
kafka1991 May 19, 2025
593bda3
fix tests
kafka1991 May 19, 2025
c17b989
fix py client
kafka1991 May 21, 2025
44f793e
update tests
kafka1991 May 21, 2025
84edbdd
updated c-questdb-client submodule
amunra May 21, 2025
a3cc1d2
bump version and update pyproject.toml
kafka1991 May 21, 2025
435bd55
Merge remote-tracking branch 'origin/array' into array
kafka1991 May 21, 2025
ac661bd
simplifying protocol_version API
amunra May 21, 2025
e7e04e4
negative testing of protocol version
amunra May 22, 2025
976151f
fixed timeout test flakiness
amunra May 22, 2025
c453a86
add docs and fix Gil issue.
kafka1991 May 23, 2025
1e24547
avoiding incref/decref when accessing numpy arrays within a dataframe
amunra May 23, 2025
1456cc2
Merge branch 'array' of https://github.com/questdb/py-questdb-client …
amunra May 23, 2025
cf4e9c9
minor commenting and renaming
amunra May 23, 2025
3c833b1
update insert array api.
kafka1991 May 23, 2025
0afb16e
tweaks
amunra May 23, 2025
b384ae0
Merge branch 'array' of https://github.com/questdb/py-questdb-client …
amunra May 23, 2025
1725951
cleanup
amunra May 25, 2025
53bd5f1
Fixed up front page docs with install instructions
amunra May 25, 2025
fc60c24
Fixed API docs (mostly, not completely)
amunra May 25, 2025
7973fe0
more doc fixes
amunra May 25, 2025
0dc0578
test fix
amunra May 25, 2025
be86c62
updated c-questdb-client submodule
amunra May 25, 2025
0c02ee7
adapt `/settings` response format.
kafka1991 May 26, 2025
088048b
Updated changelog
amunra May 26, 2025
2218b8a
updated notes and scripts to remove py3.8
amunra May 26, 2025
3489be8
fixed cibuildwheel
amunra May 26, 2025
29cad6e
attempting to fix cibuildwheel.yaml
amunra May 26, 2025
9250d66
first update pandas when install dependency.
kafka1991 May 26, 2025
2fe9081
force install pandas new version.
kafka1991 May 26, 2025
7dcde43
print port
kafka1991 May 26, 2025
f06cfab
add server timeout.
kafka1991 May 26, 2025
b84a6e4
removed linux 32-bit builds from CI
amunra May 26, 2025
f09d37a
simplify handle of doGet.
kafka1991 May 27, 2025
0e166d4
use more concise timer
kafka1991 May 27, 2025
8bc17a7
force keep connection.
kafka1991 May 27, 2025
bbb795c
update c-submodule.
kafka1991 May 27, 2025
3f181cd
try to increase test auto flush interval.
kafka1991 May 27, 2025
9646726
try to use windows latest in CI
kafka1991 May 27, 2025
2d828c0
use msvc for windows platform in CI.
kafka1991 May 28, 2025
62d15bf
revert remove of PublishBuildArtifacts@1
kafka1991 May 28, 2025
be69866
fix flaky tests.
kafka1991 May 28, 2025
8664617
fix flaky tests.
kafka1991 May 28, 2025
26422b4
adjust `test_http_request_min_throughput_timeout` tests.
kafka1991 May 28, 2025
6d73b05
Reverting back to naming ILP as InfluxDB line protocol, but clarifyin…
amunra Jun 2, 2025
4e9a424
updated submodule
amunra Jun 2, 2025
16312ae
updated changelog date
amunra Jun 2, 2025
89ee45e
Fixed versioning in missing files
amunra Jun 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ QuestDB Client Library for Python
This is the official Python client library for `QuestDB <https://questdb.io>`_.

This client library implements QuestDB's variant of the
`InfluxDB Line Protocol <https://questdb.io/docs/reference/api/ilp/overview/>`_
`Ingestion Line Protocol <https://questdb.io/docs/reference/api/ilp/overview/>`_
(ILP) over HTTP and TCP.

ILP provides the fastest way to insert data into QuestDB.
Expand Down
2 changes: 1 addition & 1 deletion c-questdb-client
Submodule c-questdb-client updated 108 files
10 changes: 8 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import shutil
import platform
import numpy as np

from setuptools import setup, find_packages
from setuptools.extension import Extension
Expand Down Expand Up @@ -83,12 +84,17 @@ def ingress_extension():
["src/questdb/ingress.pyx"],
include_dirs=[
"c-questdb-client/include",
"pystr-to-utf8/include"],
"pystr-to-utf8/include",
np.get_include()],
library_dirs=lib_paths,
libraries=libraries,
extra_compile_args=extra_compile_args,
extra_link_args=extra_link_args,
extra_objects=extra_objects)
extra_objects=extra_objects,
define_macros = [
('NPY_NO_DEPRECATED_API', 'NPY_1_7_API_VERSION')
]
)


def cargo_build():
Expand Down
39 changes: 36 additions & 3 deletions src/questdb/dataframe.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ cdef auto_flush_t auto_flush_blank() noexcept nogil:
af.last_flush_ms = NULL
return af


cdef bint should_auto_flush(
const auto_flush_mode_t* af_mode,
line_sender_buffer* ls_buf,
Expand Down Expand Up @@ -73,7 +72,8 @@ cdef enum col_target_t:
col_target_column_f64 = 5
col_target_column_str = 6
col_target_column_ts = 7
col_target_at = 8
col_target_column_array = 8
col_target_at = 9


cdef dict _TARGET_NAMES = {
Expand All @@ -85,6 +85,7 @@ cdef dict _TARGET_NAMES = {
col_target_t.col_target_column_f64: "float",
col_target_t.col_target_column_str: "string",
col_target_t.col_target_column_ts: "timestamp",
col_target_t.col_target_column_array: "array",
col_target_t.col_target_at: "designated timestamp",
}

Expand Down Expand Up @@ -125,6 +126,7 @@ cdef enum col_source_t:
col_source_str_lrg_utf8_arrow = 406000
col_source_dt64ns_numpy = 501000
col_source_dt64ns_tz_arrow = 502000
col_source_array_numpy = 503000


cdef bint col_source_needs_gil(col_source_t source) noexcept nogil:
Expand Down Expand Up @@ -213,6 +215,9 @@ cdef dict _TARGET_TO_SOURCES = {
col_source_t.col_source_dt64ns_numpy,
col_source_t.col_source_dt64ns_tz_arrow,
},
col_target_t.col_target_column_array: {
col_source_t.col_source_array_numpy,
},
col_target_t.col_target_at: {
col_source_t.col_source_dt64ns_numpy,
col_source_t.col_source_dt64ns_tz_arrow,
Expand All @@ -227,7 +232,8 @@ cdef tuple _FIELD_TARGETS = (
col_target_t.col_target_column_i64,
col_target_t.col_target_column_f64,
col_target_t.col_target_column_str,
col_target_t.col_target_column_ts)
col_target_t.col_target_column_ts,
col_target_t.col_target_column_array)


# Targets that map directly from a meta target.
Expand Down Expand Up @@ -349,6 +355,9 @@ cdef enum col_dispatch_code_t:
col_dispatch_code_at__dt64ns_tz_arrow = \
col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow

col_dispatch_code_column_array__array_numpy = \
col_target_t.col_target_column_array + col_source_t.col_source_array_numpy


# Int values in order for sorting (as needed for API's sequential coupling).
cdef enum meta_target_t:
Expand Down Expand Up @@ -932,6 +941,8 @@ cdef void_int _dataframe_series_sniff_pyobj(
col.setup.source = col_source_t.col_source_float_pyobj
elif PyUnicode_CheckExact(obj):
col.setup.source = col_source_t.col_source_str_pyobj
elif PyArray_CheckExact(obj):
col.setup.source = col_source_t.col_source_array_numpy
elif PyBytes_CheckExact(obj):
raise IngressError(
IngressErrorCode.BadDataFrame,
Expand Down Expand Up @@ -2016,6 +2027,26 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy(
_ensure_has_gil(gs)
raise c_err_to_py(err)

cdef void_int _dataframe_serialize_cell_column_array__array_numpy(
        line_sender_buffer* ls_buf,
        qdb_pystr_buf* b,
        col_t* col,
        PyThreadState** gs) except -1:
    # Serialize one dataframe cell holding a numpy ndarray into the ILP
    # buffer as an ARRAY column value.
    #
    # The cell is read out of an object-typed Arrow chunk: buffers[1] is the
    # chunk's values buffer, holding borrowed PyObject* pointers, indexed by
    # the cursor's current offset.
    cdef PyObject** access = <PyObject**>col.cursor.chunk.buffers[1]
    cdef PyObject* cell = access[col.cursor.offset]
    cdef cnp.ndarray arr = <cnp.ndarray> cell
    # Only float64 element type is supported by the sender's f64 array API;
    # reject anything else up front with a clear error.
    if cnp.PyArray_TYPE(arr) != cnp.NPY_FLOAT64:
        raise IngressError(IngressErrorCode.ArrayWriteToBufferError,
                           'Only support float64 array, got: %s' % str(arr.dtype))
    cdef:
        size_t rank = cnp.PyArray_NDIM(arr)
        const uint8_t* data_ptr = <const uint8_t *> cnp.PyArray_DATA(arr)
        line_sender_error * err = NULL
    # Hand the array over by shape/strides/data pointer; the C layer copies
    # it into the buffer, so no reference to `arr` needs to outlive this call.
    # On failure, re-acquire the GIL (this code may run without it) before
    # converting the C error into a Python exception.
    if not line_sender_buffer_column_f64_arr(
            ls_buf, col.name, rank, <const size_t*> cnp.PyArray_DIMS(arr),
            <const ssize_t*> cnp.PyArray_STRIDES(arr), data_ptr, cnp.PyArray_NBYTES(arr), &err):
        _ensure_has_gil(gs)
        raise c_err_to_py(err)

cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
line_sender_buffer* ls_buf,
Expand Down Expand Up @@ -2173,6 +2204,8 @@ cdef void_int _dataframe_serialize_cell(
_dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy:
_dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_array__array_numpy:
_dataframe_serialize_cell_column_array__array_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow:
_dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy:
Expand Down
2 changes: 2 additions & 0 deletions src/questdb/extra_cpython.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ cdef extern from "Python.h":

bint PyLong_CheckExact(PyObject* o)

bint PyArray_CheckExact(PyObject * o)

bint PyFloat_CheckExact(PyObject* o)

double PyFloat_AS_DOUBLE(PyObject* o)
Expand Down
55 changes: 38 additions & 17 deletions src/questdb/ingress.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

class IngressErrorCode(Enum):
Expand All @@ -54,8 +55,17 @@ class IngressErrorCode(Enum):
HttpNotSupported = ...
ServerFlushError = ...
ConfigError = ...
ArrayLargeDimError = ...
ArrayInternalError = ...
ArrayWriteToBufferError = ...
ProtocolVersionError = ...
BadDataFrame = ...

class ProtocolVersion(Enum):
    """
    Version of the line protocol wire format used when serializing rows.

    ``ProtocolVersionV2`` is required for array support; the server's
    recommended default can be queried via ``Sender.protocol_version()``.
    """
    ProtocolVersionV1 = ...
    ProtocolVersionV2 = ...

class IngressError(Exception):
"""An error whilst using the ``Sender`` or constructing its ``Buffer``."""

Expand Down Expand Up @@ -194,7 +204,7 @@ class SenderTransaction:
*,
symbols: Optional[Dict[str, Optional[str]]] = None,
columns: Optional[
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime]]
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]]
] = None,
at: Union[ServerTimestamp, TimestampNanos, datetime],
) -> SenderTransaction:
Expand Down Expand Up @@ -237,7 +247,7 @@ class SenderTransaction:

class Buffer:
"""
Construct QuestDB-flavored InfluxDB Line Protocol (ILP) messages.
Construct QuestDB-flavored Ingestion Line Protocol (ILP) messages.

The :func:`Buffer.row` method is used to add a row to the buffer.

Expand Down Expand Up @@ -300,7 +310,7 @@ class Buffer:

"""

def __init__(self, init_buf_size: int = 65536, max_name_len: int = 127):
def __init__(self, protocol_version: ProtocolVersion, init_buf_size: int = 65536, max_name_len: int = 127):
"""
Create a new buffer with the an initial capacity and max name length.
:param int init_buf_size: Initial capacity of the buffer in bytes.
Expand Down Expand Up @@ -345,19 +355,19 @@ class Buffer:
"""
The current number of bytes currently in the buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
Equivalent (but cheaper) to ``len(bytes(buffer))``.
"""

def __str__(self) -> str:
"""Return the constructed buffer as a string. Use for debugging."""
def __bytes__(self) -> bytes:
"""Return the constructed buffer as bytes. Use for debugging."""

def row(
self,
table_name: str,
*,
symbols: Optional[Dict[str, Optional[str]]] = None,
columns: Optional[
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime]]
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]]
] = None,
at: Union[ServerTimestamp, TimestampNanos, datetime],
) -> Buffer:
Expand All @@ -377,7 +387,8 @@ class Buffer:
'col4': 'xyz',
'col5': TimestampMicros(123456789),
'col6': datetime(2019, 1, 1, 12, 0, 0),
'col7': None},
'col7': np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]),
'col8': None},
at=TimestampNanos(123456789))

# Only symbols specified. Designated timestamp assigned by the db.
Expand Down Expand Up @@ -420,6 +431,8 @@ class Buffer:
- `FLOAT <https://questdb.io/docs/reference/api/ilp/columnset-types#float>`_
* - ``str``
- `STRING <https://questdb.io/docs/reference/api/ilp/columnset-types#string>`_
* - ``np.ndarray``
- `ARRAY <https://questdb.io/docs/reference/api/ilp/columnset-types#array>`_
* - ``datetime.datetime`` and ``TimestampMicros``
- `TIMESTAMP <https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp>`_
* - ``None``
Expand Down Expand Up @@ -806,6 +819,7 @@ class Sender:
auto_flush_rows: Optional[int] = None,
auto_flush_bytes: bool = False,
auto_flush_interval: int = 1000,
disable_line_protocol_validation: bool = False,
init_buf_size: int = 65536,
max_name_len: int = 127,
): ...
Expand All @@ -831,6 +845,7 @@ class Sender:
auto_flush_rows: Optional[int] = None,
auto_flush_bytes: bool = False,
auto_flush_interval: int = 1000,
disable_line_protocol_validation: bool = False,
init_buf_size: int = 65536,
max_name_len: int = 127,
) -> Sender:
Expand Down Expand Up @@ -866,6 +881,7 @@ class Sender:
auto_flush_rows: Optional[int] = None,
auto_flush_bytes: bool = False,
auto_flush_interval: int = 1000,
disable_line_protocol_validation: bool = False,
init_buf_size: int = 65536,
max_name_len: int = 127,
) -> Sender:
Expand Down Expand Up @@ -925,6 +941,11 @@ class Sender:
Time interval threshold for the auto-flush logic, or None if disabled.
"""

def protocol_version(self) -> ProtocolVersion:
"""
Returns the QuestDB server's recommended default line protocol version.
"""

def establish(self):
"""
Prepare the sender for use.
Expand All @@ -941,20 +962,20 @@ class Sender:
def __enter__(self) -> Sender:
"""Call :func:`Sender.establish` at the start of a ``with`` block."""

def __str__(self) -> str:
def __len__(self) -> int:
"""
Inspect the contents of the internal buffer.

The ``str`` value returned represents the unsent data.
Number of bytes of unsent data in the internal buffer.

Also see :func:`Sender.__len__`.
Equivalent (but cheaper) to ``len(bytes(sender))``.
"""

def __len__(self) -> int:
def __bytes__(self) -> bytes:
"""
Number of bytes of unsent data in the internal buffer.
Inspect the contents of the internal buffer.

The ``bytes`` value returned represents the unsent data.

Equivalent (but cheaper) to ``len(str(sender))``.
Also see :func:`Sender.__len__`.
"""

def transaction(self, table_name: str) -> SenderTransaction:
Expand All @@ -968,7 +989,7 @@ class Sender:
*,
symbols: Optional[Dict[str, str]] = None,
columns: Optional[
Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]]
Dict[str, Union[bool, int, float, str, TimestampMicros, datetime, np.ndarray]]
] = None,
at: Union[TimestampNanos, datetime, ServerTimestamp],
) -> Sender:
Expand Down
Loading
Loading