Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] reimplement Snapshot manifest cache #1185

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import time
from collections import defaultdict
from enum import Enum
from functools import lru_cache
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer
Expand Down Expand Up @@ -231,13 +230,6 @@ def __eq__(self, other: Any) -> bool:
)


@lru_cache
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
"""Return the manifests from the manifest list."""
file = io.new_input(manifest_list)
return list(read_manifest_list(file))


class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
Expand All @@ -249,6 +241,9 @@ class Snapshot(IcebergBaseModel):
summary: Optional[Summary] = Field(default=None)
schema_id: Optional[int] = Field(alias="schema-id", default=None)

# Private attribute for caching the manifests
_manifest_cache: Optional[List[ManifestFile]] = PrivateAttr(default=None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks promising @kevinjqliu

I know this is a regression from the intended optimization at the module level as suggested here: #787 (comment) but I don't think we have a better alternative 🙂


def __str__(self) -> str:
"""Return the string representation of the Snapshot class."""
operation = f"{self.summary.operation}: " if self.summary else ""
Expand All @@ -258,10 +253,14 @@ def __str__(self) -> str:
return result_str

def manifests(self, io: FileIO) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
if self.manifest_list:
return _manifests(io, self.manifest_list)
return []
"""Return the manifests for the given snapshot, using instance-level caching."""
if self._manifest_cache is None:
if self.manifest_list:
file = io.new_input(self.manifest_list)
self._manifest_cache = list(read_manifest_list(file))
else:
self._manifest_cache = []
return self._manifest_cache


class MetadataLogEntry(IcebergBaseModel):
Expand Down
26 changes: 26 additions & 0 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
from typing import Dict
from unittest.mock import patch

import fastavro
import pytest
Expand Down Expand Up @@ -306,6 +307,31 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
assert entry.status == ManifestEntryStatus.ADDED


def test_read_manifest_cache(generated_manifest_file_file_v2: str) -> None:
with patch("pyiceberg.table.snapshots.read_manifest_list") as mocked_read_manifest_list:
# Mock the read_manifest_list function relative to the module path
io = load_file_io()

snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
manifest_list=generated_manifest_file_file_v2,
summary=Summary(Operation.APPEND),
schema_id=3,
)

# Access the manifests property multiple times to test caching
manifests_first_call = snapshot.manifests(io)
manifests_second_call = snapshot.manifests(io)

# Ensure that read_manifest_list was called only once
mocked_read_manifest_list.assert_called_once()

# Ensure that the same manifest list is returned
assert manifests_first_call == manifests_second_call


def test_write_empty_manifest() -> None:
io = load_file_io()
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
Expand Down