Skip to content

Commit bf68848

Browse files
committed
add test
1 parent d705c76 commit bf68848

File tree

2 files changed

+59
-28
lines changed

2 files changed

+59
-28
lines changed

pyiceberg/manifest.py

+1
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,7 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter:
865865
entry.snapshot_id, entry.sequence_number, entry.file_sequence_number, entry.data_file
866866
)
867867
)
868+
self._existing_files += 1
868869
return self
869870

870871

tests/integration/test_writes/test_rewrite_manifests.py

+58-28
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
# pylint:disable=redefined-outer-name
18+
from typing import List
1819

1920
import pyarrow as pa
2021
import pytest
2122

2223
from pyiceberg.catalog import Catalog
24+
from pyiceberg.manifest import ManifestFile
25+
from pyiceberg.table import TableProperties
2326
from utils import _create_table
2427

2528

@@ -171,49 +174,76 @@ def test_rewrite_small_manifests_non_partitioned_table(session_catalog: Catalog,
171174
assert expected_records_count == actual_records_count, "Rows must match"
172175

173176

174-
def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
175-
# Create and append data files
176-
# records1 = [ThreeColumnRecord(1, None, "AAAA"), ThreeColumnRecord(1, "BBBBBBBBBB", "BBBB")]
177-
# records2 = [ThreeColumnRecord(2, "CCCCCCCCCC", "CCCC"), ThreeColumnRecord(2, "DDDDDDDDDD", "DDDD")]
178-
# records3 = [ThreeColumnRecord(3, "EEEEEEEEEE", "EEEE"), ThreeColumnRecord(3, "FFFFFFFFFF", "FFFF")]
179-
# records4 = [ThreeColumnRecord(4, "GGGGGGGGGG", "GGGG"), ThreeColumnRecord(4, "HHHHHHHHHH", "HHHH")]
180-
# self.table.newFastAppend().appendFile(DataFile.from_records(records1)).commit()
181-
# self.table.newFastAppend().appendFile(DataFile.from_records(records2)).commit()
182-
# self.table.newFastAppend().appendFile(DataFile.from_records(records3)).commit()
183-
# self.table.newFastAppend().appendFile(DataFile.from_records(records4)).commit()
177+
def compute_manifest_entry_size_bytes(manifests: List[ManifestFile]) -> float:
178+
total_size = 0
179+
num_entries = 0
180+
181+
for manifest in manifests:
182+
total_size += manifest.manifest_length
183+
num_entries += manifest.added_files_count + manifest.existing_files_count + manifest.deleted_files_count
184+
185+
return total_size / num_entries if num_entries > 0 else 0
186+
187+
188+
def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog) -> None:
189+
records1 = pa.Table.from_pydict({"c1": [1, 1], "c2": [None, "BBBBBBBBBB"], "c3": ["AAAA", "BBBB"]})
190+
191+
records2 = records2 = pa.Table.from_pydict({"c1": [2, 2], "c2": ["CCCCCCCCCC", "DDDDDDDDDD"], "c3": ["CCCC", "DDDD"]})
192+
193+
records3 = records3 = pa.Table.from_pydict({"c1": [3, 3], "c2": ["EEEEEEEEEE", "FFFFFFFFFF"], "c3": ["EEEE", "FFFF"]})
194+
195+
records4 = records4 = pa.Table.from_pydict({"c1": [4, 4], "c2": ["GGGGGGGGGG", "HHHHHHHHHG"], "c3": ["GGGG", "HHHH"]})
196+
197+
schema = pa.schema(
198+
[
199+
("c1", pa.int64()),
200+
("c2", pa.string()),
201+
("c3", pa.string()),
202+
]
203+
)
184204

185205
identifier = "default.test_rewrite_small_manifests_non_partitioned_table"
186-
tbl = _create_table(session_catalog, identifier, {"format-version": "2"})
187-
tbl.append(arrow_table_with_null)
188-
tbl.append(arrow_table_with_null)
189-
tbl.append(arrow_table_with_null)
190-
tbl.append(arrow_table_with_null)
206+
tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)
207+
208+
tbl.append(records1)
209+
tbl.append(records2)
210+
tbl.append(records3)
211+
tbl.append(records4)
212+
tbl.refresh()
191213

192214
tbl.refresh()
193215
manifests = tbl.current_snapshot().manifests(tbl.io)
194216
assert len(manifests) == 4, "Should have 4 manifests before rewrite"
195217

196-
# Perform the rewrite manifests action
197-
# actions = SparkActions.get()
218+
# manifest_entry_size_bytes = compute_manifest_entry_size_bytes(manifests)
219+
target_manifest_size_bytes = 5200 * 2 + 100
220+
tbl = (
221+
tbl.transaction()
222+
.set_properties({TableProperties.MANIFEST_TARGET_SIZE_BYTES: str(target_manifest_size_bytes)})
223+
.commit_transaction()
224+
)
225+
198226
result = tbl.rewrite_manifests()
199227

228+
tbl.refresh()
200229
assert len(result.rewritten_manifests) == 4, "Action should rewrite 4 manifests"
201230
assert len(result.added_manifests) == 2, "Action should add 2 manifests"
202231

203-
tbl.refresh()
204232
new_manifests = tbl.current_snapshot().manifests(tbl.io)
205233
assert len(new_manifests) == 2, "Should have 2 manifests after rewrite"
206234

207235
assert new_manifests[0].existing_files_count == 4
208236
assert new_manifests[0].added_files_count == 0
209237
assert new_manifests[0].deleted_files_count == 0
210-
#
211-
# assertnew_manifests[1].existingFilesCount(), 4)
212-
# self.assertFalse(new_manifests[1].hasAddedFiles())
213-
# self.assertFalse(new_manifests[1].hasDeletedFiles())
214-
#
215-
# # Validate the records
216-
# expected_records = records1 + records2 + records3 + records4
217-
# result_df = tbl.read()
218-
# actual_records = result_df.collect()
219-
# self.assertEqual(actual_records, expected_records, "Rows must match")
238+
239+
assert new_manifests[1].existing_files_count == 4
240+
assert new_manifests[1].added_files_count == 0
241+
assert new_manifests[1].deleted_files_count == 0
242+
243+
sorted_df = tbl.scan().to_pandas().sort_values(["c1", "c2"], ascending=[False, False])
244+
expectedRecords = (
245+
pa.concat_tables([records1, records2, records3, records4]).to_pandas().sort_values(["c1", "c2"], ascending=[False, False])
246+
)
247+
from pandas.testing import assert_frame_equal
248+
249+
assert_frame_equal(sorted_df.reset_index(drop=True), expectedRecords.reset_index(drop=True))

0 commit comments

Comments
 (0)