Commit

Merge branch 'apache:main' into python-3.12
steinsgateted authored Dec 28, 2023
2 parents 926a8d2 + 088ee40 commit 7025018
Showing 21 changed files with 2,250 additions and 827 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-release.yml
@@ -80,7 +80,7 @@ jobs:
if: startsWith(matrix.os, 'ubuntu')
run: ls -lah dist/* && cp dist/* wheelhouse/

- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
name: "release-${{ github.event.inputs.version }}"
path: ./wheelhouse/*
2 changes: 1 addition & 1 deletion .github/workflows/stale.yml
@@ -31,7 +31,7 @@ jobs:
if: github.repository_owner == 'apache'
runs-on: ubuntu-22.04
steps:
- uses: actions/stale@v8.0.0
- uses: actions/stale@v9.0.0
with:
stale-issue-label: 'stale'
exempt-issue-labels: 'not-stale'
16 changes: 16 additions & 0 deletions mkdocs/docs/api.md
@@ -33,6 +33,22 @@ catalog:
credential: t-1234:secret
```
Note that multiple catalogs can be defined in the same `.pyiceberg.yaml`:

```yaml
catalog:
hive:
uri: thrift://127.0.0.1:9083
s3.endpoint: http://127.0.0.1:9000
s3.access-key-id: admin
s3.secret-access-key: password
rest:
uri: https://rest-server:8181/
warehouse: my-warehouse
```

and loaded in Python by calling `load_catalog(name="hive")` and `load_catalog(name="rest")`.
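
For illustration, a minimal sketch of loading both catalogs defined above (it assumes the YAML file is in place and the endpoints are reachable; the table identifier is a placeholder):

```python
from pyiceberg.catalog import load_catalog

# Each name refers to a catalog entry in .pyiceberg.yaml
hive_catalog = load_catalog(name="hive")
rest_catalog = load_catalog(name="rest")

# Placeholder table identifier, for illustration only
table = hive_catalog.load_table("default.taxi_dataset")
```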

This information must be placed inside a file called `.pyiceberg.yaml` located either in the `$HOME` or `%USERPROFILE%` directory (depending on whether the operating system is Unix-based or Windows-based, respectively) or in the `$PYICEBERG_HOME` directory (if the corresponding environment variable is set).

For more details on possible configurations refer to the [specific page](https://py.iceberg.apache.org/configuration/).
22 changes: 20 additions & 2 deletions mkdocs/docs/configuration.md
@@ -74,6 +74,7 @@ For the FileIO there are several configuration options available:
| s3.signer | bearer | Configure the signature version of the FileIO. |
| s3.region | us-west-2 | Sets the region of the bucket |
| s3.proxy-uri | http://my.proxy.com:8080 | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
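
As a quick illustration of how such FileIO properties reach the catalog, a minimal sketch passing them programmatically (the endpoint, credentials, and catalog URI are placeholders; the same keys can live in `.pyiceberg.yaml` instead):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "uri": "http://127.0.0.1:8181",          # placeholder REST catalog endpoint
        "s3.endpoint": "http://127.0.0.1:9000",  # placeholder S3-compatible endpoint
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.connect-timeout": 60.0,              # socket connection timeout, in seconds
    },
)
```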

### HDFS

@@ -140,8 +141,9 @@ catalog:

## SQL Catalog

The SQL catalog requires a database for its backend. As of now, pyiceberg only supports PostgreSQL through psycopg2.
The database connection has to be configured using the `uri` property (see SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls)):
The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL (via psycopg2) and SQLite. The database connection has to be configured using the `uri` property. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls):

For PostgreSQL:

```yaml
catalog:
@@ -150,6 +152,22 @@ catalog:
uri: postgresql+psycopg2://username:password@localhost/mydatabase
```

In the case of SQLite:

<!-- prettier-ignore-start -->

!!! warning inline end "Development only"
SQLite is not built for concurrency; use this catalog only for exploratory or development purposes.

<!-- prettier-ignore-end -->

```yaml
catalog:
default:
type: sql
uri: sqlite:////tmp/pyiceberg.db
```
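
As an illustrative sketch, the SQLite-backed catalog above could also be loaded programmatically (the path is a placeholder, and further operations such as creating namespaces or tables would follow the usual catalog API):

```python
from pyiceberg.catalog import load_catalog

# Equivalent to the YAML above; properties are passed inline for illustration
catalog = load_catalog("default", **{"type": "sql", "uri": "sqlite:////tmp/pyiceberg.db"})
```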

## Hive Catalog

```yaml
2 changes: 1 addition & 1 deletion mkdocs/requirements.txt
@@ -23,6 +23,6 @@ mkdocstrings-python==1.7.5
mkdocs-literate-nav==0.6.1
mkdocs-autorefs==0.5.0
mkdocs-gen-files==0.5.0
mkdocs-material==9.4.14
mkdocs-material==9.5.3
mkdocs-material-extensions==1.3.1
mkdocs-section-index==0.3.8
1,437 changes: 1,022 additions & 415 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyiceberg/catalog/__init__.py
@@ -153,7 +153,7 @@ def infer_catalog_type(name: str, catalog_properties: RecursiveDict) -> Optional
return CatalogType.REST
elif uri.startswith("thrift"):
return CatalogType.HIVE
elif uri.startswith("postgresql"):
elif uri.startswith(("sqlite", "postgresql")):
return CatalogType.SQL
else:
raise ValueError(f"Could not infer the catalog type from the uri: {uri}")
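
A small sketch of what this change enables: a `uri` beginning with `sqlite` is now inferred as a SQL catalog, just like `postgresql` (the connection strings below are placeholders):

```python
from pyiceberg.catalog import CatalogType, infer_catalog_type

assert infer_catalog_type("default", {"uri": "sqlite:////tmp/pyiceberg.db"}) is CatalogType.SQL
assert infer_catalog_type("default", {"uri": "postgresql+psycopg2://user:pass@localhost/db"}) is CatalogType.SQL
```
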
2 changes: 1 addition & 1 deletion pyiceberg/exceptions.py
@@ -104,7 +104,7 @@ class GenericDynamoDbError(DynamoDbError):
pass


class CommitFailedException(RESTError):
class CommitFailedException(Exception):
"""Commit failed, refresh and try again."""


1 change: 1 addition & 0 deletions pyiceberg/io/__init__.py
@@ -51,6 +51,7 @@
S3_SESSION_TOKEN = "s3.session-token"
S3_REGION = "s3.region"
S3_PROXY_URI = "s3.proxy-uri"
S3_CONNECT_TIMEOUT = "s3.connect-timeout"
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
4 changes: 4 additions & 0 deletions pyiceberg/io/fsspec.py
@@ -49,6 +49,7 @@
GCS_TOKEN,
GCS_VERSION_AWARE,
S3_ACCESS_KEY_ID,
S3_CONNECT_TIMEOUT,
S3_ENDPOINT,
S3_PROXY_URI,
S3_REGION,
@@ -127,6 +128,9 @@ def _s3(properties: Properties) -> AbstractFileSystem:
if proxy_uri := properties.get(S3_PROXY_URI):
config_kwargs["proxies"] = {"http": proxy_uri, "https": proxy_uri}

if connect_timeout := properties.get(S3_CONNECT_TIMEOUT):
config_kwargs["connect_timeout"] = connect_timeout

fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)

for event_name, event_function in register_events.items():
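
For reference, a rough sketch of the resulting fsspec call once `s3.connect-timeout` is set (endpoint and timeout values are placeholders):

```python
from s3fs import S3FileSystem

# Approximately what the fsspec FileIO builds: the timeout lands in botocore's Config
fs = S3FileSystem(
    client_kwargs={"endpoint_url": "http://127.0.0.1:9000"},  # placeholder endpoint
    config_kwargs={"connect_timeout": 60.0},
)
```
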
4 changes: 4 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -92,6 +92,7 @@
HDFS_PORT,
HDFS_USER,
S3_ACCESS_KEY_ID,
S3_CONNECT_TIMEOUT,
S3_ENDPOINT,
S3_PROXY_URI,
S3_REGION,
@@ -330,6 +331,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
if proxy_uri := self.properties.get(S3_PROXY_URI):
client_kwargs["proxy_options"] = proxy_uri

if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
client_kwargs["connect_timeout"] = connect_timeout

return S3FileSystem(**client_kwargs)
elif scheme == "hdfs":
from pyarrow.fs import HadoopFileSystem
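
Analogously, a rough sketch of the PyArrow filesystem this produces when the timeout is configured (values are placeholders):

```python
from pyarrow.fs import S3FileSystem

fs = S3FileSystem(
    endpoint_override="http://127.0.0.1:9000",  # placeholder endpoint
    connect_timeout=60.0,                        # seconds
)
```
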
84 changes: 80 additions & 4 deletions pyiceberg/table/__init__.py
@@ -44,7 +44,7 @@
from sortedcontainers import SortedList
from typing_extensions import Annotated

from pyiceberg.exceptions import ResolveError, ValidationError
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
And,
@@ -540,18 +540,40 @@ def update_table_metadata(base_metadata: TableMetadata, updates: Tuple[TableUpda
class TableRequirement(IcebergBaseModel):
type: str

@abstractmethod
def validate(self, base_metadata: Optional[TableMetadata]) -> None:
"""Validate the requirement against the base metadata.
Args:
base_metadata: The base metadata to be validated against.
Raises:
CommitFailedException: When the requirement is not met.
"""
...


class AssertCreate(TableRequirement):
"""The table must not already exist; used for create transactions."""

type: Literal["assert-create"] = Field(default="assert-create")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is not None:
raise CommitFailedException("Table already exists")


class AssertTableUUID(TableRequirement):
"""The table UUID must match the requirement's `uuid`."""

type: Literal["assert-table-uuid"] = Field(default="assert-table-uuid")
uuid: str
uuid: uuid.UUID

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.uuid != base_metadata.table_uuid:
raise CommitFailedException(f"Table UUID does not match: {self.uuid} != {base_metadata.table_uuid}")


class AssertRefSnapshotId(TableRequirement):
@@ -564,41 +586,95 @@ class AssertRefSnapshotId(TableRequirement):
ref: str
snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif snapshot_ref := base_metadata.refs.get(self.ref):
ref_type = snapshot_ref.snapshot_ref_type
if self.snapshot_id is None:
raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently")
elif self.snapshot_id != snapshot_ref.snapshot_id:
raise CommitFailedException(
f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
)
elif self.snapshot_id is not None:
raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")


class AssertLastAssignedFieldId(TableRequirement):
"""The table's last assigned column id must match the requirement's `last-assigned-field-id`."""

type: Literal["assert-last-assigned-field-id"] = Field(default="assert-last-assigned-field-id")
last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif base_metadata.last_column_id != self.last_assigned_field_id:
raise CommitFailedException(
f"Requirement failed: last assigned field id has changed: expected {self.last_assigned_field_id}, found {base_metadata.last_column_id}"
)


class AssertCurrentSchemaId(TableRequirement):
"""The table's current schema id must match the requirement's `current-schema-id`."""

type: Literal["assert-current-schema-id"] = Field(default="assert-current-schema-id")
current_schema_id: int = Field(..., alias="current-schema-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.current_schema_id != base_metadata.current_schema_id:
raise CommitFailedException(
f"Requirement failed: current schema id has changed: expected {self.current_schema_id}, found {base_metadata.current_schema_id}"
)


class AssertLastAssignedPartitionId(TableRequirement):
"""The table's last assigned partition id must match the requirement's `last-assigned-partition-id`."""

type: Literal["assert-last-assigned-partition-id"] = Field(default="assert-last-assigned-partition-id")
last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif base_metadata.last_partition_id != self.last_assigned_partition_id:
raise CommitFailedException(
f"Requirement failed: last assigned partition id has changed: expected {self.last_assigned_partition_id}, found {base_metadata.last_partition_id}"
)


class AssertDefaultSpecId(TableRequirement):
"""The table's default spec id must match the requirement's `default-spec-id`."""

type: Literal["assert-default-spec-id"] = Field(default="assert-default-spec-id")
default_spec_id: int = Field(..., alias="default-spec-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.default_spec_id != base_metadata.default_spec_id:
raise CommitFailedException(
f"Requirement failed: default spec id has changed: expected {self.default_spec_id}, found {base_metadata.default_spec_id}"
)


class AssertDefaultSortOrderId(TableRequirement):
"""The table's default sort order id must match the requirement's `default-sort-order-id`."""

type: Literal["assert-default-sort-order-id"] = Field(default="assert-default-sort-order-id")
default_sort_order_id: int = Field(..., alias="default-sort-order-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.default_sort_order_id != base_metadata.default_sort_order_id:
raise CommitFailedException(
f"Requirement failed: default sort order id has changed: expected {self.default_sort_order_id}, found {base_metadata.default_sort_order_id}"
)


class Namespace(IcebergRootModel[List[str]]):
"""Reference to one or more levels of a namespace."""
Expand Down Expand Up @@ -868,8 +944,8 @@ def snapshot(self) -> Optional[Snapshot]:
def projection(self) -> Schema:
snapshot_schema = self.table.schema()
if snapshot := self.snapshot():
if snapshot_schema_id := snapshot.schema_id:
snapshot_schema = self.table.schemas()[snapshot_schema_id]
if snapshot.schema_id is not None:
snapshot_schema = self.table.schemas()[snapshot.schema_id]

if "*" in self.selected_fields:
return snapshot_schema
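
To make the new requirement/validation flow concrete, a minimal sketch using `AssertCreate` (here `existing_metadata` stands in for whatever `TableMetadata` the catalog currently holds):

```python
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.table import AssertCreate

requirement = AssertCreate()

# A create transaction expects no current metadata, so this passes silently.
requirement.validate(None)

# If the table already exists, the same requirement rejects the commit.
try:
    requirement.validate(existing_metadata)  # stand-in for the table's current TableMetadata
except CommitFailedException as exc:
    print(exc)  # "Table already exists"
```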
