Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d619f96
Added new Ordeq-Iceberg package
Josersanvil Nov 23, 2025
9080f73
Added resources tests
Josersanvil Nov 24, 2025
27fb09a
Remove unused import from test_table.py
Josersanvil Nov 24, 2025
5e743fd
Update IcebergCatalog and IcebergTableCreate examples; fix return typ…
Josersanvil Nov 24, 2025
1e9acc6
Enhance Ordeq-Iceberg package: update dependencies, improve code form…
Josersanvil Nov 24, 2025
9477954
Refactor imports and update dependencies in Ordeq-Iceberg package
Josersanvil Nov 24, 2025
efabeeb
Refactor IcebergTable and IcebergTableCreate to use Input for catalog…
Josersanvil Nov 24, 2025
70945a8
Add ordeq-iceberg to workspace dependencies in pyproject.toml
Josersanvil Nov 24, 2025
8600081
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Nov 24, 2025
ff2d267
fix: standardize log message quotes in create_and_load snapshot
Josersanvil Nov 24, 2025
9ad5c19
refactor: change IfTableExistsSaveOptions from StrEnum to Enum and up…
Josersanvil Nov 24, 2025
cd024f5
fix: add type ignore for return value in catalog load methods
Josersanvil Nov 24, 2025
23b4b0e
refactor: consolidate IcebergTable creation logic in a single IO and …
Josersanvil Nov 24, 2025
c239ef4
Don't persist data for IcebergTable
Josersanvil Nov 24, 2025
4acdbe8
fix: update create_save_table to use create_namespace_if_not_exists
Josersanvil Nov 26, 2025
50729c5
fix: update create_save_table to use create_namespace_if_not_exists
Josersanvil Nov 26, 2025
97308d1
refactor: remove unused table_resource variable from create_and_load.py
Josersanvil Nov 26, 2025
ff54057
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Nov 26, 2025
7e60e69
fix: enhance logging for IcebergCatalog and table operations
Josersanvil Nov 26, 2025
4fc6af2
refactor: rename private properties for clarity and add validation fo…
Josersanvil Dec 1, 2025
8786cbe
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 1, 2025
1314347
Update uv.lock after pulling from main
Josersanvil Dec 1, 2025
cd510f1
refactor: remove unused IcebergIOError import from table.py
Josersanvil Dec 1, 2025
048e8e6
Remove save method on IcebergTable class
Josersanvil Dec 1, 2025
586e2f2
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 1, 2025
04808b7
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 2, 2025
e2a53c4
refactor: add checks to create_table node and enhance load_table func…
Josersanvil Dec 2, 2025
d8681d4
Merge branch 'main' into feat/package/ordeq-iceberg
Josersanvil Dec 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions packages/ordeq-iceberg/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[build-system]
requires = ["setuptools>=75.0", "setuptools_scm>=8.1.0"]
build-backend = "setuptools.build_meta"

[project]
name = "ordeq_iceberg"
dynamic = ["version"]
readme = "README.md"
license = "MIT"
license-files = ["LICENSE"]
authors = [{ name = "Jose R. Sanchez Viloria <@josersanvil>" }]
maintainers = [{ name = "Jose R. Sanchez Viloria <@josersanvil>" }]
requires-python = ">=3.10"
dependencies = ["ordeq>=1.3.3", "pyiceberg[sql-sqlite]>=0.10.0"]

[dependency-groups]
dev = ["mypy-boto3-glue>=1.40.39"]
test = [
"ordeq-common",
"ordeq-test-utils",
"ordeq-test-examples",
"ordeq-dev-tools",
"pytest",
]

[tool.setuptools_scm]
root = "../../"
version_file = "src/ordeq_iceberg/_version.py"
tag_regex = "^ordeq-iceberg/(?:[\\w-]+-)?(?P<version>[vV]?\\d+(?:\\.\\d+){0,2}[^\\+]*)(?:\\+.*)?$"
git_describe_command = "git describe --dirty --tags --long --match ordeq-iceberg/*"

[tool.setuptools.packages.find]
namespaces = true
where = ["src"]

[tool.pytest.ini_options]
pythonpath = ["src", "tests"]
addopts = "--doctest-modules"

[tool.coverage.run]
omit = ["src/ordeq_iceberg/_version.py"]

[tool.ruff]
line-length = 88
13 changes: 13 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from ordeq_iceberg.catalog import IcebergCatalog
from ordeq_iceberg.errors import IcebergIOError, IcebergTableAlreadyExistsError
from ordeq_iceberg.table import IcebergTable
from ordeq_iceberg.table_create import IcebergTableCreate, IfTableExistsSaveOptions

