Skip to content

Commit

Permalink
legacy-current-snapshot-id
Browse files Browse the repository at this point in the history
  • Loading branch information
sungwy committed Feb 29, 2024
1 parent d401045 commit cd38c5a
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 16 deletions.
4 changes: 4 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,7 @@ catalog:
# Concurrency

PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details.

# Backward Compatibility

Previous versions of the Java implementation incorrectly assume the optional attribute `current-snapshot-id` to be a required attribute in the TableMetadata. This means that if `current-snapshot-id` is missing from the metadata file (e.g. on table creation), the application throws an exception and is unable to load the table. This assumption has been corrected in more recent Iceberg versions. However, it is possible to force PyIceberg to create a table with a metadata file that is compatible with previous versions. This can be configured by setting the `legacy-current-snapshot-id` entry to "True" in the configuration file, or by setting the `PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID` environment variable. Refer to the [PR discussion](https://github.com/apache/iceberg-python/pull/473) for more details on the issue.
11 changes: 9 additions & 2 deletions pyiceberg/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import codecs
import gzip
import json
from abc import ABC, abstractmethod
from typing import Callable

from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
from pyiceberg.table.metadata import CURRENT_SNAPSHOT_ID, TableMetadata, TableMetadataUtil
from pyiceberg.typedef import UTF8
from pyiceberg.utils.config import Config

GZIP = "gzip"

Expand Down Expand Up @@ -127,6 +129,11 @@ def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite:
overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
"""
with output_file.create(overwrite=overwrite) as output_stream:
json_bytes = metadata.model_dump_json().encode(UTF8)
model_dump = metadata.model_dump_json()
if Config().get_bool("legacy-current-snapshot-id") and metadata.current_snapshot_id is None:
model_dict = json.loads(model_dump)
model_dict[CURRENT_SNAPSHOT_ID] = -1
model_dump = json.dumps(model_dict)
json_bytes = model_dump.encode(UTF8)
json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
output_stream.write(json_bytes)
2 changes: 1 addition & 1 deletion pyiceberg/table/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def cleanup_snapshot_id(data: Dict[str, Any]) -> Dict[str, Any]:
if CURRENT_SNAPSHOT_ID in data and data[CURRENT_SNAPSHOT_ID] == -1:
# We treat -1 and None the same, by cleaning this up
# in a pre-validator, we can simplify the logic later on
data[CURRENT_SNAPSHOT_ID] = -1
data[CURRENT_SNAPSHOT_ID] = None
return data


Expand Down
11 changes: 1 addition & 10 deletions pyiceberg/utils/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,4 @@ def get_or_create() -> Executor:
@staticmethod
def max_workers() -> Optional[int]:
"""Return the max number of workers configured."""
config = Config()
val = config.config.get("max-workers")

if val is None:
return None

try:
return int(val) # type: ignore
except ValueError as err:
raise ValueError(f"Max workers should be an integer or left unset. Current value: {val}") from err
return Config().get_int("max-workers")
17 changes: 17 additions & 0 deletions pyiceberg/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import logging
import os
from distutils.util import strtobool
from typing import List, Optional

import strictyaml
Expand Down Expand Up @@ -154,3 +155,19 @@ def get_catalog_config(self, catalog_name: str) -> Optional[RecursiveDict]:
assert isinstance(catalog_conf, dict), f"Configuration path catalogs.{catalog_name_lower} needs to be an object"
return catalog_conf
return None

def get_int(self, key: str) -> Optional[int]:
    """Return the configuration value stored under ``key`` as an integer.

    Args:
        key: Configuration key to look up in ``self.config``.

    Returns:
        The value converted with ``int``, or ``None`` when the key is unset.

    Raises:
        ValueError: If the configured value cannot be converted to an integer.
    """
    if (val := self.config.get(key)) is None:
        return None
    try:
        return int(val)  # type: ignore
    except (ValueError, TypeError) as err:
        # int() raises TypeError rather than ValueError for non-scalar values
        # (e.g. a nested mapping); surface both as the same configuration error.
        raise ValueError(f"{key} should be an integer or left unset. Current value: {val}") from err

def get_bool(self, key: str) -> Optional[bool]:
    """Return the configuration value stored under ``key`` as a boolean.

    The accepted spellings (case-insensitive) are ``y``, ``yes``, ``t``,
    ``true``, ``on``, ``1`` for true and ``n``, ``no``, ``f``, ``false``,
    ``off``, ``0`` for false.

    Args:
        key: Configuration key to look up in ``self.config``.

    Returns:
        ``True`` or ``False`` for a recognised value, or ``None`` when the key
        is unset.

    Raises:
        ValueError: If the configured value is not a recognised boolean spelling.
    """
    if (val := self.config.get(key)) is None:
        return None
    # Parsed locally instead of via distutils' strtobool: distutils was removed
    # from the standard library in Python 3.12, and strtobool returns 0/1
    # rather than a real bool.
    normalized = str(val).lower()
    if normalized in ("y", "yes", "t", "true", "on", "1"):
        return True
    if normalized in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}")
2 changes: 1 addition & 1 deletion tests/cli/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def test_json_describe_table(catalog: InMemoryCatalog) -> None:
assert result.exit_code == 0
assert (
result.output
== """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"current-snapshot-id":-1,"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n"""
== """{"identifier":["default","my_table"],"metadata_location":"s3://warehouse/default/my_table/metadata/metadata.json","metadata":{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}}\n"""
)


