Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 269 additions & 0 deletions dlt/common/libs/pyiceberg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import yaml
from typing import Dict, Any, List, Optional
from pathlib import Path

from fsspec import AbstractFileSystem
from packaging.version import Version
Expand Down Expand Up @@ -142,6 +144,273 @@ def get_sql_catalog(
)


class CatalogNotFoundError(Exception):
"""Raised when a catalog cannot be found in the specified configuration method"""

pass


def load_catalog_from_yaml(
catalog_name: str,
config_path: Optional[str] = None,
credentials: Optional[FileSystemCredentials] = None,
) -> IcebergCatalog:
"""Load Iceberg catalog from .pyiceberg.yaml file

Args:
catalog_name: Name of the catalog to load from YAML
config_path: Optional path to .pyiceberg.yaml file. If None, searches in:
1. Current directory (./.pyiceberg.yaml)
2. DLT project directory (./.dlt/.pyiceberg.yaml)
3. Home directory (~/.pyiceberg.yaml)
credentials: Optional filesystem credentials to merge into catalog config

Returns:
IcebergCatalog instance loaded from YAML configuration

Raises:
CatalogNotFoundError: If no .pyiceberg.yaml file found or catalog not in file

Example .pyiceberg.yaml:
catalog:
my_catalog:
type: rest
uri: https://catalog.example.com
warehouse: my_warehouse
credential: token
"""
from pyiceberg.catalog import load_catalog

search_paths = []
if config_path:
search_paths.append(Path(config_path))
else:
# Search in standard locations
search_paths.extend(
[
Path.cwd() / ".pyiceberg.yaml",
Path.cwd() / ".dlt" / ".pyiceberg.yaml",
Path.home() / ".pyiceberg.yaml",
]
)

