diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index d84c82ec2a..171f52a531 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -24,7 +24,7 @@ hide:
 
 # Python API
 
-PyIceberg is based around catalogs to load tables. First step is to instantiate a catalog that loads tables. Let's use the following configuration to define a catalog called `prod`:
+(Py)Iceberg is [catalog](https://iceberg.apache.org/terms/#catalog)-centric: reading and writing data goes through a catalog. The first step is to instantiate a catalog that can load tables. Let's use the following configuration in `.pyiceberg.yaml` to define a REST catalog called `prod`:
 
 ```yaml
 catalog:
@@ -33,7 +33,7 @@ catalog:
     credential: t-1234:secret
 ```
 
-Note that multiple catalogs can be defined in the same `.pyiceberg.yaml`:
+Note that multiple catalogs can be defined in the same `.pyiceberg.yaml`, for example a Hive and a REST catalog:
 
 ```yaml
 catalog:
@@ -47,13 +47,11 @@ catalog:
     warehouse: my-warehouse
 ```
 
-and loaded in python by calling `load_catalog(name="hive")` and `load_catalog(name="rest")`.
+The different catalogs can be loaded in PyIceberg by their name: `load_catalog(name="hive")` and `load_catalog(name="rest")`. An overview of the configuration options can be found on the [configuration page](https://py.iceberg.apache.org/configuration/).
 
 This information must be placed inside a file called `.pyiceberg.yaml` located either in the `$HOME` or `%USERPROFILE%` directory (depending on whether the operating system is Unix-based or Windows-based, respectively), in the current working directory, or in the `$PYICEBERG_HOME` directory (if the corresponding environment variable is set).
 
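+For example, the `rest` catalog defined above can be loaded by name. A minimal sketch; the name has to match an entry in your `.pyiceberg.yaml`:
+
+```python
+from pyiceberg.catalog import load_catalog
+
+# Picks up the configuration of the catalog named "rest" from .pyiceberg.yaml
+catalog = load_catalog(name="rest")
+```
+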
-For more details on possible configurations refer to the [specific page](https://py.iceberg.apache.org/configuration/).
-
-Then load the `prod` catalog:
+It is also possible to load a catalog without using a `.pyiceberg.yaml` by passing in the properties directly:
 
 ```python
 from pyiceberg.catalog import load_catalog
@@ -70,13 +68,13 @@ catalog = load_catalog(
 )
 ```
 
-Let's create a namespace:
+Next, create a namespace:
 
 ```python
 catalog.create_namespace("docs_example")
 ```
 
-And then list them:
+Or, list existing namespaces:
 
 ```python
 ns = catalog.list_namespaces()
@@ -84,12 +82,6 @@ ns = catalog.list_namespaces()
 
 assert ns == [("docs_example",)]
 ```
 
-And then list tables in the namespace:
-
-```python
-catalog.list_tables("docs_example")
-```
-
 ## Create a table
 
 To create a table from a catalog:
@@ -123,24 +115,21 @@ schema = Schema(
 )
 
 from pyiceberg.partitioning import PartitionSpec, PartitionField
-from pyiceberg.transforms import DayTransform
 
 partition_spec = PartitionSpec(
     PartitionField(
-        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
+        source_id=1, field_id=1000, transform="day", name="datetime_day"
     )
 )
 
 from pyiceberg.table.sorting import SortOrder, SortField
-from pyiceberg.transforms import IdentityTransform
 
 # Sort on the symbol
-sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
+sort_order = SortOrder(SortField(source_id=2, transform="identity"))
 
 catalog.create_table(
     identifier="docs_example.bids",
     schema=schema,
-    location="s3://pyiceberg",
     partition_spec=partition_spec,
     sort_order=sort_order,
 )
@@ -153,13 +142,11 @@ To create a table using a pyarrow schema:
 
 ```python
 import pyarrow as pa
 
-schema = pa.schema(
-    [
+schema = pa.schema([
         pa.field("foo", pa.string(), nullable=True),
         pa.field("bar", pa.int32(), nullable=False),
         pa.field("baz", pa.bool_(), nullable=True),
-    ]
-)
+])
 
 catalog.create_table(
     identifier="docs_example.bids",
@@ -167,18 +154,12 @@ )
 ```
 
-To create a table with some subsequent changes atomically in a transaction:
+Another way to create a table is through `create_table_transaction`. It follows the same APIs that are used to make updates to an existing table, and it is a friendly way to set the partition spec and sort order, because you don't have to deal with field IDs.
 
 ```python
-with catalog.create_table_transaction(
-    identifier="docs_example.bids",
-    schema=schema,
-    location="s3://pyiceberg",
-    partition_spec=partition_spec,
-    sort_order=sort_order,
-) as txn:
+with catalog.create_table_transaction(identifier="docs_example.bids", schema=schema) as txn:
     with txn.update_schema() as update_schema:
-        update_schema.add_column(path="new_column", field_type=StringType())
+        update_schema.add_column(path="new_column", field_type="string")
 
     with txn.update_spec() as update_spec:
         update_spec.add_identity("symbol")
@@ -188,6 +169,8 @@ ## Load a table
 
+There are two ways of reading an Iceberg table: through a catalog, or by pointing at the Iceberg metadata directly. Reading through a catalog is preferred; pointing at the metadata directly is read-only.
+
 ### Catalog table
 
 Loading the `bids` table:
@@ -203,7 +186,7 @@ This returns a `Table` that represents an Iceberg table that can be queried and altered.
 
 ### Static table
 
-To load a table directly from a metadata file (i.e., **without** using a catalog), you can use a `StaticTable` as follows:
+To load a table directly from a `metadata.json` file (i.e., **without** using a catalog), you can use a `StaticTable` as follows:
 
 ```python
 from pyiceberg.table import StaticTable
@@ -213,16 +196,13 @@ static_table = StaticTable.from_metadata(
 )
 ```
 
-The static-table is considered read-only.
-
-Alternatively, if your table metadata directory contains a `version-hint.text` file, you can just specify
-the table root path, and the latest metadata file will be picked automatically.
+A static table does not allow write operations. If your table metadata directory contains a `version-hint.text` file, you can just specify the table root path, and the latest `metadata.json` file will be resolved automatically:
 
 ```python
 from pyiceberg.table import StaticTable
 
 static_table = StaticTable.from_metadata(
-    "s3://warehouse/wh/nyc.db/taxis
+    "s3://warehouse/wh/nyc.db/taxis"
 )
 ```
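+
+Even though a static table cannot be written to, it can still be read. A minimal sketch, assuming the regular `scan()` API also applies to the static table loaded above:
+
+```python
+# Read the static table into an Arrow table
+df = static_table.scan().to_arrow()
+```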
@@ -236,9 +216,9 @@ catalog.table_exists("docs_example.bids")
 
 Returns `True` if the table already exists.
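+
+For example, the check can be combined with `create_table` to only create the table when it does not exist yet. A small sketch, reusing the `schema` from the examples above:
+
+```python
+if not catalog.table_exists("docs_example.bids"):
+    catalog.create_table(identifier="docs_example.bids", schema=schema)
+```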
 
-## Write support
+## Write to a table
 
-With PyIceberg 0.6.0 write support is added through Arrow. Let's consider an Arrow Table:
+Reading and writing is done using [Apache Arrow](https://arrow.apache.org/), an in-memory columnar format for fast data interchange and in-memory analytics. Let's consider the following Arrow Table:
 
 ```python
 import pyarrow as pa
@@ -253,31 +233,22 @@ df = pa.Table.from_pylist(
 )
 ```
 
-Next, create a table based on the schema:
+Next, create a table using the Arrow schema:
 
 ```python
 from pyiceberg.catalog import load_catalog
 
 catalog = load_catalog("default")
 
-from pyiceberg.schema import Schema
-from pyiceberg.types import NestedField, StringType, DoubleType
-
-schema = Schema(
-    NestedField(1, "city", StringType(), required=False),
-    NestedField(2, "lat", DoubleType(), required=False),
-    NestedField(3, "long", DoubleType(), required=False),
-)
-
-tbl = catalog.create_table("default.cities", schema=schema)
+tbl = catalog.create_table("default.cities", schema=df.schema)
 ```
 
-Now write the data to the table:
+Next, write the data to the table. Both `append` and `overwrite` produce the same result, since the table is empty on creation:
 
 !!! note inline end "Fast append"
 
-    PyIceberg default to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables quick writes, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a normal commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.
+    PyIceberg defaults to the [fast append](https://iceberg.apache.org/spec/#snapshots) to minimize the amount of data written. This enables fast commit operations, reducing the possibility of conflicts. The downside of the fast append is that it creates more metadata than a merge commit. [Compaction is planned](https://github.com/apache/iceberg-python/issues/270) and will automatically rewrite all the metadata when a threshold is hit, to maintain performant reads.
@@ -289,7 +260,7 @@ tbl.append(df)
 
 tbl.overwrite(df)
 ```
 
-The data is written to the table, and when the table is read using `tbl.scan().to_arrow()`:
+Now, the data is written to the table, and the table can be read using `tbl.scan().to_arrow()`:
 
 ```python
 pyarrow.Table
@@ -302,14 +273,12 @@ lat: [[52.371807,37.773972,53.11254,48.864716]]
 long: [[4.896029,-122.431297,6.0989,2.349014]]
 ```
 
-You both can use `append(df)` or `overwrite(df)` since there is no data yet. If we want to add more data, we can use `.append()` again:
+If we want to add more data, we can use `.append()` again:
 
 ```python
-df = pa.Table.from_pylist(
+tbl.append(pa.Table.from_pylist(
     [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}],
-)
-
-tbl.append(df)
+))
 ```
 
 When reading the table `tbl.scan().to_arrow()` you can see that `Groningen` is now also part of the table:
@@ -325,33 +294,30 @@ lat: [[52.371807,37.773972,53.11254,48.864716],[53.21917]]
 long: [[4.896029,-122.431297,6.0989,2.349014],[6.56667]]
 ```
 
-The nested lists indicate the different Arrow buffers, where the first write results into a buffer, and the second append in a separate buffer. This is expected since it will read two parquet files.
-
-To avoid any type errors during writing, you can enforce the PyArrow table types using the Iceberg table schema:
+The nested lists indicate the different Arrow buffers. Each of the writes produces a [Parquet file](https://parquet.apache.org/), where each [row group](https://parquet.apache.org/docs/concepts/) translates into an Arrow buffer. When the table is large, PyIceberg can also stream the buffers using the Arrow [RecordBatchReader](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html), avoiding pulling everything into memory right away:
 
 ```python
-from pyiceberg.catalog import load_catalog
-import pyarrow as pa
+for buf in tbl.scan().to_arrow_batch_reader():
+    print(f"Buffer contains {len(buf)} rows")
+```
 
-catalog = load_catalog("default")
-table = catalog.load_table("default.cities")
-schema = table.schema().as_arrow()
+To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
 
+```python
 df = pa.Table.from_pylist(
-    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=schema
+    [{"city": "Groningen", "lat": 53.21917, "long": 6.56667}], schema=tbl.schema().as_arrow()
 )
 
-table.append(df)
+tbl.append(df)
 ```
 
-You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`.
+You can delete some of the data from the table by calling `tbl.delete()` with a desired `delete_filter`. This will use the Iceberg metadata to open only the Parquet files that contain relevant information.
 
 ```python
 tbl.delete(delete_filter="city == 'Paris'")
 ```
 
-In the above example, any records where the city field value equals to `Paris` will be deleted.
-Running `tbl.scan().to_arrow()` will now yield:
+In the above example, any records where the city field equals `Paris` will be deleted. Running `tbl.scan().to_arrow()` will now yield:
 
 ```python
 pyarrow.Table
@@ -364,30 +330,11 @@ lat: [[52.371807,37.773972,53.11254],[53.21917]]
 long: [[4.896029,-122.431297,6.0989],[6.56667]]
 ```
 
-### Partial overwrites
-
-When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table.
-
-For example, with an iceberg table created as:
-
-```python
-from pyiceberg.catalog import load_catalog
-
-catalog = load_catalog("default")
-
-from pyiceberg.schema import Schema
-from pyiceberg.types import NestedField, StringType, DoubleType
+In the case of `tbl.delete(delete_filter="city == 'Groningen'")`, the whole Parquet file will be dropped without checking its contents, since PyIceberg can derive from the Iceberg metadata that all the content in the file matches the predicate.
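+
+The `delete_filter` can also be passed as an expression instead of a string. A small sketch, assuming `delete_filter` accepts an expression such as `EqualTo` from `pyiceberg.expressions`:
+
+```python
+from pyiceberg.expressions import EqualTo
+
+# Equivalent to delete_filter="city == 'Groningen'"
+tbl.delete(delete_filter=EqualTo("city", "Groningen"))
+```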
 
-schema = Schema(
-    NestedField(1, "city", StringType(), required=False),
-    NestedField(2, "lat", DoubleType(), required=False),
-    NestedField(3, "long", DoubleType(), required=False),
-)
-
-tbl = catalog.create_table("default.cities", schema=schema)
-```
+### Partial overwrites
 
-And with initial data populating the table:
+When using the `overwrite` API, you can use an `overwrite_filter` to delete data that matches the filter before appending new data into the table. For example, consider the following Iceberg table:
 
 ```python
 import pyarrow as pa
@@ -399,6 +346,12 @@ df = pa.Table.from_pylist(
         {"city": "Paris", "lat": 48.864716, "long": 2.349014},
     ],
 )
+
+from pyiceberg.catalog import load_catalog
+catalog = load_catalog("default")
+
+tbl = catalog.create_table("default.cities", schema=df.schema)
+
 tbl.append(df)
 ```
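+
+With the table in place, the partial overwrite can then replace just the matching rows. A hedged sketch, assuming the `overwrite_filter` keyword and the `EqualTo` expression available in recent PyIceberg releases:
+
+```python
+from pyiceberg.expressions import EqualTo
+
+# Deletes all rows matching the filter first, then appends the new data
+df_new = pa.Table.from_pylist(
+    [{"city": "New York", "lat": 40.7128, "long": -74.0060}],
+)
+tbl.overwrite(df_new, overwrite_filter=EqualTo("city", "Paris"))
+```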