Skip to content

Add Avro compression #1976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pyiceberg/avro/codecs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@

from __future__ import annotations

from typing import Dict, Optional, Type
from typing import Dict, Literal, Optional, Type

from typing_extensions import TypeAlias

from pyiceberg.avro.codecs.bzip2 import BZip2Codec
from pyiceberg.avro.codecs.codec import Codec
from pyiceberg.avro.codecs.deflate import DeflateCodec
from pyiceberg.avro.codecs.snappy_codec import SnappyCodec
from pyiceberg.avro.codecs.zstandard_codec import ZStandardCodec

KNOWN_CODECS: Dict[str, Optional[Type[Codec]]] = {
AvroCompressionCodec: TypeAlias = Literal["null", "bzip2", "snappy", "zstandard", "deflate"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other places in the project we use zstd instead of zstandard. What do you think about using zstd?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @ndrluis that's a great catch! We should use zstd instead. Avro uses zstandard, that's why I copied it here, but it is actually wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep in mind that this TypeAlias is called AvroCompressionCodec, so it should actually use zstandard instead of zstd, but I've fixed it for Iceberg. LMKWYT


AVRO_CODEC_KEY = "avro.codec"

KNOWN_CODECS: Dict[AvroCompressionCodec, Optional[Type[Codec]]] = {
"null": None,
"bzip2": BZip2Codec,
"snappy": SnappyCodec,
Expand Down
47 changes: 40 additions & 7 deletions pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
TypeVar,
)

from pyiceberg.avro.codecs import KNOWN_CODECS
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, KNOWN_CODECS
from pyiceberg.avro.codecs.codec import Codec
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
from pyiceberg.avro.encoder import BinaryEncoder
Expand Down Expand Up @@ -69,7 +69,6 @@
NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
)

_CODEC_KEY = "avro.codec"
_SCHEMA_KEY = "avro.schema"


Expand All @@ -92,11 +91,13 @@ def compression_codec(self) -> Optional[Type[Codec]]:
In the case of a null codec, we return a None indicating that we
don't need to compress/decompress.
"""
codec_name = self.meta.get(_CODEC_KEY, "null")
from pyiceberg.table import TableProperties

codec_name = self.meta.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
if codec_name not in KNOWN_CODECS:
raise ValueError(f"Unsupported codec: {codec_name}")

return KNOWN_CODECS[codec_name]
return KNOWN_CODECS[codec_name] # type: ignore

def get_schema(self) -> Schema:
if _SCHEMA_KEY in self.meta:
Expand Down Expand Up @@ -276,11 +277,36 @@ def __exit__(
self.output_stream.close()

def _write_header(self) -> None:
from pyiceberg.table import TableProperties

codec = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)
if codec == "gzip":
codec = "deflate"

json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name))
meta = {**self.metadata, _SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}

meta = {**self.metadata, _SCHEMA_KEY: json_schema, AVRO_CODEC_KEY: codec}
header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
construct_writer(META_SCHEMA).write(self.encoder, header)

def compression_codec(self) -> Optional[Type[Codec]]:
"""Get the file's compression codec algorithm from the file's metadata.

In the case of a null codec, we return a None indicating that we
don't need to compress/decompress.
"""
from pyiceberg.table import TableProperties

codec_name = self.metadata.get(AVRO_CODEC_KEY, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT)

if codec_name == "gzip":
codec_name = "deflate"

if codec_name not in KNOWN_CODECS:
raise ValueError(f"Unsupported codec: {codec_name}")

return KNOWN_CODECS[codec_name] # type: ignore

def write_block(self, objects: List[D]) -> None:
in_memory = io.BytesIO()
block_content_encoder = BinaryEncoder(output_stream=in_memory)
Expand All @@ -289,6 +315,13 @@ def write_block(self, objects: List[D]) -> None:
block_content = in_memory.getvalue()

self.encoder.write_int(len(objects))
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)

if codec := self.compression_codec():
content, content_length = codec.compress(block_content)
self.encoder.write_int(content_length)
self.encoder.write(content)
else:
self.encoder.write_int(len(block_content))
self.encoder.write(block_content)

self.encoder.write(self.sync_bytes)
76 changes: 58 additions & 18 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from cachetools.keys import hashkey
from pydantic_core import to_json

from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
from pyiceberg.avro.file import AvroFile, AvroOutputFile
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ValidationError
Expand Down Expand Up @@ -798,9 +799,16 @@ class ManifestWriter(ABC):
_deleted_rows: int
_min_sequence_number: Optional[int]
_partitions: List[Record]
_reused_entry_wrapper: ManifestEntry
_compression: AvroCompressionCodec

def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
def __init__(
self,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
) -> None:
self.closed = False
self._spec = spec
self._schema = schema
Expand All @@ -815,6 +823,7 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
self._deleted_rows = 0
self._min_sequence_number = None
self._partitions = []
self._compression = avro_compression

def __enter__(self) -> ManifestWriter:
"""Open the writer."""
Expand Down Expand Up @@ -850,6 +859,7 @@ def _meta(self) -> Dict[str, str]:
"partition-spec": to_json(self._spec.fields).decode("utf-8"),
"partition-spec-id": str(self._spec.spec_id),
"format-version": str(self.version),
AVRO_CODEC_KEY: self._compression,
}

def _with_partition(self, format_version: TableVersion) -> Schema:
Expand Down Expand Up @@ -961,13 +971,15 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:


class ManifestWriterV1(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
super().__init__(
spec,
schema,
output_file,
snapshot_id,
)
def __init__(
self,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)

def content(self) -> ManifestContent:
return ManifestContent.DATA
Expand All @@ -981,8 +993,15 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:


class ManifestWriterV2(ManifestWriter):
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
super().__init__(spec, schema, output_file, snapshot_id)
def __init__(
self,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
):
super().__init__(spec, schema, output_file, snapshot_id, avro_compression)

def content(self) -> ManifestContent:
return ManifestContent.DATA
Expand All @@ -1008,12 +1027,17 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:


def write_manifest(
format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
format_version: TableVersion,
spec: PartitionSpec,
schema: Schema,
output_file: OutputFile,
snapshot_id: int,
avro_compression: AvroCompressionCodec,
) -> ManifestWriter:
if format_version == 1:
return ManifestWriterV1(spec, schema, output_file, snapshot_id)
return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
elif format_version == 2:
return ManifestWriterV2(spec, schema, output_file, snapshot_id)
return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
else:
raise ValueError(f"Cannot write manifest for table version: {format_version}")

Expand Down Expand Up @@ -1063,14 +1087,21 @@ def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWrite


class ManifestListWriterV1(ManifestListWriter):
def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: Optional[int],
compression: AvroCompressionCodec,
):
super().__init__(
format_version=1,
output_file=output_file,
meta={
"snapshot-id": str(snapshot_id),
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"format-version": "1",
AVRO_CODEC_KEY: compression,
},
)

Expand All @@ -1084,7 +1115,14 @@ class ManifestListWriterV2(ManifestListWriter):
_commit_snapshot_id: int
_sequence_number: int

def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
def __init__(
self,
output_file: OutputFile,
snapshot_id: int,
parent_snapshot_id: Optional[int],
sequence_number: int,
compression: AvroCompressionCodec,
):
super().__init__(
format_version=2,
output_file=output_file,
Expand All @@ -1093,6 +1131,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
"parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
"sequence-number": str(sequence_number),
"format-version": "2",
AVRO_CODEC_KEY: compression,
},
)
self._commit_snapshot_id = snapshot_id
Expand Down Expand Up @@ -1127,12 +1166,13 @@ def write_manifest_list(
snapshot_id: int,
parent_snapshot_id: Optional[int],
sequence_number: Optional[int],
avro_compression: AvroCompressionCodec,
) -> ManifestListWriter:
if format_version == 1:
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
elif format_version == 2:
if sequence_number is None:
raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
else:
raise ValueError(f"Cannot write manifest list for table version: {format_version}")
3 changes: 3 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ class TableProperties:
WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512 * 1024 * 1024 # 512 MB

WRITE_AVRO_COMPRESSION = "write.avro.compression-codec"
WRITE_AVRO_COMPRESSION_DEFAULT = "gzip"

DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"

Expand Down
14 changes: 14 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from sortedcontainers import SortedList

from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.expressions import (
AlwaysFalse,
BooleanExpression,
Expand Down Expand Up @@ -104,6 +105,7 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
_added_data_files: List[DataFile]
_manifest_num_counter: itertools.count[int]
_deleted_data_files: Set[DataFile]
_compression: AvroCompressionCodec

def __init__(
self,
Expand All @@ -126,6 +128,11 @@ def __init__(
self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
self._manifest_num_counter = itertools.count(0)
from pyiceberg.table import TableProperties

self._compression = self._transaction.table_metadata.properties.get( # type: ignore
TableProperties.WRITE_AVRO_COMPRESSION, TableProperties.WRITE_AVRO_COMPRESSION_DEFAULT
)
Comment on lines +133 to +135
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think little things like table-property parsing have tripped us up unexpectedly in the past - would it be possible to add a few cases that demonstrate:

  • that the newly written manifests now have the new default compression when a new snapshot is committed?
  • and that newly written manifests respect a compression codec value when the property is set, when a new snapshot is committed?

I think this would help us add coverage for the new parameter, both in property parsing and in the simple logic here in the update.snapshots module, in each function


def append_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
self._added_data_files.append(data_file)
Expand Down Expand Up @@ -154,6 +161,7 @@ def _write_added_manifest() -> List[ManifestFile]:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
for data_file in self._added_data_files:
writer.add(
Expand Down Expand Up @@ -184,6 +192,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
for entry in entries:
writer.add_entry(entry)
Expand Down Expand Up @@ -249,12 +258,14 @@ def _commit(self) -> UpdatesAndRequirements:
)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)

with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id,
sequence_number=next_sequence_number,
avro_compression=self._compression,
) as writer:
writer.add_manifests(new_manifests)

Expand Down Expand Up @@ -291,6 +302,7 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
)

def new_manifest_output(self) -> OutputFile:
Expand Down Expand Up @@ -416,6 +428,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
for existing_entry in existing_entries:
writer.add_entry(existing_entry)
Expand Down Expand Up @@ -550,6 +563,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
avro_compression=self._compression,
) as writer:
[
writer.add_entry(
Expand Down
5 changes: 4 additions & 1 deletion tests/integration/test_rest_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pytest
from fastavro import reader

from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import DataFile, write_manifest
Expand Down Expand Up @@ -77,7 +78,8 @@ def table_test_all_types(catalog: Catalog) -> Table:


@pytest.mark.integration
def test_write_sample_manifest(table_test_all_types: Table) -> None:
@pytest.mark.parametrize("compression", ["null", "deflate"])
def test_write_sample_manifest(table_test_all_types: Table, compression: AvroCompressionCodec) -> None:
test_snapshot = table_test_all_types.current_snapshot()
if test_snapshot is None:
raise ValueError("Table has no current snapshot, check the docker environment")
Expand Down Expand Up @@ -120,6 +122,7 @@ def test_write_sample_manifest(table_test_all_types: Table) -> None:
schema=test_schema,
output_file=output,
snapshot_id=test_snapshot.snapshot_id,
avro_compression=compression,
) as manifest_writer:
# For simplicity, try one entry first
manifest_writer.add_entry(test_manifest_entries[0])
Expand Down
Loading