Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d619f96
Added new Ordeq-Iceberg package
Josersanvil Nov 23, 2025
9080f73
Added resources tests
Josersanvil Nov 24, 2025
27fb09a
Remove unused import from test_table.py
Josersanvil Nov 24, 2025
5e743fd
Update IcebergCatalog and IcebergTableCreate examples; fix return typ…
Josersanvil Nov 24, 2025
1e9acc6
Enhance Ordeq-Iceberg package: update dependencies, improve code form…
Josersanvil Nov 24, 2025
9477954
Refactor imports and update dependencies in Ordeq-Iceberg package
Josersanvil Nov 24, 2025
efabeeb
Refactor IcebergTable and IcebergTableCreate to use Input for catalog…
Josersanvil Nov 24, 2025
70945a8
Add ordeq-iceberg to workspace dependencies in pyproject.toml
Josersanvil Nov 24, 2025
8600081
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Nov 24, 2025
ff2d267
fix: standardize log message quotes in create_and_load snapshot
Josersanvil Nov 24, 2025
9ad5c19
refactor: change IfTableExistsSaveOptions from StrEnum to Enum and up…
Josersanvil Nov 24, 2025
cd024f5
fix: add type ignore for return value in catalog load methods
Josersanvil Nov 24, 2025
23b4b0e
refactor: consolidate IcebergTable creation logic in a single IO and …
Josersanvil Nov 24, 2025
c239ef4
Don't persist data for IcebergTable
Josersanvil Nov 24, 2025
4acdbe8
fix: update create_save_table to use create_namespace_if_not_exists
Josersanvil Nov 26, 2025
50729c5
fix: update create_save_table to use create_namespace_if_not_exists
Josersanvil Nov 26, 2025
97308d1
refactor: remove unused table_resource variable from create_and_load.py
Josersanvil Nov 26, 2025
ff54057
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Nov 26, 2025
7e60e69
fix: enhance logging for IcebergCatalog and table operations
Josersanvil Nov 26, 2025
4fc6af2
refactor: rename private properties for clarity and add validation fo…
Josersanvil Dec 1, 2025
8786cbe
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 1, 2025
1314347
Update uv.lock after pulling from main
Josersanvil Dec 1, 2025
cd510f1
refactor: remove unused IcebergIOError import from table.py
Josersanvil Dec 1, 2025
048e8e6
Remove save method on IcebergTable class
Josersanvil Dec 1, 2025
586e2f2
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 1, 2025
04808b7
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 2, 2025
e2a53c4
refactor: add checks to create_table node and enhance load_table func…
Josersanvil Dec 2, 2025
d8681d4
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 3, 2025
2209152
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions packages/ordeq-iceberg/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[build-system]
requires = ["setuptools>=75.0", "setuptools_scm>=8.1.0"]
build-backend = "setuptools.build_meta"

[project]
name = "ordeq_iceberg"
description = "Iceberg integration for Ordeq"
dynamic = ["version"]
readme = "README.md"
license = "MIT"
license-files = ["LICENSE"]
authors = [{ name = "Jose R. Sanchez Viloria <@josersanvil>" }]
maintainers = [{ name = "Jose R. Sanchez Viloria <@josersanvil>" }]
requires-python = ">=3.10"
dependencies = [
"ordeq>=1.3.3",
"pyarrow>=21.0.0",
"pyiceberg[sql-sqlite]>=0.10.0",
]

[tool.ordeq-dev]
group = "ios"
logo_url = "https://iceberg.apache.org/assets/images/Iceberg-logo.svg"

[dependency-groups]
test = [
"ordeq-common",
"ordeq-test-utils",
"ordeq-dev-tools",
"pytest",
]

