diff --git a/pyiceberg/avro/codecs/__init__.py b/pyiceberg/avro/codecs/__init__.py index 22e2f71cf8..ce592ccc5a 100644 --- a/pyiceberg/avro/codecs/__init__.py +++ b/pyiceberg/avro/codecs/__init__.py @@ -26,7 +26,9 @@ from __future__ import annotations -from typing import Dict, Optional, Type +from typing import Dict, Literal, Optional, Type + +from typing_extensions import TypeAlias from pyiceberg.avro.codecs.bzip2 import BZip2Codec from pyiceberg.avro.codecs.codec import Codec @@ -34,10 +36,17 @@ from pyiceberg.avro.codecs.snappy_codec import SnappyCodec from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec -KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = { +AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"] + +AVRO_CODEC_KEY = "avro.codec" + +KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = { "null": None, "bzip2": BZip2Codec, "snappy": SnappyCodec, "zstandard": ZStandardCodec, "deflate": DeflateCodec, } + +# Map to convert the naming from Iceberg to Avro +CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"} diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index 9db585308d..82b042a412 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -35,7 +35,7 @@ TypeVar, ) -from pyiceberg.avro.codecs import KNOWN_CODECS +from pyiceberg.avro.codecs import AVRO_CODEC_KEY, CODEC_MAPPING_ICEBERG_TO_AVRO, KNOWN_CODECS from pyiceberg.avro.codecs.codec import Codec from pyiceberg.avro.decoder import BinaryDecoder, new_decoder from pyiceberg.avro.encoder import BinaryEncoder @@ -69,7 +69,6 @@ NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True), ) -_CODEC_KEY = "avro.codec" _SCHEMA_KEY = "avro.schema" @@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]: In the case of a null codec, we return a None indicating that we don't need to compress/decompress. """ - codec_name = self.meta.get(_CODEC_KEY, "null") + from pyiceberg.table import TableProperties + + codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) if codec_name not in KNOWN_CODECS: raise ValueError(f"Unsupported codec: {codec_name}") - return KNOWN_CODECS[codec_name] + return KNOWN_CODECS[codec_name] # type: ignore def get_schema(self) -> Schema: if _SCHEMA_KEY in self.meta: @@ -276,11 +277,36 @@ def __exit__( self.output_stream.close() def _write_header(self) -> None: + from pyiceberg.table import TableProperties + + codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) + if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name): + codec_name = avro_codec_name + json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name)) - meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"} + + meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec_name} header = AvroFileHeader(MAGIC, meta, self.sync_bytes) construct_writer(META_SCHEMA).write(self.encoder, header) + def compression_codec(self) -> Optional[Type[Codec]]: + """Get the file's compression codec algorithm from the file's metadata. + + In the case of a null codec, we return a None indicating that we + don't need to compress/decompress. + """ + from pyiceberg.table import TableProperties + + codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT) + + if avro_codec_name := CODEC_MAPPING_ICEBERG_TO_AVRO.get(codec_name): + codec_name = avro_codec_name + + if codec_name not in KNOWN_CODECS: + raise ValueError(f"Unsupported codec: {codec_name}") + + return KNOWN_CODECS[codec_name] # type: ignore + def write_block(self, objects: List[D]) -> None: in_memory = io.BytesIO() block_content_encoder = BinaryEncoder(output_stream=in_memory) @@ -289,6 +315,13 @@ def write_block(self, objects: List[D]) -> None: block_content = in_memory.getvalue() self.encoder.write_int(len(objects)) - self.encoder.write_int(len(block_content)) - self.encoder.write(block_content) + + if codec := self.compression_codec(): + content, content_length = codec.compress(block_content) + self.encoder.write_int(content_length) + self.encoder.write(content) + else: + self.encoder.write_int(len(block_content)) + self.encoder.write(block_content) + self.encoder.write(self.sync_bytes) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..8df058bd14 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -37,6 +37,7 @@ from cachetools.keys import hashkey from pydantic_core import to_json +from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec from pyiceberg.avro.file import AvroFile, AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError @@ -798,9 +799,16 @@ class ManifestWriter(ABC): _deleted_rows: int _min_sequence_number: Optional[int] _partitions: List[Record] - _reused_entry_wrapper: ManifestEntry + _compression: AvroCompressionCodec - def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None: + def __init__( + self, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, + ) -> None: self.closed = False self._spec = spec self._schema = schema @@ -815,6 +823,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, self._deleted_rows = 0 self._min_sequence_number = None self._partitions = [] + self._compression = avro_compression def __enter__(self) -> ManifestWriter: """Open the writer.""" @@ -850,6 +859,7 @@ def _meta(self) -> Dict[str, str]: "partition-spec": to_json(self._spec.fields).decode("utf-8"), "partition-spec-id": str(self._spec.spec_id), "format-version": str(self.version), + AVRO_CODEC_KEY: self._compression, } def _with_partition(self, format_version: TableVersion) -> Schema: @@ -961,13 +971,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: class ManifestWriterV1(ManifestWriter): - def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): - super().__init__( - spec, - schema, - output_file, - snapshot_id, - ) + def __init__( + self, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, + ): + super().__init__(spec, schema, output_file, snapshot_id, avro_compression) def content(self) -> ManifestContent: return ManifestContent.DATA @@ -981,8 +993,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: class ManifestWriterV2(ManifestWriter): - def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): - super().__init__(spec, schema, output_file, snapshot_id) + def __init__( + self, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, + ): + super().__init__(spec, schema, output_file, snapshot_id, avro_compression) def content(self) -> ManifestContent: return ManifestContent.DATA @@ -1008,12 +1027,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: def write_manifest( - format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int + format_version: TableVersion, + spec: PartitionSpec, + schema: Schema, + output_file: OutputFile, + snapshot_id: int, + avro_compression: AvroCompressionCodec, ) -> ManifestWriter: if format_version == 1: - return ManifestWriterV1(spec, schema, output_file, snapshot_id) + return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression) elif format_version == 2: - return ManifestWriterV2(spec, schema, output_file, snapshot_id) + return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") @@ -1063,7 +1087,13 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite class ManifestListWriterV1(ManifestListWriter): - def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]): + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: Optional[int], + compression: AvroCompressionCodec, + ): super().__init__( format_version=1, output_file=output_file, @@ -1071,6 +1101,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id "snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", "format-version": "1", + AVRO_CODEC_KEY: compression, }, ) @@ -1084,7 +1115,14 @@ class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int - def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): + def __init__( + self, + output_file: OutputFile, + snapshot_id: int, + parent_snapshot_id: Optional[int], + sequence_number: int, + compression: AvroCompressionCodec, + ): super().__init__( format_version=2, output_file=output_file, @@ -1093,6 +1131,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", "sequence-number": str(sequence_number), "format-version": "2", + AVRO_CODEC_KEY: compression, }, ) self._commit_snapshot_id = snapshot_id @@ -1127,12 +1166,13 @@ def write_manifest_list( snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: Optional[int], + avro_compression: AvroCompressionCodec, ) -> ManifestListWriter: if format_version == 1: - return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id) + return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression) elif format_version == 2: if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") - return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number) + return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 78676a774a..74a9c881c6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -192,6 +192,9 @@ class TableProperties: WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes" WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB + WRITE_AVRO_COMPRESSION = "write.avro.compression-codec" + WRITE_AVRO_COMPRESSION_DEFAULT = "gzip" + DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default" DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)" diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index a82167744d..08c4f5d0bf 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -27,6 +27,7 @@ from sortedcontainers import SortedList +from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.expressions import ( AlwaysFalse, BooleanExpression, @@ -104,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): _added_data_files: List[DataFile] _manifest_num_counter: itertools.count[int] _deleted_data_files: Set[DataFile] + _compression: AvroCompressionCodec def __init__( self, @@ -126,6 +128,11 @@ def __init__( self._deleted_data_files = set() self.snapshot_properties = snapshot_properties self._manifest_num_counter = itertools.count(0) + from pyiceberg.table import TableProperties + + self._compression = self._transaction.table_metadata.properties.get( # type: ignore + TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT + ) def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._added_data_files.append(data_file) @@ -154,6 +161,7 @@ def _write_added_manifest() -> List[ManifestFile]: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: for data_file in self._added_data_files: writer.add( @@ -184,6 +192,7 @@ def _write_delete_manifest() -> List[ManifestFile]: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: for entry in entries: writer.add_entry(entry) @@ -249,12 +258,14 @@ def _commit(self) -> UpdatesAndRequirements: ) location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + with write_manifest_list( format_version=self._transaction.table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, sequence_number=next_sequence_number, + avro_compression=self._compression, ) as writer: writer.add_manifests(new_manifests) @@ -291,6 +302,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) def new_manifest_output(self) -> OutputFile: @@ -416,6 +428,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: for existing_entry in existing_entries: writer.add_entry(existing_entry) @@ -550,6 +563,7 @@ def _existing_manifests(self) -> List[ManifestFile]: schema=self._transaction.table_metadata.schema(), output_file=self.new_manifest_output(), snapshot_id=self._snapshot_id, + avro_compression=self._compression, ) as writer: [ writer.add_entry( diff --git a/tests/integration/test_rest_manifest.py b/tests/integration/test_rest_manifest.py index dda0bbfe3b..8dd9510ac8 100644 --- a/tests/integration/test_rest_manifest.py +++ b/tests/integration/test_rest_manifest.py @@ -25,6 +25,7 @@ import pytest from fastavro import reader +from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import DataFile, write_manifest @@ -77,7 +78,8 @@ def table_test_all_types(catalog: Catalog) -> Table: @pytest.mark.integration -def test_write_sample_manifest(table_test_all_types: Table) -> None: +@pytest.mark.parametrize("compression", ["null", "deflate"]) +def test_write_sample_manifest(table_test_all_types: Table, compression: AvroCompressionCodec) -> None: test_snapshot = table_test_all_types.current_snapshot() if test_snapshot is None: raise ValueError("Table has no current snapshot, check the docker environment") @@ -120,6 +122,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None: schema=test_schema, output_file=output, snapshot_id=test_snapshot.snapshot_id, + avro_compression=compression, ) as manifest_writer: # For simplicity, try one entry first manifest_writer.add_entry(test_manifest_entries[0]) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 150d2b750c..1e0f1084ca 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -25,6 +25,7 @@ from typing import Any, Dict from urllib.parse import urlparse +import fastavro import pandas as pd import pandas.testing import pyarrow as pa @@ -1841,3 +1842,22 @@ def test_read_write_decimals(session_catalog: Catalog) -> None: tbl.append(arrow_table) assert tbl.scan().to_arrow() == arrow_table + + +@pytest.mark.integration +def test_avro_compression_codecs(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_avro_compression_codecs" + tbl = _create_table(session_catalog, identifier, schema=arrow_table_with_null.schema, data=[arrow_table_with_null]) + + with tbl.io.new_input(tbl.current_snapshot().manifest_list).open() as f: + reader = fastavro.reader(f) + assert reader.codec == "deflate" + + with tbl.transaction() as tx: + tx.set_properties(**{TableProperties.WRITE_AVRO_COMPRESSION: "null"}) + + tbl.append(arrow_table_with_null) + + with tbl.io.new_input(tbl.current_snapshot().manifest_list).open() as f: + reader = fastavro.reader(f) + assert reader.codec == "null" diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 5740587958..d92f87a464 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -22,6 +22,7 @@ import fastavro import pytest +from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.io import load_file_io from pyiceberg.io.pyarrow import PyArrowFileIO from pyiceberg.manifest import ( @@ -351,13 +352,18 @@ def test_write_empty_manifest() -> None: schema=test_schema, output_file=io.new_output(tmp_avro_file), snapshot_id=8744736658442914487, + avro_compression="deflate", ) as _: pass @pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("compression", ["null", "deflate"]) def test_write_manifest( - generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion + generated_manifest_file_file_v1: str, + generated_manifest_file_file_v2: str, + format_version: TableVersion, + compression: AvroCompressionCodec, ) -> None: io = load_file_io() snapshot = Snapshot( @@ -387,6 +393,7 @@ def test_write_manifest( schema=test_schema, output_file=output, snapshot_id=8744736658442914487, + avro_compression=compression, ) as writer: for entry in manifest_entries: writer.add_entry(entry) @@ -527,11 +534,13 @@ def test_write_manifest( @pytest.mark.parametrize("format_version", [1, 2]) @pytest.mark.parametrize("parent_snapshot_id", [19, None]) +@pytest.mark.parametrize("compression", ["null", "deflate"]) def test_write_manifest_list( generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion, parent_snapshot_id: Optional[int], + compression: AvroCompressionCodec, ) -> None: io = load_file_io() @@ -554,6 +563,7 @@ def test_write_manifest_list( snapshot_id=25, parent_snapshot_id=parent_snapshot_id, sequence_number=0, + avro_compression=compression, ) as writer: writer.add_manifests(demo_manifest_list) new_manifest_list = list(read_manifest_list(io.new_input(path)))