|
31 | 31 | ManifestEntryStatus,
|
32 | 32 | ManifestFile,
|
33 | 33 | PartitionFieldSummary,
|
| 34 | + RollingManifestWriter, |
34 | 35 | read_manifest_list,
|
35 | 36 | write_manifest,
|
36 | 37 | write_manifest_list,
|
@@ -476,6 +477,184 @@ def test_write_manifest(
|
476 | 477 | assert data_file.sort_order_id == 0
|
477 | 478 |
|
478 | 479 |
|
@pytest.mark.parametrize("format_version", [1, 2])
def test_rolling_manifest_writer(
    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
) -> None:
    """Round-trip manifest entries through a ``RollingManifestWriter``.

    The entries of the first manifest referenced by the generated manifest
    list are re-written through a ``RollingManifestWriter`` whose supplier
    yields writers targeting numbered Avro files in a temporary directory.
    The test then checks the Avro metadata of the first produced file and
    the contents of the first entry read back from the first produced
    manifest.

    Args:
        generated_manifest_file_file_v1: Manifest-list location used when
            ``format_version`` is 1.
        generated_manifest_file_file_v2: Manifest-list location used when
            ``format_version`` is 2.
        format_version: Table format version under test.
    """
    io = load_file_io()
    snapshot = Snapshot(
        snapshot_id=25,
        parent_snapshot_id=19,
        timestamp_ms=1602638573590,
        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
        summary=Summary(Operation.APPEND),
        schema_id=3,
    )
    demo_manifest_file = snapshot.manifests(io)[0]
    manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
    test_schema = Schema(
        NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
    )
    test_spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
        PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
        spec_id=demo_manifest_file.partition_spec_id,
    )
    with TemporaryDirectory() as tmpdir:
        # Path of the first file handed out by the supplier below; its Avro
        # metadata is verified after the writer has finished.
        tmp_avro_file = tmpdir + "/test_write_manifest-1.avro"

        def supplier():
            """Yield a fresh manifest writer per call, each targeting the next numbered file."""
            i = 0
            while True:
                i += 1
                rolled_path = tmpdir + f"/test_write_manifest-{i}.avro"
                yield write_manifest(
                    format_version=format_version,
                    spec=test_spec,
                    schema=test_schema,
                    output_file=io.new_output(rolled_path),
                    snapshot_id=8744736658442914487,
                )

        # NOTE(review): the size target is one byte above the file size asserted
        # for the first data file below — presumably chosen so that entry does
        # not trigger a roll-over; confirm against RollingManifestWriter.
        with RollingManifestWriter(
            supplier=supplier(),
            target_file_size_in_bytes=388872 + 1,
            target_number_of_rows=20000,
        ) as writer:
            for entry in manifest_entries:
                writer.add_entry(entry)
            new_manifest = writer.to_manifest_files()[0]
            with pytest.raises(RuntimeError):
                # It is already closed
                writer.add_entry(manifest_entries[0])

        expected_metadata = {
            "schema": test_schema.model_dump_json(),
            "partition-spec": test_spec.model_dump_json(),
            "partition-spec-id": str(test_spec.spec_id),
            "format-version": str(format_version),
        }
        _verify_metadata_with_fastavro(
            tmp_avro_file,
            expected_metadata,
        )
        new_manifest_entries = new_manifest.fetch_manifest_entry(io)

        manifest_entry = new_manifest_entries[0]

        assert manifest_entry.status == ManifestEntryStatus.ADDED
        assert manifest_entry.snapshot_id == 8744736658442914487
        # The expected value must be parenthesised: without the parentheses the
        # statement parses as `assert (x == -1) if format_version == 1 else 3`,
        # which for format version 2 asserts the always-truthy constant 3.
        assert manifest_entry.data_sequence_number == (-1 if format_version == 1 else 3)
        assert isinstance(manifest_entry.data_file, DataFile)

        data_file = manifest_entry.data_file

        assert data_file.content is DataFileContent.DATA
        assert (
            data_file.file_path
            == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
        )
        assert data_file.file_format == FileFormat.PARQUET
        assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
        assert data_file.record_count == 19513
        assert data_file.file_size_in_bytes == 388872
        assert data_file.column_sizes == {
            1: 53,
            2: 98153,
            3: 98693,
            4: 53,
            5: 53,
            6: 53,
            7: 17425,
            8: 18528,
            9: 53,
            10: 44788,
            11: 35571,
            12: 53,
            13: 1243,
            14: 2355,
            15: 12750,
            16: 4029,
            17: 110,
            18: 47194,
            19: 2948,
        }
        assert data_file.value_counts == {
            1: 19513,
            2: 19513,
            3: 19513,
            4: 19513,
            5: 19513,
            6: 19513,
            7: 19513,
            8: 19513,
            9: 19513,
            10: 19513,
            11: 19513,
            12: 19513,
            13: 19513,
            14: 19513,
            15: 19513,
            16: 19513,
            17: 19513,
            18: 19513,
            19: 19513,
        }
        assert data_file.null_value_counts == {
            1: 19513,
            2: 0,
            3: 0,
            4: 19513,
            5: 19513,
            6: 19513,
            7: 0,
            8: 0,
            9: 19513,
            10: 0,
            11: 0,
            12: 19513,
            13: 0,
            14: 0,
            15: 0,
            16: 0,
            17: 0,
            18: 0,
            19: 0,
        }
        assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
        assert data_file.lower_bounds == {
            2: b"2020-04-01 00:00",
            3: b"2020-04-01 00:12",
            7: b"\x03\x00\x00\x00",
            8: b"\x01\x00\x00\x00",
            10: b"\xf6(\\\x8f\xc2\x05S\xc0",
            11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
            13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
            14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
            15: b")\\\x8f\xc2\xf5(\x08\xc0",
            16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
            17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
            18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
            19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
        }
        assert data_file.upper_bounds == {
            2: b"2020-04-30 23:5:",
            3: b"2020-05-01 00:41",
            7: b"\t\x01\x00\x00",
            8: b"\t\x01\x00\x00",
            10: b"\xcd\xcc\xcc\xcc\xcc,_@",
            11: b"\x1f\x85\xebQ\\\xe2\xfe@",
            13: b"\x00\x00\x00\x00\x00\x00\x12@",
            14: b"\x00\x00\x00\x00\x00\x00\xe0?",
            15: b"q=\n\xd7\xa3\xf01@",
            16: b"\x00\x00\x00\x00\x00`B@",
            17: b"333333\xd3?",
            18: b"\x00\x00\x00\x00\x00\x18b@",
            19: b"\x00\x00\x00\x00\x00\x00\x04@",
        }
        assert data_file.key_metadata is None
        assert data_file.split_offsets == [4]
        assert data_file.equality_ids is None
        assert data_file.sort_order_id == 0
| 656 | + |
| 657 | + |
479 | 658 | @pytest.mark.parametrize("format_version", [1, 2])
|
480 | 659 | def test_write_manifest_list(
|
481 | 660 | generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
|
|
0 commit comments