[tool.setuptools_scm]
root = "../../"
version_file = "src/ordeq_iceberg/_version.py"
tag_regex = "^ordeq-iceberg/(?:[\\w-]+-)?(?P<version>[vV]?\\d+(?:\\.\\d+){0,2}[^\\+]*)(?:\\+.*)?$"
git_describe_command = "git describe --dirty --tags --long --match ordeq-iceberg/*"

[tool.setuptools.packages.find]
namespaces = true
where = ["src"]

[tool.coverage.run]
omit = ["src/ordeq_iceberg/_version.py"]
4 changes: 4 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ordeq_iceberg.catalog import IcebergCatalog
from ordeq_iceberg.table import IcebergTable

__all__ = ("IcebergCatalog", "IcebergTable")
32 changes: 32 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dataclasses import dataclass

from ordeq import Input
from pyiceberg.catalog import Catalog, CatalogType, load_catalog


@dataclass(frozen=True, kw_only=True)
class IcebergCatalog(Input[Catalog]):
"""IO for loading an Iceberg Catalog.

Example:

```pycon
>>> from pyiceberg.catalog import CatalogType
>>> iceberg_catalog = IcebergCatalog(
... name="my_catalog", catalog_type=CatalogType.IN_MEMORY
... )
>>> catalog = iceberg_catalog.load()

```

"""

name: str
catalog_type: CatalogType | str

def load(self, **load_options) -> Catalog:
if isinstance(self.catalog_type, CatalogType):
catalog_type_value = self.catalog_type.value
else:
catalog_type_value = str(self.catalog_type)
return load_catalog(self.name, type=catalog_type_value, **load_options)
1 change: 1 addition & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/py.typed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

54 changes: 54 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from dataclasses import dataclass

from ordeq import Input
from pyiceberg.catalog import Catalog
from pyiceberg.table import Table
from pyiceberg.typedef import Identifier


@dataclass(frozen=True, kw_only=True)
class IcebergTable(Input[Table]):
"""IO for loading an Iceberg table.

Example:

```pycon
>>> import pyiceberg.types as T
>>> from ordeq_iceberg import (
... IcebergTable, IcebergCatalog
... )
>>> catalog = IcebergCatalog(
... name="my_catalog",
... catalog_type="IN_MEMORY"
... )
>>> table = IcebergTable(
... catalog=catalog,
... table_name="my_table",
... namespace="my_namespace",
... )

```

"""

catalog: Input[Catalog] | Catalog
table_name: str
namespace: str

@property
def table_identifier(self) -> Identifier:
return (self.namespace, self.table_name)

@property
def _catalog(self) -> Catalog:
if isinstance(self.catalog, Input):
return self.catalog.load() # type: ignore[return-value]
return self.catalog

def load(self, **load_options) -> Table:
"""Load the table instance from the catalog

Returns:
The loaded Iceberg table instance
"""
return self._catalog.load_table(self.table_identifier, **load_options)
37 changes: 37 additions & 0 deletions packages/ordeq-iceberg/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from collections.abc import Generator
from pathlib import Path
from shutil import rmtree
from tempfile import gettempdir
from typing import cast

import pytest
from ordeq_iceberg import IcebergCatalog
from pyiceberg.catalog import CatalogType
from pyiceberg.catalog.sql import SqlCatalog


@pytest.fixture(scope="module")
def sql_catalog_io() -> Generator[IcebergCatalog]:
temp_path = Path(gettempdir())
catalog_path = temp_path / "test_sql_catalog"
catalog_path.mkdir(parents=True, exist_ok=True)
catalog_io = IcebergCatalog(
name="test_sql_catalog", catalog_type=CatalogType.SQL
).with_load_options(
uri=f"sqlite:///{catalog_path / 'iceberg.db'}",
warehouse=str(catalog_path / "warehouse"),
)
catalog = cast("SqlCatalog", catalog_io.load())
catalog.create_namespace("test_namespace")
yield catalog_io
catalog.close()
(catalog_path / "iceberg.db").unlink(missing_ok=True)
rmtree(catalog_path / "warehouse", ignore_errors=True)
catalog_path.rmdir()


