diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index d84c82ec2a..448993b4f6 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1758,3 +1758,66 @@ shape: (11, 4) │ 21 ┆ 566 ┆ Incorrect billing amount ┆ 2022-04-17 10:53:20 │ └───────────┴─────────────┴────────────────────────────┴─────────────────────┘ ``` + +### Apache DataFusion + +PyIceberg integrates with [Apache DataFusion](https://datafusion.apache.org/) through the Custom Table Provider interface ([FFI_TableProvider](https://datafusion.apache.org/python/user-guide/io/table_provider.html)) exposed by `iceberg-rust`. + + + +!!! note "Requirements" + This requires [`datafusion` to be installed](index.md). + + + + + +!!! warning "Experimental Feature" + The DataFusion integration is considered **experimental**. + + The integration has a few caveats: + + - Only works with `datafusion >= 45` + - Depends directly on `iceberg-rust` instead of PyIceberg's implementation + - Has limited features compared to the full PyIceberg API + + The integration will improve as both DataFusion and `iceberg-rust` mature. + + + +PyIceberg tables can be registered directly with DataFusion's `SessionContext` using the table provider interface. + +```python +from datafusion import SessionContext +from pyiceberg.catalog import load_catalog +import pyarrow as pa + +# Load catalog and create/load a table +catalog = load_catalog("catalog", type="in-memory") +catalog.create_namespace_if_not_exists("default") + +# Create some sample data +data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) +iceberg_table = catalog.create_table("default.test", schema=data.schema) +iceberg_table.append(data) + +# Register the table with DataFusion +ctx = SessionContext() +ctx.register_table_provider("test", iceberg_table) + +# Query the table through DataFusion's DataFrame API +ctx.table("test").show() +``` + +This will output: + +``` +DataFrame() ++---+---+ +| x | y | ++---+---+ +| 1 | 4 | +| 2 | 5 | +| 3 | 6 | ++---+---+ +``` diff --git a/poetry.lock b/poetry.lock index 889b3abe7a..6aae0c58b0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -59,7 +59,7 @@ description = "Happy Eyeballs for asyncio" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "aiohappyeyeballs-2.6.1-py3-none-any.whl", hash = "sha256:f349ba8f4b75cb25c99c5c2d84e997e485204d2902a9597802b0371f09331fb8"}, {file = "aiohappyeyeballs-2.6.1.tar.gz", hash = "sha256:c3f9d0113123803ccadfdf3f0faa505bc78e6a72d1cc4806cbd719826e943558"}, @@ -72,7 +72,7 @@ description = "Async http client/server framework (asyncio)" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "aiohttp-3.12.13-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5421af8f22a98f640261ee48aae3a37f0c41371e99412d55eaf2f8a46d5dad29"}, {file = "aiohttp-3.12.13-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0fcda86f6cb318ba36ed8f1396a6a4a3fd8f856f84d426584392083d10da4de0"}, @@ -202,7 +202,7 @@ description = "aiosignal: a list of registered asynchronous callbacks" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")" +markers = "(extra 
== \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")" files = [ {file = "aiosignal-1.3.2-py2.py3-none-any.whl", hash = "sha256:45cde58e409a301715980c2b01d0c28bdde3770d8290b5eb2173759d9acb31a5"}, {file = "aiosignal-1.3.2.tar.gz", hash = "sha256:a8c255c66fafb1e499c9351d0bf32ff2d8a0321595ebac3b93713656d2436f54"}, @@ -268,7 +268,7 @@ description = "Timeout context manager for asyncio programs" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and python_version <= \"3.10\"" +markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and python_version <= \"3.10\"" files = [ {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, @@ -285,7 +285,7 @@ files = [ {file = "attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3"}, {file = "attrs-25.3.0.tar.gz", hash = "sha256:75d7cefc7fb576747b2c81b4442d4d4a1ce0900973527c011d1030fd3bf4af1b"}, ] -markers = {main = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")"} +markers = {main = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")"} [package.extras] benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] @@ -488,7 +488,7 @@ files = [ {file = "boto3-1.38.27-py3-none-any.whl", hash = "sha256:95f5fe688795303a8a15e8b7e7f255cadab35eae459d00cc281a4fd77252ea80"}, {file = "boto3-1.38.27.tar.gz", hash = "sha256:94bd7fdd92d5701b362d4df100d21e28f8307a67ff56b6a8b0398119cf22f859"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\""} [package.dependencies] botocore = ">=1.38.27,<1.39.0" @@ -509,7 +509,7 @@ files = [ {file = "botocore-1.38.27-py3-none-any.whl", hash = "sha256:a785d5e9a5eda88ad6ab9ed8b87d1f2ac409d0226bba6ff801c55359e94d91a8"}, {file = "botocore-1.38.27.tar.gz", hash = "sha256:9788f7efe974328a38cbade64cc0b1e67d27944b899f88cb786ae362973133b6"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\" or extra == \"s3fs\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3fs\""} [package.dependencies] jmespath = ">=0.7.1,<2.0.0" @@ -1160,9 +1160,10 @@ files = [ name = "datafusion" version = "47.0.0" description = "Build and run queries against data" -optional = false +optional = true python-versions = ">=3.9" -groups = ["dev"] +groups = ["main"] +markers = "extra == \"datafusion\"" files = [ {file = "datafusion-47.0.0-cp39-abi3-macosx_10_12_x86_64.whl", hash = "sha256:ccd83a8e49fb39be06ddfa87023200a9ddc93d181247654ac951fa5720219d08"}, {file = 
"datafusion-47.0.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:43677e6284b165727031aec14d4beaa97296e991960293c61dcb66a3a9ce59b8"}, @@ -1473,7 +1474,7 @@ description = "A list-like structure which implements collections.abc.MutableSeq optional = true python-versions = ">=3.9" groups = ["main"] -markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")" +markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")" files = [ {file = "frozenlist-1.7.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:cc4df77d638aa2ed703b878dd093725b72a824c3c546c076e8fdf276f78ee84a"}, {file = "frozenlist-1.7.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:716a9973a2cc963160394f701964fe25012600f3d311f60c790400b00e568b61"}, @@ -2247,7 +2248,7 @@ files = [ {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"}, {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\" or extra == \"s3fs\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3fs\""} [[package]] name = "joserfc" @@ -3037,7 +3038,7 @@ description = "multidict implementation" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "multidict-6.6.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:cfd9c74d337e710d7ee26e72a7dbedbd60e0c58d3df7c5ccbb748857e977783c"}, {file = "multidict-6.6.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:9d2c5867a1bd182041a950e9ec3dd3622926260434655bd5d94a62d889100787"}, @@ -3769,7 +3770,7 @@ description = "Accelerated property cache" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "propcache-0.3.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:22d9962a358aedbb7a2e36187ff273adeaab9743373a272976d2e348d08c7770"}, {file = "propcache-0.3.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0d0fda578d1dc3f77b6b5a5dce3b9ad69a8250a891760a548df850a5e8da87f3"}, @@ -4036,9 +4037,10 @@ files = [ name = "pyarrow" version = "20.0.0" description = "Python library for Apache Arrow" -optional = false +optional = true python-versions = ">=3.9" -groups = ["main", "dev"] +groups = ["main"] +markers = "extra == \"pyarrow\" or extra == \"pandas\" or extra == \"duckdb\" or extra == \"ray\" or extra == \"daft\" or extra == \"datafusion\"" files = [ {file = "pyarrow-20.0.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:c7dd06fd7d7b410ca5dc839cc9d485d2bc4ae5240851bcd45d85105cc90a47d7"}, {file = "pyarrow-20.0.0-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:d5382de8dc34c943249b01c19110783d0d64b207167c728461add1ecc2db88e4"}, @@ -4096,7 +4098,6 @@ files = [ {file = "pyarrow-20.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:9965a050048ab02409fb7cbbefeedba04d3d67f2cc899eff505cc084345959ca"}, {file = "pyarrow-20.0.0.tar.gz", hash = 
"sha256:febc4a913592573c8d5805091a6c2b5064c8bd6e002131f01061797d91c783c1"}, ] -markers = {main = "extra == \"daft\" or extra == \"duckdb\" or extra == \"pandas\" or extra == \"pyarrow\" or extra == \"ray\""} [package.extras] test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] @@ -5225,7 +5226,7 @@ files = [ {file = "s3transfer-0.13.0-py3-none-any.whl", hash = "sha256:0148ef34d6dd964d0d8cf4311b2b21c474693e57c2e069ec708ce043d2b527be"}, {file = "s3transfer-0.13.0.tar.gz", hash = "sha256:f5e6db74eb7776a37208001113ea7aa97695368242b364d73e91c981ac522177"}, ] -markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\""} +markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\""} [package.dependencies] botocore = ">=1.37.4,<2.0a.0" @@ -5733,7 +5734,7 @@ description = "Fast, Extensible Progress Meter" optional = true python-versions = ">=3.7" groups = ["main"] -markers = "extra == \"daft\" or extra == \"hf\"" +markers = "extra == \"hf\" or extra == \"daft\"" files = [ {file = "tqdm-4.67.1-py3-none-any.whl", hash = "sha256:26445eca388f82e72884e0d580d5464cd801a3ea01e63e5601bdff9ba6a48de2"}, {file = "tqdm-4.67.1.tar.gz", hash = "sha256:f8aef9c52c08c13a65f30ea34f4e5aac3fd1a34959879d7e59e63027286627f2"}, @@ -6018,7 +6019,7 @@ description = "Yet another URL library" optional = true python-versions = ">=3.9" groups = ["main"] -markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\"" +markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\"" files = [ {file = "yarl-1.20.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:6032e6da6abd41e4acda34d75a816012717000fa6839f37124a47fcefc49bec4"}, {file = "yarl-1.20.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:2c7b34d804b8cf9b214f05015c4fee2ebe7ed05cf581e7192c06555c71f4446a"}, @@ -6269,6 +6270,7 @@ cffi = ["cffi (>=1.11)"] [extras] adlfs = ["adlfs"] daft = ["getdaft"] +datafusion = ["datafusion"] duckdb = ["duckdb", "pyarrow"] dynamodb = ["boto3"] gcsfs = ["gcsfs"] @@ -6291,4 +6293,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.1" python-versions = "^3.9.2, !=3.9.7" -content-hash = "c2f45d4d591caedd7d513922884de881cf4ef30a8b431a5ceb6bb9e56711a669" +content-hash = "c3676c4f64eeafe88af2acf9ec7428258a8ef1a92320091f2225865bffbecb6f" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 07602c9ee5..1dc7a29cc1 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -143,6 +143,7 @@ import pyarrow as pa import ray from duckdb import DuckDBPyConnection + from pyiceberg_core.datafusion import IcebergDataFusionTable from pyiceberg.catalog import Catalog @@ -1494,6 +1495,51 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def __datafusion_table_provider__(self) -> "IcebergDataFusionTable": + """Return the DataFusion table provider PyCapsule interface. + + To support DataFusion features such as push down filtering, this function will return a PyCapsule + interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective + you should not need to call this function directly. Instead you can use ``register_table_provider`` in + the DataFusion SessionContext. + + Returns: + A PyCapsule DataFusion TableProvider interface. 
+ + Example: + ```python + from datafusion import SessionContext + from pyiceberg.catalog import load_catalog + import pyarrow as pa + catalog = load_catalog("catalog", type="in-memory") + catalog.create_namespace_if_not_exists("default") + data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + iceberg_table = catalog.create_table("default.test", schema=data.schema) + iceberg_table.append(data) + ctx = SessionContext() + ctx.register_table_provider("test", iceberg_table) + ctx.table("test").show() + ``` + Results in + ``` + DataFrame() + +---+---+ + | x | y | + +---+---+ + | 1 | 4 | + | 2 | 5 | + | 3 | 6 | + +---+---+ + ``` + """ + from pyiceberg_core.datafusion import IcebergDataFusionTable + + return IcebergDataFusionTable( + identifier=self.name(), + metadata_location=self.metadata_location, + file_io_properties=self.io.properties, + ).__datafusion_table_provider__() + class StaticTable(Table): """Load a table directly from a metadata file (i.e., without using a catalog).""" diff --git a/pyproject.toml b/pyproject.toml index 4e479e9d0e..a680df1d30 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,6 +84,7 @@ pyiceberg-core = { version = "^0.5.1", optional = true } polars = { version = "^1.21.0", optional = true } thrift-sasl = { version = ">=0.4.3", optional = true } kerberos = {version = "^1.3.1", optional = true} +datafusion = { version = ">=45", optional = true } [tool.poetry.group.dev.dependencies] pytest = "7.4.4" @@ -99,7 +100,6 @@ pytest-mock = "3.14.1" pyspark = "3.5.6" cython = "3.1.2" deptry = ">=0.14,<0.24" -datafusion = ">=44,<48" docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520 mypy-boto3-glue = ">=1.28.18" mypy-boto3-dynamodb = ">=1.28.18" @@ -314,6 +314,7 @@ gcsfs = ["gcsfs"] rest-sigv4 = ["boto3"] hf = ["huggingface-hub"] pyiceberg-core = ["pyiceberg-core"] +datafusion = ["datafusion"] [tool.pytest.ini_options] markers = [ diff --git a/tests/table/test_datafusion.py b/tests/table/test_datafusion.py new file mode 100644 index 0000000000..d9fa3e1e7b --- /dev/null +++ b/tests/table/test_datafusion.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from pathlib import Path + +import pyarrow as pa +import pytest +from datafusion import SessionContext + +from pyiceberg.catalog import Catalog, load_catalog + + +@pytest.fixture(scope="session") +def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path: + return tmp_path_factory.mktemp("warehouse") + + +@pytest.fixture(scope="session") +def catalog(warehouse: Path) -> Catalog: + catalog = load_catalog( + "default", + uri=f"sqlite:///{warehouse}/pyiceberg_catalog.db", + warehouse=f"file://{warehouse}", + ) + return catalog + + +def test_datafusion_register_pyiceberg_table(catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + catalog.create_namespace_if_not_exists("default") + iceberg_table = catalog.create_table_if_not_exists( + "default.dataset", + schema=arrow_table_with_null.schema, + ) + iceberg_table.append(arrow_table_with_null) + + ctx = SessionContext() + ctx.register_table_provider("test", iceberg_table) + + datafusion_table = ctx.table("test") + assert datafusion_table is not None + + assert datafusion_table.to_arrow_table().to_pylist() == iceberg_table.scan().to_arrow().to_pylist() + + from pandas.testing import assert_frame_equal + + assert_frame_equal( + datafusion_table.to_arrow_table().to_pandas(), + iceberg_table.scan().to_arrow().to_pandas(), + )
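The documentation example above drives the registered table through DataFusion's DataFrame API. As a complementary sketch (assuming the same in-memory catalog and `test` table as that example), the registered name can also be queried with DataFusion SQL through `SessionContext.sql`; the `WHERE` predicate is the kind of filter the FFI table provider is meant to push down to the Iceberg scan.

```python
from datafusion import SessionContext
from pyiceberg.catalog import load_catalog
import pyarrow as pa

# Same setup as the documentation example: an in-memory catalog with a small table.
catalog = load_catalog("catalog", type="in-memory")
catalog.create_namespace_if_not_exists("default")

data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
iceberg_table = catalog.create_table("default.test", schema=data.schema)
iceberg_table.append(data)

# Register the Iceberg table as a DataFusion table provider under the name "test".
ctx = SessionContext()
ctx.register_table_provider("test", iceberg_table)

# Query with SQL; the filter is a candidate for push-down into the Iceberg scan
# through the FFI table provider.
df = ctx.sql("SELECT x, y FROM test WHERE x > 1")
df.show()  # expected rows: (2, 5) and (3, 6)
```

`ctx.sql` returns a DataFusion DataFrame, so the result can also be materialized with `to_arrow_table()` or `to_pandas()`, just as the test above does for the DataFrame API path.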
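The `__datafusion_table_provider__` docstring above notes that end users should go through `register_table_provider` rather than call the method directly. To make the underlying PyCapsule contract concrete: `register_table_provider` accepts any object exposing a `__datafusion_table_provider__()` method that returns the FFI capsule, which is why a plain PyIceberg `Table` can be registered. The sketch below reuses `ctx` and `iceberg_table` from the previous sketch and shows a hypothetical `LoggingTableProvider` wrapper (the class name is illustrative, not part of PyIceberg or DataFusion) that simply delegates to the table.

```python
class LoggingTableProvider:
    """Hypothetical wrapper: any object exposing __datafusion_table_provider__ can be registered."""

    def __init__(self, table):
        self._table = table

    def __datafusion_table_provider__(self):
        # Delegate to the PyIceberg table, which builds the capsule through
        # pyiceberg_core's IcebergDataFusionTable (see the method added above).
        print(f"DataFusion requested a table provider for {self._table.name()}")
        return self._table.__datafusion_table_provider__()


# Reuses `ctx` and `iceberg_table` from the previous sketch.
ctx.register_table_provider("wrapped_test", LoggingTableProvider(iceberg_table))
ctx.table("wrapped_test").show()
```

In practice the wrapper adds nothing over passing the table directly; it only makes the protocol visible.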