Expand Down
4 changes: 2 additions & 2 deletions tests/table/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_v1_metadata_parsing_directly(example_table_metadata_v1: Dict[str, Any])
]
assert table_metadata.default_spec_id == 0
assert table_metadata.last_partition_id == 1000
assert table_metadata.current_snapshot_id == -1
assert table_metadata.current_snapshot_id is None
assert table_metadata.default_sort_order_id == 0


Expand Down Expand Up @@ -170,7 +170,7 @@ def test_updating_metadata(example_table_metadata_v2: Dict[str, Any]) -> None:
def test_serialize_v1(example_table_metadata_v1: Dict[str, Any]) -> None:
table_metadata = TableMetadataV1(**example_table_metadata_v1)
table_metadata_json = table_metadata.model_dump_json()
expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"current-snapshot-id":-1,"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}"""
expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}"""
assert table_metadata_json == expected


Expand Down
50 changes: 50 additions & 0 deletions tests/test_serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import uuid
import importlib
import pytest
import json
from typing import Any, Dict
import os
from pytest_mock import MockFixture

from pyiceberg.utils.config import Config
from pyiceberg.table import StaticTable
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg import serializers
from pyiceberg.serializers import ToOutputFile


def test_legacy_current_snapshot_id(
    mocker: MockFixture, tmp_path_factory: pytest.TempPathFactory, example_table_metadata_no_snapshot_v1: Dict[str, Any]
) -> None:
    """Check that the legacy flag writes ``current-snapshot-id`` as -1 and still reads back as None.

    The same snapshot-less metadata is written twice to one location: first with
    the default configuration, then with ``PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID``
    set to "True". The table loaded from either file must be identical.
    """
    from pyiceberg.io.pyarrow import PyArrowFileIO

    metadata_dir = tmp_path_factory.mktemp("metadata")
    location = str(metadata_dir / f"{uuid.uuid4()}.metadata.json")
    table_metadata = TableMetadataV1(**example_table_metadata_no_snapshot_v1)

    # Default behaviour: write the metadata and load it back.
    default_output = PyArrowFileIO().new_output(location=location)
    ToOutputFile.table_metadata(table_metadata, default_output, overwrite=True)
    static_table = StaticTable.from_metadata(location)
    assert static_table.metadata.current_snapshot_id is None

    # Turn on the legacy flag through the environment for the rest of the test.
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    legacy_output = PyArrowFileIO().new_output(location=location)
    ToOutputFile.table_metadata(table_metadata, legacy_output, overwrite=True)

    # Inspect the raw JSON on disk: the field must be present with the value -1.
    legacy_input = PyArrowFileIO().new_input(location=location)
    with legacy_input.open() as input_stream:
        raw_metadata = input_stream.read()
    written_fields = json.loads(raw_metadata)
    assert written_fields['current-snapshot-id'] == -1

    # Loading the legacy file yields the same metadata as the default file.
    backwards_compatible_static_table = StaticTable.from_metadata(location)
    assert backwards_compatible_static_table.metadata.current_snapshot_id is None
    assert backwards_compatible_static_table.metadata == static_table.metadata
17 changes: 17 additions & 0 deletions tests/utils/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,20 @@ def test_merge_config() -> None:
rhs: RecursiveDict = {"common_key": "xyz789"}
result = merge_config(lhs, rhs)
assert result["common_key"] == rhs["common_key"]


def test_from_configuration_files_get_typed_value(tmp_path_factory: pytest.TempPathFactory) -> None:
    """Check ``Config.get_int`` / ``Config.get_bool`` against values written to a config file.

    A ``.pyiceberg.yaml`` holding an integer-like and a boolean-like entry is
    written to a temporary directory, which ``PYICEBERG_HOME`` is pointed at.
    Reading an entry with the wrong typed getter must raise ``ValueError``;
    reading it with the matching getter must return the parsed value.
    """
    config_path = str(tmp_path_factory.mktemp("config"))
    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file:
        yaml_str = as_document({"max-workers": "4", "legacy-current-snapshot-id": "True"}).as_yaml()
        file.write(yaml_str)

    # Remember the previous value so this test does not leak PYICEBERG_HOME
    # into tests that run after it.
    previous_home = os.environ.get("PYICEBERG_HOME")
    os.environ["PYICEBERG_HOME"] = config_path
    try:
        with pytest.raises(ValueError):
            Config().get_bool("max-workers")

        with pytest.raises(ValueError):
            Config().get_int("legacy-current-snapshot-id")

        assert Config().get_bool("legacy-current-snapshot-id")
        assert Config().get_int("max-workers") == 4
    finally:
        if previous_home is None:
            os.environ.pop("PYICEBERG_HOME", None)
        else:
            os.environ["PYICEBERG_HOME"] = previous_home

0 comments on commit cd38c5a

Please sign in to comment.