@pytest.fixture(scope="module")
def sql_catalog(sql_catalog_io: IcebergCatalog) -> Generator[SqlCatalog]:
catalog = cast("SqlCatalog", sql_catalog_io.load())
yield catalog
catalog.close()
48 changes: 48 additions & 0 deletions packages/ordeq-iceberg/tests/resources/iceberg_test/load_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pyiceberg.types as T
from ordeq import node, run
from ordeq_common import Literal
from ordeq_iceberg import IcebergCatalog, IcebergTable
from ordeq_viz import viz
from pyiceberg.catalog import Catalog, CatalogType
from pyiceberg.schema import Schema
from pyiceberg.table import Table

# Catalog

my_catalog = IcebergCatalog(
name="test_catalog", catalog_type=CatalogType.IN_MEMORY
)

test_namespace = Literal[str]("test_namespace")
test_table_name = Literal[str]("test_table")

my_table = IcebergTable(
catalog=my_catalog,
table_name=test_table_name.value,
namespace=test_namespace.value,
)

# Nodes


@node(inputs=[my_catalog, test_namespace, test_table_name], checks=[my_table])
def create_table(catalog: Catalog, namespace: str, table_name: str) -> None:
catalog.create_namespace_if_not_exists(namespace)
catalog.create_table_if_not_exists(
(namespace, table_name),
schema=Schema(
*T.StructType(
T.NestedField(1, "id", T.IntegerType(), required=True),
T.NestedField(2, "data", T.StringType(), required=False),
).fields
),
)


@node(inputs=[my_table])
def load_table(created_table: Table) -> None:
print(f"Table loaded from Input object: '{created_table}'")


print(f"Viz Diagram:\n```\n{viz(__name__, fmt='mermaid-md')}\n```")
run(load_table, create_table)
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
## Resource

```python
import pyiceberg.types as T
from ordeq import node, run
from ordeq_common import Literal
from ordeq_iceberg import IcebergCatalog, IcebergTable
from ordeq_viz import viz
from pyiceberg.catalog import Catalog, CatalogType
from pyiceberg.schema import Schema
from pyiceberg.table import Table

# Catalog

my_catalog = IcebergCatalog(
name="test_catalog", catalog_type=CatalogType.IN_MEMORY
)

test_namespace = Literal[str]("test_namespace")
test_table_name = Literal[str]("test_table")

my_table = IcebergTable(
catalog=my_catalog,
table_name=test_table_name.value,
namespace=test_namespace.value,
)

# Nodes


@node(inputs=[my_catalog, test_namespace, test_table_name], checks=[my_table])
def create_table(catalog: Catalog, namespace: str, table_name: str) -> None:
catalog.create_namespace_if_not_exists(namespace)
catalog.create_table_if_not_exists(
(namespace, table_name),
schema=Schema(
*T.StructType(
T.NestedField(1, "id", T.IntegerType(), required=True),
T.NestedField(2, "data", T.StringType(), required=False),
).fields
),
)


@node(inputs=[my_table])
def load_table(created_table: Table) -> None:
print(f"Table loaded from Input object: '{created_table}'")


print(f"Viz Diagram:/n```/n{viz(__name__, fmt='mermaid-md')}/n```")
run(load_table, create_table)

```

## Output

