
Merge branch 'main' of github.com:apache/iceberg-python into fd-document-write-options
Fokko committed Feb 5, 2024
2 parents 7f582cb + 40ab60a commit ca1ca76
Showing 6 changed files with 295 additions and 96 deletions.
35 changes: 32 additions & 3 deletions mkdocs/docs/index.md
@@ -62,6 +62,29 @@ You either need to install `s3fs`, `adlfs`, `gcs`, or `pyarrow` to be able to fe

Iceberg leverages the [catalog to have one centralized place to organize the tables](https://iceberg.apache.org/catalog/). This can be a traditional Hive catalog to store your Iceberg tables next to the rest, a vendor solution like the AWS Glue catalog, or an implementation of Iceberg's own [REST protocol](https://github.com/apache/iceberg/tree/main/open-api). Check out the [configuration](configuration.md) page to find all the configuration details.

For the sake of demonstration, we'll configure the catalog to use the `SqlCatalog` implementation, which will store information in a local `sqlite` database. We'll also configure the catalog to store data files in the local filesystem instead of an object store. This should not be used in production due to its limited scalability.

Create a temporary location for Iceberg:

```shell
mkdir /tmp/warehouse
```

Open a Python 3 REPL to set up the catalog:

```python
from pyiceberg.catalog.sql import SqlCatalog

warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
```
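As a quick sanity check, you can ask the catalog for its namespaces; a minimal sketch (for a brand-new warehouse this should print an empty list):

```python
# List the namespaces registered in the catalog; empty for a freshly created warehouse.
print(catalog.list_namespaces())
```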

## Write a PyArrow dataframe

Let's take the Taxi dataset and write it to an Iceberg table.
@@ -83,9 +106,7 @@ df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")
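The collapsed lines above presumably download the Parquet file before reading it; a minimal sketch of that step, assuming the public NYC TLC mirror URL (the original walkthrough may use `curl` instead):

```python
import urllib.request

import pyarrow.parquet as pq

# Assumption: public mirror of the January 2023 yellow-taxi trip data.
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
urllib.request.urlretrieve(url, "/tmp/yellow_tripdata_2023-01.parquet")

df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")
print(df.num_rows)
```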
Create a new Iceberg table:

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
catalog.create_namespace("default")

table = catalog.create_table(
    "default.taxi_dataset",
@@ -158,6 +179,14 @@ df = table.scan(row_filter="tip_per_mile > 0").to_arrow()
len(df)
```
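The collapsed part of the diff covers creating the table, appending the dataframe, and deriving the `tip_per_mile` column that the scan above filters on. A hedged sketch of those steps, assuming the Iceberg schema is taken directly from the PyArrow schema (the original walkthrough may differ in details):

```python
import pyarrow.compute as pc

# Create the Iceberg table from the dataframe's schema and append the data.
table = catalog.create_table("default.taxi_dataset", schema=df.schema)
table.append(df)

# Derive tip_per_mile and overwrite the table with the enriched dataframe.
df = df.append_column("tip_per_mile", pc.divide(df["tip_amount"], df["trip_distance"]))
table.overwrite(df)

# Count the rows with a positive tip_per_mile, matching the scan shown above.
print(len(table.scan(row_filter="tip_per_mile > 0").to_arrow()))
```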

### Explore Iceberg data and metadata files

Since the catalog was configured to use the local filesystem, we can explore how Iceberg saved data and metadata files from the above operations.

```shell
find /tmp/warehouse/
```

## More details

For the details, please check the [CLI](cli.md) or [Python API](api.md) page.
4 changes: 3 additions & 1 deletion pyiceberg/catalog/glue.py
@@ -85,6 +85,7 @@
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
@@ -125,6 +126,7 @@ def _construct_parameters(
StringType: "string",
UUIDType: "string",
TimestampType: "timestamp",
TimestamptzType: "timestamp",
FixedType: "binary",
BinaryType: "binary",
}
@@ -150,7 +152,7 @@ def primitive(self, primitive: PrimitiveType) -> str:
if isinstance(primitive, DecimalType):
    return f"decimal({primitive.precision},{primitive.scale})"
if (primitive_type := type(primitive)) not in GLUE_PRIMITIVE_TYPES:
    raise ValueError(f"Unknown primitive type: {primitive}")
    return str(primitive_type.root)
return GLUE_PRIMITIVE_TYPES[primitive_type]


10 changes: 7 additions & 3 deletions pyiceberg/catalog/sql.py
@@ -32,7 +32,7 @@
union,
update,
)
from sqlalchemy.exc import IntegrityError, NoResultFound, OperationalError
from sqlalchemy.exc import IntegrityError, NoResultFound, OperationalError, ProgrammingError
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
@@ -101,7 +101,8 @@ def __init__(self, name: str, **properties: str):

if not (uri_prop := self.properties.get("uri")):
    raise NoSuchPropertyException("SQL connection URI is required")
self.engine = create_engine(uri_prop, echo=True)
echo = bool(self.properties.get("echo", False))
self.engine = create_engine(uri_prop, echo=echo)

self._ensure_tables_exist()
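With this change, SQLAlchemy statement logging becomes opt-in through the catalog's `echo` property instead of being hard-coded on. A usage sketch (the property name comes from the code above; the paths are illustrative):

```python
from pyiceberg.catalog.sql import SqlCatalog

# Turn SQL statement logging back on explicitly; it is now off by default.
catalog = SqlCatalog(
    "default",
    **{
        "uri": "sqlite:////tmp/warehouse/pyiceberg_catalog.db",
        "warehouse": "file:///tmp/warehouse",
        "echo": True,  # the constructor applies bool() to whatever value is supplied
    },
)
```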

@@ -111,7 +112,10 @@ def _ensure_tables_exist(self) -> None:
stmt = select(1).select_from(table)
try:
    session.scalar(stmt)
except OperationalError:
except (
    OperationalError,
    ProgrammingError,
): # sqlalchemy returns OperationalError in case of sqlite and ProgrammingError with postgres.
    self.create_tables()
return

26 changes: 11 additions & 15 deletions pyiceberg/table/__init__.py
@@ -943,13 +943,13 @@ def append(self, df: pa.Table) -> None:
if len(self.spec().fields) > 0:
    raise ValueError("Cannot write to partitioned tables")

if len(self.sort_order().fields) > 0:
    raise ValueError("Cannot write to tables with a sort-order")

data_files = _dataframe_to_data_files(self, df=df)
merge = _MergingSnapshotProducer(operation=Operation.APPEND, table=self)
for data_file in data_files:
    merge.append_data_file(data_file)

# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
    data_files = _dataframe_to_data_files(self, df=df)
    for data_file in data_files:
        merge.append_data_file(data_file)

merge.commit()

@@ -976,17 +976,16 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
if len(self.spec().fields) > 0:
    raise ValueError("Cannot write to partitioned tables")

if len(self.sort_order().fields) > 0:
    raise ValueError("Cannot write to tables with a sort-order")

data_files = _dataframe_to_data_files(self, df=df)
merge = _MergingSnapshotProducer(
    operation=Operation.OVERWRITE if self.current_snapshot() is not None else Operation.APPEND,
    table=self,
)

for data_file in data_files:
    merge.append_data_file(data_file)
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
    data_files = _dataframe_to_data_files(self, df=df)
    for data_file in data_files:
        merge.append_data_file(data_file)

merge.commit()

@@ -2279,9 +2278,6 @@ def _dataframe_to_data_files(table: Table, df: pa.Table) -> Iterable[DataFile]:
if len(table.spec().fields) > 0:
    raise ValueError("Cannot write to partitioned tables")

if len(table.sort_order().fields) > 0:
    raise ValueError("Cannot write to tables with a sort-order")

write_uuid = uuid.uuid4()
counter = itertools.count(0)
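A user-visible effect of these changes is that appending an empty dataframe no longer attempts to write data files; the merge still commits, just without new files. A small sketch, assuming `table` and `df` are the objects from the documentation walkthrough earlier in this commit:

```python
# Build a zero-row PyArrow table with the same schema as the data appended earlier.
empty_df = df.schema.empty_table()

# With this commit, appending zero rows skips _dataframe_to_data_files entirely
# and simply goes through merge.commit() without adding data files.
table.append(empty_df)
```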

6 changes: 5 additions & 1 deletion tests/integration/test_reads.py
@@ -224,7 +224,11 @@ def test_ray_all_types(catalog: Catalog) -> None:
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
table_test_all_types = catalog.load_table("default.test_all_types")
fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
fs = S3FileSystem(
    endpoint_override=catalog.properties["s3.endpoint"],
    access_key=catalog.properties["s3.access-key-id"],
    secret_key=catalog.properties["s3.secret-access-key"],
)
data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()]
for data_file_path in data_file_paths:
    uri = urlparse(data_file_path)
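The test now takes the MinIO endpoint and credentials from the catalog's properties instead of hard-coding them. For context, a hedged sketch of how such a catalog might be configured (the property keys match the ones read above; the REST URI and credential values are assumptions mirroring the previously hard-coded defaults):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical local integration-test setup.
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)
```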
