Write support #41


Merged: 71 commits into apache:main on Jan 18, 2024

Conversation

@Fokko (Contributor) commented Oct 4, 2023

Experimental branch to implement writing. Many of the changes here will be split out into small, manageable PRs.

Resolves #181
Resolves #23

Fokko added 8 commits October 3, 2023 13:10
For V1 and V2 there are some differences that are hard
to enforce without this:

- `1: snapshot_id` is required for V1, optional for V2
- `105: block_size_in_bytes` needs to be written for V1, but omitted for V2 (this leverages the `write-default`).
- `3: sequence_number` and `4: file_sequence_number` can be omitted for V1.

Everything we read is mapped to V2. However, when writing
we also want to be compliant with the V1 spec, and this is where
the writer tree comes in: we construct a separate tree for V1 or V2.
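A minimal sketch of that idea (the helper name and exact field list are illustrative, not the PR's actual code): build a different set of manifest-entry fields depending on the table's format version, so V1-only fields are written and V2-only fields are omitted.

```
# Hypothetical sketch of a version-dependent writer tree; field ids and
# requirements follow the commit message above, the helper name is made up.
from typing import List

from pyiceberg.types import LongType, NestedField


def manifest_entry_fields(format_version: int) -> List[NestedField]:
    if format_version == 1:
        return [
            NestedField(1, "snapshot_id", LongType(), required=True),            # required in V1
            NestedField(105, "block_size_in_bytes", LongType(), required=True),  # written for V1, omitted for V2
        ]
    return [
        NestedField(1, "snapshot_id", LongType(), required=False),               # optional in V2
        NestedField(3, "sequence_number", LongType(), required=False),
        NestedField(4, "file_sequence_number", LongType(), required=False),
    ]
```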
@Fokko Fokko added this to the PyIceberg 0.6.0 release milestone Oct 9, 2023
@samplec0de

Very relevant! I'm looking forward to it, thank you!

@Fokko Fokko mentioned this pull request Oct 11, 2023

When reading the table `tbl.scan().to_arrow()` you can see that `Groningen` is now also part of the table:

Contributor Author

While working on this, I also checked the field-ids:

parq 00000-0-27345354-67b8-4861-95ca-c2de9dc8d3fe.parquet --schema

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x11eca2e00>
required group field_id=-1 schema {
  optional binary field_id=1 city (String);
  optional double field_id=2 lat;
  optional double field_id=3 long;
}

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
Contributor

Isn't required=False the default?

Contributor Author

No, the default is the stricter required=True. I've set it to False because PyArrow produces nullable fields by default.
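A small illustration of the two defaults being discussed here (nothing beyond what the comment states):

```
import pyarrow as pa
from pyiceberg.types import NestedField, StringType

# Iceberg's NestedField defaults to the stricter required=True ...
assert NestedField(1, "city", StringType()).required is True

# ... while PyArrow fields are nullable (i.e. optional) unless told otherwise.
assert pa.field("city", pa.string()).nullable is True
```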

if len(self.sort_order().fields) > 0:
    raise ValueError("Cannot write to tables with a sort-order")

snapshot_id = self.new_snapshot_id()
Contributor

Minor: this can be handled inside of _MergeAppend since it has the table.
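A hypothetical sketch of that suggestion (names and signature are illustrative only, assuming Table.new_snapshot_id() as used in the snippet above):

```
# Hypothetical sketch only; not the PR's final code. Since the producer already
# holds the table, it can allocate the snapshot id itself instead of the caller
# passing one in.
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation


class _MergeAppend:
    def __init__(self, operation: Operation, table: Table) -> None:
        self._operation = operation
        self._table = table
        self._snapshot_id = table.new_snapshot_id()
```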

snapshot_id = self.new_snapshot_id()

data_files = _dataframe_to_data_files(self, df=df)
merge = _MergeAppend(operation=Operation.APPEND, table=self, snapshot_id=snapshot_id)
Contributor

Is this really a "merge append" if the operation may be overwrite? You might consider using _MergingCommit or _MergingSnapshotProducer (if you want to follow the Java convention).

for entry in manifest.fetch_manifest_entry(self._table.io, discard_deleted=True)
]

list_of_entries = executor.map(_get_entries, previous_snapshot.manifests(self._table.io))
Contributor

It may be a good idea to defensively use only data manifests here instead of all manifests.
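For illustration, reusing the names from the snippet above, a filter down to data manifests might look like this (assuming pyiceberg's ManifestFile exposes a content field with a ManifestContent enum):

```
from pyiceberg.manifest import ManifestContent

# Only walk data manifests; delete manifests carry no data files to rewrite.
data_manifests = [
    manifest
    for manifest in previous_snapshot.manifests(self._table.io)
    if manifest.content == ManifestContent.DATA
]
list_of_entries = executor.map(_get_entries, data_manifests)
```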

status=ManifestEntryStatus.DELETED,
snapshot_id=entry.snapshot_id,
data_sequence_number=entry.data_sequence_number,
file_sequence_number=entry.file_sequence_number,
Contributor

Looks good.

raise ValueError(f"Not implemented for: {self._operation}")

def _manifests(self) -> List[ManifestFile]:
    manifests = []
Contributor

Minor: Since this is empty, it looks like this is just the newly created manifests. It may be a good idea to name this new_manifests.

summary=Summary(operation=self._operation, **self._summary()),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=self._operation == Operation.OVERWRITE,
)
Contributor

I think this block could be moved to _summary so that it produces the correct summary without the need to modify it afterward. That seems a bit cleaner to me, rather than having a two-step process split across methods.

Contributor Author

Yes, that's a great point 👍

if self._operation == Operation.APPEND and previous_snapshot is not None:
    # In case we want to append, just add the existing manifests
    writer.add_manifests(previous_snapshot.manifests(io=self._table.io))
writer.add_manifests(new_manifests)
Contributor

Similar to the note above, I think it would be cleaner to have _manifests produce the complete set of manifests, not just the replacement ones. That method already relies on _deleted_entries to produce deletes, so it may as well also be responsible for checking whether to include the existing manifests.

Another option is to make _manifests produce just manifests for the appended files and handle deletes separately, but it looks like your approach here is to create just one manifest with both deletes and appends.
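A rough sketch of that restructuring, reusing names from the snippets above (the helper _write_added_manifests is hypothetical, and the real method would also fold in the delete handling discussed here):

```
# Hypothetical sketch: _manifests() returns the complete set of manifests for
# the new snapshot, so the commit path simply writes whatever it returns.
def _manifests(self) -> List[ManifestFile]:
    manifests = self._write_added_manifests()  # manifests for files added by this commit
    if self._operation == Operation.APPEND and (previous := self._table.current_snapshot()):
        # A plain append carries the previous snapshot's manifests forward as well.
        manifests.extend(previous.manifests(io=self._table.io))
    return manifests
```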

Contributor Author

Great suggestion. I've moved all the logic to _manifests()

)

for delete_entry in deleted_entries:
    writer.add_entry(delete_entry)
Contributor

I think this approach works fine, but I want to point out that there are drawbacks to writing the deletes in the same manifest:

  1. A reader has to load all of the deletes, even though the files aren't useful. If they are in a separate manifest, readers can filter out manifests that have no EXISTING or ADDED data files.
  2. Manifests with no data files can be removed in future append commits.
  3. This write is single-threaded. In the Java implementation, we produce a manifest of deleted data files for each existing manifest. That allows us to parallelize the operation.

Here's the logic we use to drop manifests that aren't needed on the Java side when producing the new list of manifests:

    // only keep manifests that have live data files or that were written by this commit
    Predicate<ManifestFile> shouldKeep =
        manifest ->
            manifest.hasAddedFiles()
                || manifest.hasExistingFiles()
                || manifest.snapshotId() == snapshotId();

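For illustration, a rough Python counterpart of that Java predicate (assuming pyiceberg's ManifestFile exposes added/existing file counts and the committing snapshot id; attribute names may differ):

```
from pyiceberg.manifest import ManifestFile


def should_keep(manifest: ManifestFile, snapshot_id: int) -> bool:
    # Keep manifests that still contain live data files, or that this commit wrote.
    return (
        bool(manifest.added_files_count)
        or bool(manifest.existing_files_count)
        or manifest.added_snapshot_id == snapshot_id
    )
```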
Contributor Author

I missed this one, thanks for suggesting it! 👍 I've split out the ADDED, EXISTING, and DELETED entries into separate manifests that are written in parallel.

@@ -175,6 +175,104 @@ static_table = StaticTable.from_metadata(

The static-table is considered read-only.

## Write support

With PyIceberg 0.6.0, write support is added through Arrow. Let's consider an Arrow Table:
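The quoted docs hunk is truncated here; the example it introduces reads roughly like the sketch below (illustrative rows, not the verbatim documentation text, assuming tbl is the table loaded earlier in the docs):

```
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "Groningen", "lat": 53.21917, "long": 6.56667},
    ]
)

# Append the Arrow table and read the Iceberg table back.
tbl.append(df)
assert "Groningen" in tbl.scan().to_arrow()["city"].to_pylist()
```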
Contributor

Thanks for this example! Made it really easy to test out.

The example works great cut & pasted into a REPL. I also tested modifications to the dataframe schema passed to append and it does the right thing. I get a schema error for a few cases:

  • Missing column long
  • Type mismatch string instead of double
  • Extra column country

Looks like Arrow requires that the schema matches, which is great.

It would be nice to allow some type promotion in the future. I'm not sure whether arrow would automatically write floats into double columns, for example. I would also like to make sure we have better error messages, not just "ValueError: Table schema does not match schema used to create file: ...". Those will be good follow ups.

Contributor Author

Yes, I think this ties into the work that @syun64 is doing, where we have to make sure we map the fields correctly; after that we can add options to massage the Arrow schema into the Iceberg one (which should be leading).

We can create a visitorWithPartner that checks whether the promotions are possible. One that comes to mind directly is checking whether there are any nulls: Arrow marks schemas as nullable by default, even when the data contains no nulls.
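For example, plain PyArrow already exposes enough to detect that case (just to illustrate the nullability point; not code from this PR):

```
import pyarrow as pa

df = pa.Table.from_pylist([{"city": "Groningen"}])

# The Arrow field is nullable by default, but the column holds no nulls,
# so it could safely back a required Iceberg field.
assert df.schema.field("city").nullable is True
assert df.column("city").null_count == 0
```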

@rdblue (Contributor) commented Jan 17, 2024

@Fokko, this works great and I don't see any blockers so I've approved it.

I think there are a few things to consider in terms of how we want to do this moving forward (whether to use separate manifests for example) but we can get this in and iterate from there. It also looks like this is pretty close to being able to run the overwrite filter, too! Great work.

@Fokko Fokko merged commit 8f7927b into apache:main Jan 18, 2024
@Fokko Fokko deleted the fd-write branch January 18, 2024 10:19
@Fokko (Contributor Author) commented Jan 18, 2024

Many thanks again for the great review @rdblue. I went ahead and merged it 🙌 We'll probably improve the code-style structure a bit more when we add #270; we went back and forth a couple of times. Great to have this in 🚀

}

TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="bool", field_type=BooleanType(), required=False),


@Fokko I have tested the write. If I make any field in the table schema required=True, for example NestedField(field_id=13, name="fixed", field_type=FixedType(16), required=True), it always fails with:

ValueError: Table schema does not match schema used to create file:

   1102 if not table.schema.equals(self.schema, check_metadata=False):
   1103     msg = ('Table schema does not match schema used to create file: '
   1104            '\ntable:\n{!s} vs. \nfile:\n{!s}'
   1105            .format(table.schema, self.schema))
-> 1106     raise ValueError(msg)

Contributor

PyArrow fields by default are nullable, which matches all the nested fields in TABLE_SCHEMA. If you want to test against non-nullable fields, then arrow_table_with_null or whatever other pyarrow table you are instantiating should have nullable=False for the field that has required=True.
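For example (a minimal sketch of that suggestion; the table and column values are illustrative):

```
import pyarrow as pa

# Mark the Arrow field non-nullable so it lines up with required=True on the
# Iceberg side; FixedType(16) corresponds to a 16-byte fixed-size binary.
arrow_schema = pa.schema([pa.field("fixed", pa.binary(16), nullable=False)])
arrow_table = pa.table({"fixed": [b"0123456789abcdef"]}, schema=arrow_schema)
```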


@sebpretzer thanks for the clarification. Tested it, works as expected.

@mkleinbort-ic

Is there an ETA for write functionality in the released version?

@EternalDeiwos (Contributor)

Check the attached milestone for progress. When those issues are resolved it will be ready for release.

@sungwy (Collaborator) commented Jan 31, 2024

Hi @mkleinbort-ic, we've just started voting on the first release candidate that incorporates this change.