```text
Viz Diagram:
```
```mermaid
graph TB
subgraph legend["Legend"]
direction TB
view_type@{shape: subroutine, label: "View"}
io_type_0@{shape: rect, label: "IcebergCatalog"}
io_type_1@{shape: rect, label: "IcebergTable"}
io_type_2@{shape: rect, label: "Literal"}
end

__main__:my_catalog --> __main__:create_table
__main__:test_namespace --> __main__:create_table
__main__:test_table_name --> __main__:create_table
__main__:my_table --> __main__:load_table

__main__:create_table@{shape: subroutine, label: "create_table"}
__main__:load_table@{shape: subroutine, label: "load_table"}
__main__:my_catalog@{shape: rect, label: "my_catalog"}
__main__:my_table@{shape: rect, label: "my_table"}
__main__:test_namespace@{shape: rect, label: "test_namespace"}
__main__:test_table_name@{shape: rect, label: "test_table_name"}

class view_type,__main__:create_table,__main__:load_table view
class io_type_0,__main__:my_catalog io0
class io_type_1,__main__:my_table io1
class io_type_2,__main__:test_namespace,__main__:test_table_name io2
classDef io fill:#FFD43B
classDef view fill:#00C853,color:#FFF
classDef io0 fill:#66c2a5
classDef io1 fill:#fc8d62
classDef io2 fill:#8da0cb

```

```
Table loaded from Input object: 'test_table(
1: id: required int,
2: data: optional string
),
partition by: [],
sort order: [],
snapshot: null'

```

## Logging

```text
WARNING ordeq.preview Checks are in preview mode and may change without notice in future releases.
INFO ordeq.io Loading IcebergCatalog 'create_table:catalog' in module '__main__'
DEBUG ordeq.io Persisting data for IcebergCatalog 'create_table:catalog' in module '__main__'
INFO ordeq.io Loading Literal 'create_table:namespace' in module '__main__'
DEBUG ordeq.io Persisting data for Literal 'create_table:namespace' in module '__main__'
INFO ordeq.io Loading Literal 'create_table:table_name' in module '__main__'
DEBUG ordeq.io Persisting data for Literal 'create_table:table_name' in module '__main__'
INFO ordeq.runner Running view 'create_table' in module '__main__'
DEBUG ordeq.io Persisting data for IO(id=ID1)
INFO ordeq.io Loading IcebergTable 'load_table:created_table' in module '__main__'
DEBUG ordeq.io Loading cached data for IcebergCatalog 'create_table:catalog' in module '__main__'
DEBUG ordeq.io Persisting data for IcebergTable 'load_table:created_table' in module '__main__'
INFO ordeq.runner Running view 'load_table' in module '__main__'
DEBUG ordeq.io Persisting data for IO(id=ID2)
DEBUG ordeq.io Unpersisting data for IcebergCatalog 'create_table:catalog' in module '__main__'
DEBUG ordeq.io Unpersisting data for Literal 'create_table:namespace' in module '__main__'
DEBUG ordeq.io Unpersisting data for Literal 'create_table:table_name' in module '__main__'
DEBUG ordeq.io Unpersisting data for IO(id=ID1)
DEBUG ordeq.io Unpersisting data for IcebergTable 'load_table:created_table' in module '__main__'
DEBUG ordeq.io Unpersisting data for IO(id=ID2)

```
31 changes: 31 additions & 0 deletions packages/ordeq-iceberg/tests/test_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from unittest.mock import patch

from ordeq_iceberg import IcebergCatalog
from pyiceberg.catalog import CatalogType


def test_load_catalog_with_literal_catalog_type():
catalog_input = IcebergCatalog(
name="test_catalog", catalog_type="in-memory"
)
catalog = catalog_input.load()
assert catalog.name == "test_catalog"


def test_load_catalog_with_enum_catalog_type():
catalog_input = IcebergCatalog(
name="test_catalog", catalog_type=CatalogType.IN_MEMORY
)
catalog = catalog_input.load()
assert catalog.name == "test_catalog"


@patch("ordeq_iceberg.catalog.load_catalog")
def test_load_catalog_is_called_with_correct_parameters(mock_load_catalog):
catalog_input = IcebergCatalog(
name="test_catalog", catalog_type="catalog_type_value"
).with_load_options(some_option="some_value")
catalog_input.load()
mock_load_catalog.assert_called_once_with(
"test_catalog", type="catalog_type_value", some_option="some_value"
)
Loading