__all__ = (
"IcebergCatalog",
"IcebergTable",
"IcebergTableCreate",
"IfTableExistsSaveOptions",
"IcebergIOError",
"IcebergTableAlreadyExistsError",
)
31 changes: 31 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from dataclasses import dataclass

from ordeq import Input
from pyiceberg.catalog import Catalog, CatalogType, load_catalog


@dataclass(frozen=True, kw_only=True)
class IcebergCatalog(Input[Catalog]):
"""IO for loading an Iceberg Catalog.

Example:

```pycon
>>> from pyiceberg.catalog import CatalogType
>>> iceberg_catalog = IcebergCatalog(
... name="my_catalog", catalog_type=CatalogType.IN_MEMORY
... )
>>> catalog = iceberg_catalog.load()

```

"""

name: str
catalog_type: CatalogType | str

def load(self, **load_options) -> Catalog:
catalog_type_value = self.catalog_type
if isinstance(self.catalog_type, CatalogType):
catalog_type_value = self.catalog_type.value
return load_catalog(self.name, type=catalog_type_value, **load_options)
11 changes: 11 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Module defining custom exceptions for Iceberg-related operations."""

from ordeq import IOException


class IcebergIOError(IOException):
"""Base class for Iceberg-related errors."""


class IcebergTableAlreadyExistsError(IcebergIOError):
"""Error raised when trying to create a table that already exists."""
1 change: 1 addition & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/py.typed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

50 changes: 50 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from dataclasses import dataclass

from ordeq import Input
from pyiceberg.catalog import Catalog
from pyiceberg.table import Table

from ordeq_iceberg.catalog import IcebergCatalog


@dataclass(frozen=True, kw_only=True)
class IcebergTable(Input[Table]):
"""IO for loading an Iceberg table.

Example:

```pycon
>>> from ordeq_iceberg import IcebergTable, IcebergCatalog
>>> catalog = IcebergCatalog(
... name="my_catalog",
... catalog_type="IN_MEMORY"
... )
>>> table = IcebergTable(
... catalog=catalog,
... table_name="my_table",
... namespace="my_namespace"
... )

```

"""

catalog: IcebergCatalog | Catalog
table_name: str
namespace: str

@property
def table_identifier(self) -> str:
return f"{self.namespace}.{self.table_name}"

def load(self, **load_options) -> Table:
"""Load the table instance from the catalog

Returns:
The loaded Iceberg table instance
"""
if isinstance(self.catalog, IcebergCatalog):
catalog = self.catalog.load()
else:
catalog = self.catalog
return catalog.load_table(self.table_identifier, **load_options)
135 changes: 135 additions & 0 deletions packages/ordeq-iceberg/src/ordeq_iceberg/table_create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from dataclasses import dataclass
from enum import StrEnum

from ordeq import Output
from pyiceberg.catalog import Catalog
from pyiceberg.schema import Schema
from pyiceberg.types import StructType

from ordeq_iceberg.catalog import IcebergCatalog
from ordeq_iceberg.errors import IcebergIOError, IcebergTableAlreadyExistsError


class IfTableExistsSaveOptions(StrEnum):
"""Options for handling existing tables when saving."""

DROP = "drop"
"""Drop the existing table before creating a new one."""
IGNORE = "ignore"
"""Do nothing if the table already exists."""
RAISE = "raise"
"""Raise an error if the table already exists."""


@dataclass(frozen=True, kw_only=True)
class IcebergTableCreate(Output[None]):
"""Ordeq output for creating an Iceberg table.

Example:

```pycon
>>> from ordeq_iceberg import IcebergTableCreate, IcebergCatalog
>>> from pyiceberg.schema import Schema, NestedField
>>> from pyiceberg.catalog import CatalogType
>>> from pyiceberg.types import StringType, IntegerType
>>> catalog = IcebergCatalog(
... name="my_catalog", catalog_type=CatalogType.IN_MEMORY
... )
>>> table_create = IcebergTableCreate(
... catalog=catalog,
... table_name="my_table",
... namespace="my_namespace",
... schema=Schema(
... NestedField(field_id=1, required=True, name="id", field_type=IntegerType()),
... NestedField(field_id=2, required=True, name="name", field_type=StringType()),
... ),
... if_exists=IfTableExistsSaveOptions.DROP,
... )

```

If using for an Ordeq pipeline where you need to control both
the creation and loading of the table, use this in
combination with `IcebergTable` with a shared resource name.

eg:

```pycon
>>> from ordeq_iceberg import IcebergTable, IcebergTableCreate, IcebergCatalog
>>> from pyiceberg.schema import Schema, NestedField
>>> from pyiceberg.catalog import CatalogType
>>> from pyiceberg.types import StringType, IntegerType
>>> catalog = IcebergCatalog(
... name="my_catalog", catalog_type=CatalogType.IN_MEMORY
... )
>>> my_table_resource_name = "my_table.my_namespace"
>>> table_create = IcebergTableCreate(
... catalog=catalog,
... table_name="my_table",
... namespace="my_namespace",
... schema=Schema(
... NestedField(field_id=1, required=True, name="id", field_type=IntegerType()),
... NestedField(field_id=2, required=True, name="name", field_type=StringType()),
... ),
... if_exists=IfTableExistsSaveOptions.DROP,
... ) @ my_table_resource_name
>>> table = IcebergTable(
... catalog=catalog,
... table_name="my_table",
... namespace="my_namespace",
... ) @ my_table_resource_name
"""

catalog: IcebergCatalog | Catalog
table_name: str
namespace: str
schema: Schema | StructType
"""Schema to use when creating the table"""
if_exists: IfTableExistsSaveOptions | None = None
"""What to do if the table already exists.
None (default) lets the underlying catalog handle
the situation (usually raises an error).
"""

@property
def table_identifier(self) -> str:
return f"{self.namespace}.{self.table_name}"

@property
def _catalog_value(self) -> Catalog:
if isinstance(self.catalog, IcebergCatalog):
return self.catalog.load()
return self.catalog

def table_exists(self) -> bool:
catalog = self._catalog_value
return catalog.table_exists(self.table_identifier)

def save(self, _: None = None, **save_options) -> None:
"""Create the table in the catalog with the provided schema.

Raises:
IcebergIOError: If the schema is not provided when creating a new table
IcebergTableAlreadyExistsError: If the table already exists and
`if_exists` is set to RAISE
"""
catalog = self._catalog_value
schema = self.schema or save_options.pop("schema", None)
if schema is None:
raise IcebergIOError("Schema must be provided to create a new table.")
if isinstance(schema, StructType):
schema = Schema(*schema.fields)
table_exists = self.table_exists()
if table_exists:
match self.if_exists:
case IfTableExistsSaveOptions.IGNORE:
return
case IfTableExistsSaveOptions.DROP:
catalog.drop_table(self.table_identifier)
case IfTableExistsSaveOptions.RAISE:
raise IcebergTableAlreadyExistsError(
f"Table '{self.table_identifier}' already exists."
)
catalog.create_table(
identifier=self.table_identifier, schema=schema, **save_options
)
68 changes: 68 additions & 0 deletions packages/ordeq-iceberg/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from collections.abc import Generator
from pathlib import Path
from tempfile import gettempdir
from typing import cast
from shutil import rmtree

import pytest
from pyiceberg.catalog import CatalogType
from pyiceberg.catalog.sql import SqlCatalog

from ordeq_iceberg import IcebergCatalog


@pytest.fixture
def resources_dir(request: pytest.FixtureRequest) -> Path:
"""Loads a `Path` referring to the resources directory specific to the
requesting test module.

For example, in tests/test_sth.py, this test should pass:

>>> def test_method(resources_dir):
... assert resources_dir == 'tests/test_sth'

Can be used to load and save data during tests in its own dedicated
folder, for instance:

>>> def test_method(resources_dir):
... with open(resources_dir / "test_method.csv", 'r') as file:
... assert len(file.readlines()) == 1

Args:
request: the `FixtureRequest` of the requesting test module

Returns:
the path to the resources directory
"""

(test_file_dot_py, _, _) = request.node.location
test_file = Path(*Path(test_file_dot_py).with_suffix("").parts[3:])
return Path(__file__).parent / "tests-resources" / test_file


@pytest.fixture(scope="module")
def sql_catalog_io() -> Generator[IcebergCatalog]:
temp_path = Path(gettempdir())
catalog_path = temp_path / "test_sql_catalog"
catalog_path.mkdir(parents=True, exist_ok=True)
catalog_io = IcebergCatalog(
name="test_sql_catalog",
catalog_type=CatalogType.SQL,
).with_load_options(
uri=f"sqlite:///{catalog_path / 'iceberg.db'}",
warehouse=str(catalog_path / "warehouse"),
)
catalog = cast(SqlCatalog, catalog_io.load())
catalog.create_namespace("test_namespace")
yield catalog_io
catalog.close()
(catalog_path / "iceberg.db").unlink(missing_ok=True)
rmtree(catalog_path / "warehouse", ignore_errors=True)
catalog_path.rmdir()


@pytest.fixture(scope="module")
def sql_catalog(sql_catalog_io: IcebergCatalog) -> Generator[SqlCatalog]:
catalog = cast(SqlCatalog, sql_catalog_io.load())
yield catalog
catalog.close()
Empty file.
Loading
Loading