From 141fe53215de99b8331e0ac585a90eeda4f8243f Mon Sep 17 00:00:00 2001 From: lfagliano Date: Sun, 16 Nov 2025 20:05:16 +0100 Subject: [PATCH 1/2] feat: Adding retrieving catalog functions for pyiceberg lib --- dlt/common/libs/pyiceberg.py | 269 ++++++++++++++++++ .../impl/filesystem/filesystem.py | 47 ++- 2 files changed, 308 insertions(+), 8 deletions(-) diff --git a/dlt/common/libs/pyiceberg.py b/dlt/common/libs/pyiceberg.py index d216d4c835..f5323915d3 100644 --- a/dlt/common/libs/pyiceberg.py +++ b/dlt/common/libs/pyiceberg.py @@ -1,5 +1,7 @@ import os +import yaml from typing import Dict, Any, List, Optional +from pathlib import Path from fsspec import AbstractFileSystem from packaging.version import Version @@ -142,6 +144,273 @@ def get_sql_catalog( ) +class CatalogNotFoundError(Exception): + """Raised when a catalog cannot be found in the specified configuration method""" + + pass + + +def load_catalog_from_yaml( + catalog_name: str, + config_path: Optional[str] = None, + credentials: Optional[FileSystemCredentials] = None, +) -> IcebergCatalog: + """Load Iceberg catalog from .pyiceberg.yaml file + + Args: + catalog_name: Name of the catalog to load from YAML + config_path: Optional path to .pyiceberg.yaml file. If None, searches in: + 1. Current directory (./.pyiceberg.yaml) + 2. DLT project directory (./.dlt/.pyiceberg.yaml) + 3. Home directory (~/.pyiceberg.yaml) + credentials: Optional filesystem credentials to merge into catalog config + + Returns: + IcebergCatalog instance loaded from YAML configuration + + Raises: + CatalogNotFoundError: If no .pyiceberg.yaml file found or catalog not in file + + Example .pyiceberg.yaml: + catalog: + my_catalog: + type: rest + uri: https://catalog.example.com + warehouse: my_warehouse + credential: token + """ + from pyiceberg.catalog import load_catalog + + search_paths = [] + if config_path: + search_paths.append(Path(config_path)) + else: + # Search in standard locations + search_paths.extend( + [ + Path.cwd() / ".pyiceberg.yaml", + Path.cwd() / ".dlt" / ".pyiceberg.yaml", + Path.home() / ".pyiceberg.yaml", + ] + ) + + # Search for the first existing config file + config = None + for path in search_paths: + if path.exists(): + logger.info(f"Loading Iceberg catalog configuration from: {path}") + with open(path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + break + + # If no config file was found, raise error + if config is None: + raise CatalogNotFoundError( + f"No .pyiceberg.yaml file found. Searched in: {', '.join(str(p) for p in search_paths)}" + ) + + # Check if catalog exists in config + if "catalog" not in config or catalog_name not in config["catalog"]: + available = list(config.get("catalog", {}).keys()) + raise CatalogNotFoundError( + f"Catalog '{catalog_name}' not found in .pyiceberg.yaml. " + f"Available catalogs: {available}" + ) + + logger.info(f"Found catalog '{catalog_name}' in .pyiceberg.yaml") + + # Get catalog config and merge credentials if provided + catalog_config = config["catalog"][catalog_name].copy() + if credentials: + fileio_config = _get_fileio_config(credentials) + catalog_config.update(fileio_config) + + return load_catalog(catalog_name, **catalog_config) + + +def load_catalog_from_config( + catalog_name: str, + config_dict: Dict[str, Any], + credentials: Optional[FileSystemCredentials] = None, +) -> IcebergCatalog: + """Load Iceberg catalog from configuration dictionary + + Args: + catalog_name: Name of the catalog + config_dict: Dictionary with catalog configuration (type, uri, warehouse, etc.) + credentials: Optional filesystem credentials to merge into config + + Returns: + IcebergCatalog instance + + Raises: + CatalogNotFoundError: If config_dict is None or empty + + Example: + config = { + 'type': 'rest', + 'uri': 'https://catalog.example.com', + 'warehouse': 'my_warehouse', + 'credential': 'token' + } + catalog = load_catalog_from_config('my_catalog', config) + """ + from pyiceberg.catalog import load_catalog + + if not config_dict: + raise CatalogNotFoundError("No configuration dictionary provided") + + logger.info(f"Loading catalog '{catalog_name}' from provided configuration") + + # Merge filesystem credentials if provided + config_dict = config_dict.copy() + if credentials: + fileio_config = _get_fileio_config(credentials) + config_dict.update(fileio_config) + + return load_catalog(catalog_name, **config_dict) + + +def load_catalog_from_env( + catalog_name: Optional[str] = None, + credentials: Optional[FileSystemCredentials] = None, +) -> IcebergCatalog: + """Load Iceberg catalog from environment variables + + Args: + catalog_name: Optional catalog name (defaults to DLT_ICEBERG_CATALOG_NAME env var) + credentials: Optional filesystem credentials to merge into config + + Returns: + IcebergCatalog instance + + Raises: + CatalogNotFoundError: If required environment variables are not set + + Environment variables: + DLT_ICEBERG_CATALOG_NAME: Name of the catalog (optional, defaults to 'default') + DLT_ICEBERG_CATALOG_TYPE: Type of catalog ('sql' or 'rest') + DLT_ICEBERG_CATALOG_URI: Catalog URI + DLT_ICEBERG_CATALOG_WAREHOUSE: Warehouse name (for REST catalogs) + DLT_ICEBERG_CATALOG_PROP_*: Additional properties (e.g., PROP_CREDENTIAL, PROP_SCOPE) + """ + from pyiceberg.catalog import load_catalog + + catalog_name = catalog_name or os.getenv("DLT_ICEBERG_CATALOG_NAME", "default") + catalog_type = os.getenv("DLT_ICEBERG_CATALOG_TYPE") + + if not catalog_type: + raise CatalogNotFoundError( + "DLT_ICEBERG_CATALOG_TYPE environment variable not set. Set to 'sql' or 'rest'." + ) + + logger.info(f"Loading catalog '{catalog_name}' from environment variables") + + if catalog_type == "rest": + config = { + "type": "rest", + "uri": os.getenv("DLT_ICEBERG_CATALOG_URI"), + "warehouse": os.getenv("DLT_ICEBERG_CATALOG_WAREHOUSE"), + } + + if not config["uri"] or not config["warehouse"]: + raise CatalogNotFoundError( + "For REST catalog, DLT_ICEBERG_CATALOG_URI and " + "DLT_ICEBERG_CATALOG_WAREHOUSE must be set" + ) + + # Add custom properties from DLT_ICEBERG_CATALOG_PROP_* variables + for key, value in os.environ.items(): + if key.startswith("DLT_ICEBERG_CATALOG_PROP_"): + prop_name = key.replace("DLT_ICEBERG_CATALOG_PROP_", "").lower().replace("_", "-") + config[prop_name] = value + + elif catalog_type == "sql": + config = { + "type": "sql", + "uri": os.getenv("DLT_ICEBERG_CATALOG_URI", "sqlite:///:memory:"), + } + + else: + raise CatalogNotFoundError( + f"Unsupported catalog type: {catalog_type}. Use 'sql' or 'rest'." + ) + + # Merge filesystem credentials if provided + if credentials: + fileio_config = _get_fileio_config(credentials) + config.update(fileio_config) + + return load_catalog(catalog_name, **config) + + +def get_catalog( + catalog_name: str = "default", + catalog_uri: Optional[str] = None, + catalog_config: Optional[Dict[str, Any]] = None, + credentials: Optional[FileSystemCredentials] = None, +) -> IcebergCatalog: + """Get an Iceberg catalog using multiple configuration methods. + + This function tries to load a catalog in the following priority order: + 1. From explicit config dictionary (if catalog_config provided) + 2. From .pyiceberg.yaml file + 3. From environment variables + 4. Fall back to in-memory SQLite catalog + + Args: + catalog_name: Name of the catalog (default: "default") + catalog_type: Type of catalog ('sql' or 'rest') - used for fallback + catalog_uri: URI for SQL catalog - used for fallback + catalog_config: Optional dictionary with complete catalog configuration + credentials: Optional filesystem credentials to merge into config + + Returns: + IcebergCatalog instance + + Examples: + # Load from .pyiceberg.yaml + catalog = get_catalog('my_catalog') + + # Load from config dict + config = {'type': 'rest', 'uri': 'https://...', 'warehouse': 'wh'} + catalog = get_catalog('my_catalog', catalog_config=config) + + # Load from environment variables + # (set DLT_ICEBERG_CATALOG_TYPE, DLT_ICEBERG_CATALOG_URI, etc.) + catalog = get_catalog() + """ + logger.info(f"Attempting to load Iceberg catalog: {catalog_name}") + + # Priority 1: Explicit config dictionary + if catalog_config: + try: + return load_catalog_from_config(catalog_name, catalog_config, credentials) + except Exception as e: + logger.warning(f"Failed to load catalog from config dict: {e}") + + # Priority 2: .pyiceberg.yaml file + try: + return load_catalog_from_yaml(catalog_name, credentials=credentials) + except CatalogNotFoundError as e: + logger.debug(f"Catalog not found in .pyiceberg.yaml: {e}") + except Exception as e: + logger.warning(f"Error loading catalog from .pyiceberg.yaml: {e}") + + # Priority 3: Environment variables + try: + return load_catalog_from_env(catalog_name, credentials) + except CatalogNotFoundError as e: + logger.debug(f"Catalog not configured via environment variables: {e}") + except Exception as e: + logger.warning(f"Error loading catalog from environment: {e}") + + # Priority 4: Fall back to in-memory SQLite + logger.info("No catalog configuration found, using in-memory SQLite catalog") + uri = catalog_uri or "sqlite:///:memory:" + return get_sql_catalog(catalog_name, uri, credentials) + + def evolve_table( catalog: IcebergCatalog, client: FilesystemClient, diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index cbf062a094..25f570aa40 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -990,7 +990,7 @@ def load_open_table(self, table_format: TTableFormat, table_name: str, **kwargs: ) def get_open_table_catalog(self, table_format: TTableFormat, catalog_name: str = None) -> Any: - """Gets a native catalog for a table `table_name` with format `table_format` + """Gets a native catalog for a table with format `table_format` Returns: currently pyiceberg Catalog is supported """ @@ -1000,16 +1000,47 @@ def get_open_table_catalog(self, table_format: TTableFormat, catalog_name: str = if self._catalog: return self._catalog - from dlt.common.libs.pyiceberg import get_sql_catalog, IcebergCatalog + from dlt.common.libs.pyiceberg import get_catalog, IcebergCatalog - # create in-memory catalog catalog: IcebergCatalog - catalog = self._catalog = get_sql_catalog( - catalog_name or "default", "sqlite:///:memory:", self.config.credentials - ) - # create namespace - catalog.create_namespace(self.dataset_name) + # Use catalog name from config or parameter + catalog_name = catalog_name or self.config.iceberg_catalog_name or "default" + + logger.info(f"Loading Iceberg catalog: {catalog_name}") + + try: + # Try to load catalog using new function + catalog = self._catalog = get_catalog( + catalog_name=catalog_name, + catalog_type=self.config.iceberg_catalog_type, + catalog_uri=self.config.iceberg_catalog_uri, + catalog_config=self.config.iceberg_catalog_config, + credentials=self.config.credentials, + ) + + logger.info( + f"Successfully loaded catalog '{catalog_name}' " + f"of type '{self.config.iceberg_catalog_type}'" + ) + + except Exception as e: + logger.warning(f"Failed to load catalog '{catalog_name}': {e}") + logger.info("Falling back to default in-memory SQLite catalog") + + # Fall back to original behavior (in-memory SQLite) + from dlt.common.libs.pyiceberg import get_sql_catalog + + catalog = self._catalog = get_sql_catalog( + catalog_name, "sqlite:///:memory:", self.config.credentials + ) + + # Create namespace + try: + catalog.create_namespace(self.dataset_name) + logger.info(f"Created Iceberg namespace: {self.dataset_name}") + except Exception as e: + logger.debug(f"Namespace {self.dataset_name} already exists or error: {e}") return catalog From 130f6d68f23550816f303699dc23bede3fa69ced Mon Sep 17 00:00:00 2001 From: lfagliano Date: Sun, 16 Nov 2025 20:05:38 +0100 Subject: [PATCH 2/2] feat: adding iceberg related config to the filesystem config --- .../impl/filesystem/configuration.py | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/dlt/destinations/impl/filesystem/configuration.py b/dlt/destinations/impl/filesystem/configuration.py index ad6e71668d..f59ed8e562 100644 --- a/dlt/destinations/impl/filesystem/configuration.py +++ b/dlt/destinations/impl/filesystem/configuration.py @@ -1,6 +1,6 @@ import dataclasses -from typing import Final, Optional, Type +from typing import Final, Optional, Type, Dict, Any from dlt.common import logger from dlt.common.configuration import configspec, resolve_type @@ -26,6 +26,38 @@ class FilesystemDestinationClientConfiguration(FilesystemConfigurationWithLocalF always_refresh_views: bool = False """Always refresh table scanner views by setting the newest table metadata or globbing table files""" + # Iceberg catalog configuration + iceberg_catalog_name: Optional[str] = "default" + """Name of the Iceberg catalog to use. Corresponds to catalog name in .pyiceberg.yaml""" + + iceberg_catalog_type: Optional[str] = "sql" + """Type of Iceberg catalog: 'sql', 'rest', 'glue', 'hive', etc.""" + + iceberg_catalog_uri: Optional[str] = None + """ + URI for SQL catalog (e.g., 'postgresql://...') or REST catalog endpoint. + If not provided, defaults to in-memory SQLite for backward compatibility. + """ + + iceberg_catalog_config: Optional[Dict[str, Any]] = None + """ + Optional dictionary with complete catalog configuration. + If provided, will be used instead of loading from .pyiceberg.yaml. + Example for REST catalog: + { + 'type': 'rest', + 'uri': 'https://catalog.example.com', + 'warehouse': 'my_warehouse', + 'credential': 'token', + 'scope': 'PRINCIPAL_ROLE:ALL' + } + Example for SQL catalog: + { + 'type': 'sql', + 'uri': 'postgresql://user:pass@localhost/catalog' + } + """ + @resolve_type("credentials") def resolve_credentials_type(self) -> Type[CredentialsConfiguration]: return super().resolve_credentials_type()