# Search for the first existing config file
config = None
for path in search_paths:
if path.exists():
logger.info(f"Loading Iceberg catalog configuration from: {path}")
with open(path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
break

# If no config file was found, raise error
if config is None:
raise CatalogNotFoundError(
f"No .pyiceberg.yaml file found. Searched in: {', '.join(str(p) for p in search_paths)}"
)

# Check if catalog exists in config
if "catalog" not in config or catalog_name not in config["catalog"]:
available = list(config.get("catalog", {}).keys())
raise CatalogNotFoundError(
f"Catalog '{catalog_name}' not found in .pyiceberg.yaml. "
f"Available catalogs: {available}"
)

logger.info(f"Found catalog '{catalog_name}' in .pyiceberg.yaml")

# Get catalog config and merge credentials if provided
catalog_config = config["catalog"][catalog_name].copy()
if credentials:
fileio_config = _get_fileio_config(credentials)
catalog_config.update(fileio_config)

return load_catalog(catalog_name, **catalog_config)


def load_catalog_from_config(
catalog_name: str,
config_dict: Dict[str, Any],
credentials: Optional[FileSystemCredentials] = None,
) -> IcebergCatalog:
"""Load Iceberg catalog from configuration dictionary

Args:
catalog_name: Name of the catalog
config_dict: Dictionary with catalog configuration (type, uri, warehouse, etc.)
credentials: Optional filesystem credentials to merge into config

Returns:
IcebergCatalog instance

Raises:
CatalogNotFoundError: If config_dict is None or empty

Example:
config = {
'type': 'rest',
'uri': 'https://catalog.example.com',
'warehouse': 'my_warehouse',
'credential': 'token'
}
catalog = load_catalog_from_config('my_catalog', config)
"""
from pyiceberg.catalog import load_catalog

if not config_dict:
raise CatalogNotFoundError("No configuration dictionary provided")

logger.info(f"Loading catalog '{catalog_name}' from provided configuration")

# Merge filesystem credentials if provided
config_dict = config_dict.copy()
if credentials:
fileio_config = _get_fileio_config(credentials)
config_dict.update(fileio_config)

return load_catalog(catalog_name, **config_dict)


def load_catalog_from_env(
catalog_name: Optional[str] = None,
credentials: Optional[FileSystemCredentials] = None,
) -> IcebergCatalog:
"""Load Iceberg catalog from environment variables

Args:
catalog_name: Optional catalog name (defaults to DLT_ICEBERG_CATALOG_NAME env var)
credentials: Optional filesystem credentials to merge into config

Returns:
IcebergCatalog instance

Raises:
CatalogNotFoundError: If required environment variables are not set

Environment variables:
DLT_ICEBERG_CATALOG_NAME: Name of the catalog (optional, defaults to 'default')
DLT_ICEBERG_CATALOG_TYPE: Type of catalog ('sql' or 'rest')
DLT_ICEBERG_CATALOG_URI: Catalog URI
DLT_ICEBERG_CATALOG_WAREHOUSE: Warehouse name (for REST catalogs)
DLT_ICEBERG_CATALOG_PROP_*: Additional properties (e.g., PROP_CREDENTIAL, PROP_SCOPE)
"""
from pyiceberg.catalog import load_catalog

catalog_name = catalog_name or os.getenv("DLT_ICEBERG_CATALOG_NAME", "default")
catalog_type = os.getenv("DLT_ICEBERG_CATALOG_TYPE")

if not catalog_type:
raise CatalogNotFoundError(
"DLT_ICEBERG_CATALOG_TYPE environment variable not set. Set to 'sql' or 'rest'."
)

logger.info(f"Loading catalog '{catalog_name}' from environment variables")

if catalog_type == "rest":
config = {
"type": "rest",
"uri": os.getenv("DLT_ICEBERG_CATALOG_URI"),
"warehouse": os.getenv("DLT_ICEBERG_CATALOG_WAREHOUSE"),
}

if not config["uri"] or not config["warehouse"]:
raise CatalogNotFoundError(
"For REST catalog, DLT_ICEBERG_CATALOG_URI and "
"DLT_ICEBERG_CATALOG_WAREHOUSE must be set"
)

# Add custom properties from DLT_ICEBERG_CATALOG_PROP_* variables
for key, value in os.environ.items():
if key.startswith("DLT_ICEBERG_CATALOG_PROP_"):
prop_name = key.replace("DLT_ICEBERG_CATALOG_PROP_", "").lower().replace("_", "-")
config[prop_name] = value

elif catalog_type == "sql":
config = {
"type": "sql",
"uri": os.getenv("DLT_ICEBERG_CATALOG_URI", "sqlite:///:memory:"),
}

else:
raise CatalogNotFoundError(
f"Unsupported catalog type: {catalog_type}. Use 'sql' or 'rest'."
)

# Merge filesystem credentials if provided
if credentials:
fileio_config = _get_fileio_config(credentials)
config.update(fileio_config)

return load_catalog(catalog_name, **config)


def get_catalog(
catalog_name: str = "default",
catalog_uri: Optional[str] = None,
catalog_config: Optional[Dict[str, Any]] = None,
credentials: Optional[FileSystemCredentials] = None,
) -> IcebergCatalog:
"""Get an Iceberg catalog using multiple configuration methods.

This function tries to load a catalog in the following priority order:
1. From explicit config dictionary (if catalog_config provided)
2. From .pyiceberg.yaml file
3. From environment variables
4. Fall back to in-memory SQLite catalog

Args:
catalog_name: Name of the catalog (default: "default")
catalog_type: Type of catalog ('sql' or 'rest') - used for fallback
catalog_uri: URI for SQL catalog - used for fallback
catalog_config: Optional dictionary with complete catalog configuration
credentials: Optional filesystem credentials to merge into config

Returns:
IcebergCatalog instance

Examples:
# Load from .pyiceberg.yaml
catalog = get_catalog('my_catalog')

# Load from config dict
config = {'type': 'rest', 'uri': 'https://...', 'warehouse': 'wh'}
catalog = get_catalog('my_catalog', catalog_config=config)

# Load from environment variables
# (set DLT_ICEBERG_CATALOG_TYPE, DLT_ICEBERG_CATALOG_URI, etc.)
catalog = get_catalog()
"""
logger.info(f"Attempting to load Iceberg catalog: {catalog_name}")

# Priority 1: Explicit config dictionary
if catalog_config:
try:
return load_catalog_from_config(catalog_name, catalog_config, credentials)
except Exception as e:
logger.warning(f"Failed to load catalog from config dict: {e}")

# Priority 2: .pyiceberg.yaml file
try:
return load_catalog_from_yaml(catalog_name, credentials=credentials)
except CatalogNotFoundError as e:
logger.debug(f"Catalog not found in .pyiceberg.yaml: {e}")
except Exception as e:
logger.warning(f"Error loading catalog from .pyiceberg.yaml: {e}")

# Priority 3: Environment variables
try:
return load_catalog_from_env(catalog_name, credentials)
except CatalogNotFoundError as e:
logger.debug(f"Catalog not configured via environment variables: {e}")
except Exception as e:
logger.warning(f"Error loading catalog from environment: {e}")

# Priority 4: Fall back to in-memory SQLite
logger.info("No catalog configuration found, using in-memory SQLite catalog")
uri = catalog_uri or "sqlite:///:memory:"
return get_sql_catalog(catalog_name, uri, credentials)


def evolve_table(
catalog: IcebergCatalog,
client: FilesystemClient,
Expand Down
34 changes: 33 additions & 1 deletion dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses

from typing import Final, Optional, Type
from typing import Final, Optional, Type, Dict, Any

from dlt.common import logger
from dlt.common.configuration import configspec, resolve_type
Expand All @@ -26,6 +26,38 @@ class FilesystemDestinationClientConfiguration(FilesystemConfigurationWithLocalF
always_refresh_views: bool = False
"""Always refresh table scanner views by setting the newest table metadata or globbing table files"""

# Iceberg catalog configuration
iceberg_catalog_name: Optional[str] = "default"
"""Name of the Iceberg catalog to use. Corresponds to catalog name in .pyiceberg.yaml"""

iceberg_catalog_type: Optional[str] = "sql"
"""Type of Iceberg catalog: 'sql', 'rest', 'glue', 'hive', etc."""

iceberg_catalog_uri: Optional[str] = None
"""
URI for SQL catalog (e.g., 'postgresql://...') or REST catalog endpoint.
If not provided, defaults to in-memory SQLite for backward compatibility.
"""

iceberg_catalog_config: Optional[Dict[str, Any]] = None
"""
Optional dictionary with complete catalog configuration.
If provided, will be used instead of loading from .pyiceberg.yaml.
Example for REST catalog:
{
'type': 'rest',
'uri': 'https://catalog.example.com',
'warehouse': 'my_warehouse',
'credential': 'token',
'scope': 'PRINCIPAL_ROLE:ALL'
}
Example for SQL catalog:
{
'type': 'sql',
'uri': 'postgresql://user:pass@localhost/catalog'
}
"""

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
return super().resolve_credentials_type()
Expand Down
47 changes: 39 additions & 8 deletions dlt/destinations/impl/filesystem/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ def load_open_table(self, table_format: TTableFormat, table_name: str, **kwargs:
)

def get_open_table_catalog(self, table_format: TTableFormat, catalog_name: str = None) -> Any:
"""Gets a native catalog for a table `table_name` with format `table_format`
"""Gets a native catalog for a table with format `table_format`

Returns: currently pyiceberg Catalog is supported
"""
Expand All @@ -1000,16 +1000,47 @@ def get_open_table_catalog(self, table_format: TTableFormat, catalog_name: str =
if self._catalog:
return self._catalog

from dlt.common.libs.pyiceberg import get_sql_catalog, IcebergCatalog
from dlt.common.libs.pyiceberg import get_catalog, IcebergCatalog

# create in-memory catalog
catalog: IcebergCatalog
catalog = self._catalog = get_sql_catalog(
catalog_name or "default", "sqlite:///:memory:", self.config.credentials
)

# create namespace
catalog.create_namespace(self.dataset_name)
# Use catalog name from config or parameter
catalog_name = catalog_name or self.config.iceberg_catalog_name or "default"

logger.info(f"Loading Iceberg catalog: {catalog_name}")

try:
# Try to load catalog using new function
catalog = self._catalog = get_catalog(
catalog_name=catalog_name,
catalog_type=self.config.iceberg_catalog_type,
catalog_uri=self.config.iceberg_catalog_uri,
catalog_config=self.config.iceberg_catalog_config,
credentials=self.config.credentials,
)

logger.info(
f"Successfully loaded catalog '{catalog_name}' "
f"of type '{self.config.iceberg_catalog_type}'"
)

except Exception as e:
logger.warning(f"Failed to load catalog '{catalog_name}': {e}")
logger.info("Falling back to default in-memory SQLite catalog")

# Fall back to original behavior (in-memory SQLite)
from dlt.common.libs.pyiceberg import get_sql_catalog

catalog = self._catalog = get_sql_catalog(
catalog_name, "sqlite:///:memory:", self.config.credentials
)

# Create namespace
try:
catalog.create_namespace(self.dataset_name)
logger.info(f"Created Iceberg namespace: {self.dataset_name}")
except Exception as e:
logger.debug(f"Namespace {self.dataset_name} already exists or error: {e}")

return catalog

Expand Down