Skip to content

Commit 76b4949

Browse files
committed
Merge branch 'main' into manifest_compaction
# Conflicts: # pyiceberg/table/__init__.py # tests/integration/test_writes.py
2 parents 2609002 + 36b56eb commit 76b4949

27 files changed

+2641
-456
lines changed

.github/workflows/check-md-link.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
name: Check Markdown links
2+
3+
on:
4+
push:
5+
paths:
6+
- mkdocs/**
7+
8+
jobs:
9+
markdown-link-check:
10+
runs-on: ubuntu-latest
11+
steps:
12+
- uses: actions/checkout@master
13+
- uses: gaurav-nelson/github-action-markdown-link-check@v1

mkdocs/docs/api.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,63 @@ with table.update_schema(allow_incompatible_changes=True) as update:
418418
update.delete_column("some_field")
419419
```
420420

421+
## Partition evolution
422+
423+
PyIceberg supports partition evolution. See the [partition evolution](https://iceberg.apache.org/spec/#partition-evolution)
424+
section of the Iceberg spec for more details.
425+
426+
The API to use when evolving partitions is the `update_spec` API on the table.
427+
428+
```python
429+
with table.update_spec() as update:
430+
update.add_field("id", BucketTransform(16), "bucketed_id")
431+
update.add_field("event_ts", DayTransform(), "day_ts")
432+
```
433+
434+
Updating the partition spec can also be done as part of a transaction with other operations.
435+
436+
```python
437+
with table.transaction() as transaction:
438+
with transaction.update_spec() as update_spec:
439+
update_spec.add_field("id", BucketTransform(16), "bucketed_id")
440+
update_spec.add_field("event_ts", DayTransform(), "day_ts")
441+
# ... Update properties etc
442+
```
443+
444+
### Add fields
445+
446+
New partition fields can be added via the `add_field` API which takes in the field name to partition on,
447+
the partition transform, and an optional partition name. If the partition name is not specified,
448+
one will be created.
449+
450+
```python
451+
with table.update_spec() as update:
452+
update.add_field("id", BucketTransform(16), "bucketed_id")
453+
update.add_field("event_ts", DayTransform(), "day_ts")
454+
# identity is a shortcut API for adding an IdentityTransform
455+
update.identity("some_field")
456+
```
457+
458+
### Remove fields
459+
460+
Partition fields can also be removed via the `remove_field` API if it no longer makes sense to partition on those fields.
461+
462+
```python
463+
with table.update_spec() as update:
464+
# Remove the partition field with the name some_partition_name
465+
update.remove_field("some_partition_name")
466+
```
467+
468+
### Rename fields
469+
470+
Partition fields can also be renamed via the `rename_field` API.
471+
472+
```python
473+
with table.update_spec() as update:
474+
# Rename the partition field with the name bucketed_id to sharded_id
475+
update.rename_field("bucketed_id", "sharded_id")
476+
```
477+
421478
## Table properties
422479

423480
Set and remove properties through the `Transaction` API:

mkdocs/docs/cli.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Options:
3636
--catalog TEXT
3737
--verbose BOOLEAN
3838
--output [text|json]
39+
--ugi TEXT
3940
--uri TEXT
4041
--credential TEXT
4142
--help Show this message and exit.

mkdocs/docs/configuration.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,27 @@ catalog:
148148
| Key | Example | Description |
149149
| ---------------------- | ----------------------- | -------------------------------------------------------------------------------------------------- |
150150
| uri | https://rest-catalog/ws | URI identifying the REST Server |
151+
| ugi | t-1234:secret | Hadoop UGI for Hive client. |
151152
| credential | t-1234:secret | Credential to use for OAuth2 credential flow when initializing the catalog |
152153
| token | FEW23.DFSDF.FSDF | Bearer token value to use for `Authorization` header |
153154
| rest.sigv4-enabled | true | Sign requests to the REST Server using AWS SigV4 protocol |
154155
| rest.signing-region | us-east-1 | The region to use when SigV4 signing a request |
155156
| rest.signing-name | execute-api | The service signing name to use when SigV4 signing a request |
156157
| rest.authorization-url | https://auth-service/cc | Authentication URL to use for client credentials authentication (default: uri + 'v1/oauth/tokens') |
157158

159+
### Headers in RESTCatalog
160+
161+
To configure custom headers in RESTCatalog, include them in the catalog properties with the prefix `header.`. This
162+
ensures that all HTTP requests to the REST service include the specified headers.
163+
164+
```yaml
165+
catalog:
166+
default:
167+
uri: http://rest-catalog/ws/
168+
credential: t-1234:secret
169+
header.content-type: application/vnd.api+json
170+
```
171+
158172
## SQL Catalog
159173

160174
The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL and SQLite through psycopg2. The database connection has to be configured using the `uri` property. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls):

pyiceberg/catalog/hive.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,21 @@ class _HiveClient:
130130

131131
_transport: TTransport
132132
_client: Client
133+
_ugi: Optional[List[str]]
133134

134-
def __init__(self, uri: str):
135+
def __init__(self, uri: str, ugi: Optional[str] = None):
135136
url_parts = urlparse(uri)
136137
transport = TSocket.TSocket(url_parts.hostname, url_parts.port)
137138
self._transport = TTransport.TBufferedTransport(transport)
138139
protocol = TBinaryProtocol.TBinaryProtocol(transport)
139140

140141
self._client = Client(protocol)
142+
self._ugi = ugi.split(':') if ugi else None
141143

142144
def __enter__(self) -> Client:
143145
self._transport.open()
146+
if self._ugi:
147+
self._client.set_ugi(*self._ugi)
144148
return self._client
145149

146150
def __exit__(
@@ -233,7 +237,7 @@ class HiveCatalog(Catalog):
233237

234238
def __init__(self, name: str, **properties: str):
235239
super().__init__(name, **properties)
236-
self._client = _HiveClient(properties["uri"])
240+
self._client = _HiveClient(properties["uri"], properties.get("ugi"))
237241

238242
def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
239243
properties: Dict[str, str] = table.parameters

pyiceberg/catalog/rest.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
Union,
2929
)
3030

31-
from pydantic import Field, ValidationError
31+
from pydantic import Field, ValidationError, field_validator
3232
from requests import HTTPError, Session
3333
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
3434

@@ -69,6 +69,7 @@
6969
)
7070
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
7171
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel
72+
from pyiceberg.types import transform_dict_value_to_str
7273

7374
if TYPE_CHECKING:
7475
import pyarrow as pa
@@ -115,6 +116,7 @@ class Endpoints:
115116
SIGV4_REGION = "rest.signing-region"
116117
SIGV4_SERVICE = "rest.signing-name"
117118
AUTH_URL = "rest.authorization-url"
119+
HEADER_PREFIX = "header."
118120

119121
NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)
120122

@@ -127,7 +129,7 @@ def _retry_hook(retry_state: RetryCallState) -> None:
127129
_RETRY_ARGS = {
128130
"retry": retry_if_exception_type(AuthorizationExpiredError),
129131
"stop": stop_after_attempt(2),
130-
"before": _retry_hook,
132+
"before_sleep": _retry_hook,
131133
"reraise": True,
132134
}
133135

@@ -146,6 +148,8 @@ class CreateTableRequest(IcebergBaseModel):
146148
write_order: Optional[SortOrder] = Field(alias="write-order")
147149
stage_create: bool = Field(alias="stage-create", default=False)
148150
properties: Properties = Field(default_factory=dict)
151+
# validators
152+
transform_properties_dict_value_to_str = field_validator('properties', mode='before')(transform_dict_value_to_str)
149153

150154

151155
class RegisterTableRequest(IcebergBaseModel):
@@ -156,8 +160,10 @@ class RegisterTableRequest(IcebergBaseModel):
156160
class TokenResponse(IcebergBaseModel):
157161
access_token: str = Field()
158162
token_type: str = Field()
159-
expires_in: int = Field()
160-
issued_token_type: str = Field()
163+
expires_in: Optional[int] = Field(default=None)
164+
issued_token_type: Optional[str] = Field(default=None)
165+
refresh_token: Optional[str] = Field(default=None)
166+
scope: Optional[str] = Field(default=None)
161167

162168

163169
class ConfigResponse(IcebergBaseModel):
@@ -231,9 +237,9 @@ def _create_session(self) -> Session:
231237

232238
# Sets the client side and server side SSL cert verification, if provided as properties.
233239
if ssl_config := self.properties.get(SSL):
234-
if ssl_ca_bundle := ssl_config.get(CA_BUNDLE): # type: ignore
240+
if ssl_ca_bundle := ssl_config.get(CA_BUNDLE):
235241
session.verify = ssl_ca_bundle
236-
if ssl_client := ssl_config.get(CLIENT): # type: ignore
242+
if ssl_client := ssl_config.get(CLIENT):
237243
if all(k in ssl_client for k in (CERT, KEY)):
238244
session.cert = (ssl_client[CERT], ssl_client[KEY])
239245
elif ssl_client_cert := ssl_client.get(CERT):
@@ -242,10 +248,7 @@ def _create_session(self) -> Session:
242248
self._refresh_token(session, self.properties.get(TOKEN))
243249

244250
# Set HTTP headers
245-
session.headers["Content-type"] = "application/json"
246-
session.headers["X-Client-Version"] = ICEBERG_REST_SPEC_VERSION
247-
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
248-
session.headers["X-Iceberg-Access-Delegation"] = "vended-credentials"
251+
self._config_headers(session)
249252

250253
# Configure SigV4 Request Signing
251254
if str(self.properties.get(SIGV4, False)).lower() == "true":
@@ -292,8 +295,9 @@ def _fetch_access_token(self, session: Session, credential: str) -> str:
292295
else:
293296
client_id, client_secret = None, credential
294297
data = {GRANT_TYPE: CLIENT_CREDENTIALS, CLIENT_ID: client_id, CLIENT_SECRET: client_secret, SCOPE: CATALOG_SCOPE}
295-
# Uses application/x-www-form-urlencoded by default
296-
response = session.post(url=self.auth_url, data=data)
298+
response = session.post(
299+
url=self.auth_url, data=data, headers={**session.headers, "Content-type": "application/x-www-form-urlencoded"}
300+
)
297301
try:
298302
response.raise_for_status()
299303
except HTTPError as exc:
@@ -447,17 +451,28 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:
447451
catalog=self,
448452
)
449453

450-
def _refresh_token(self, session: Optional[Session] = None, new_token: Optional[str] = None) -> None:
454+
def _refresh_token(self, session: Optional[Session] = None, initial_token: Optional[str] = None) -> None:
451455
session = session or self._session
452-
if new_token is not None:
453-
self.properties[TOKEN] = new_token
456+
if initial_token is not None:
457+
self.properties[TOKEN] = initial_token
454458
elif CREDENTIAL in self.properties:
455459
self.properties[TOKEN] = self._fetch_access_token(session, self.properties[CREDENTIAL])
456460

457461
# Set Auth token for subsequent calls in the session
458462
if token := self.properties.get(TOKEN):
459463
session.headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {token}"
460464

465+
def _config_headers(self, session: Session) -> None:
466+
session.headers["Content-type"] = "application/json"
467+
session.headers["X-Client-Version"] = ICEBERG_REST_SPEC_VERSION
468+
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
469+
session.headers["X-Iceberg-Access-Delegation"] = "vended-credentials"
470+
header_properties = self._extract_headers_from_properties()
471+
session.headers.update(header_properties)
472+
473+
def _extract_headers_from_properties(self) -> Dict[str, str]:
474+
return {key[len(HEADER_PREFIX) :]: value for key, value in self.properties.items() if key.startswith(HEADER_PREFIX)}
475+
461476
@retry(**_RETRY_ARGS)
462477
def create_table(
463478
self,

pyiceberg/catalog/sql.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,9 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper
567567
Raises:
568568
NoSuchNamespaceError: If a namespace with the given name does not exist.
569569
"""
570-
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
570+
database_name = self.identifier_to_database(namespace)
571+
if not self._namespace_exists(database_name):
572+
raise NoSuchNamespaceError(f"Database {database_name} does not exists")
571573

572574
stmt = select(IcebergNamespaceProperties).where(
573575
IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == database_name

pyiceberg/cli/console.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,22 @@ def wrapper(*args: Any, **kwargs: Any): # type: ignore
5959
@click.option("--catalog")
6060
@click.option("--verbose", type=click.BOOL)
6161
@click.option("--output", type=click.Choice(["text", "json"]), default="text")
62+
@click.option("--ugi")
6263
@click.option("--uri")
6364
@click.option("--credential")
6465
@click.pass_context
65-
def run(ctx: Context, catalog: Optional[str], verbose: bool, output: str, uri: Optional[str], credential: Optional[str]) -> None:
66+
def run(
67+
ctx: Context,
68+
catalog: Optional[str],
69+
verbose: bool,
70+
output: str,
71+
ugi: Optional[str],
72+
uri: Optional[str],
73+
credential: Optional[str],
74+
) -> None:
6675
properties = {}
76+
if ugi:
77+
properties["ugi"] = ugi
6778
if uri:
6879
properties["uri"] = uri
6980
if credential:

pyiceberg/io/pyarrow.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@
125125
visit_with_partner,
126126
)
127127
from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
128+
from pyiceberg.table.metadata import TableMetadata
128129
from pyiceberg.table.name_mapping import NameMapping
129130
from pyiceberg.transforms import TruncateTransform
130131
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1720,7 +1721,7 @@ def fill_parquet_file_metadata(
17201721
data_file.split_offsets = split_offsets
17211722

17221723

1723-
def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
1724+
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
17241725
task = next(tasks)
17251726

17261727
try:
@@ -1730,15 +1731,15 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
17301731
except StopIteration:
17311732
pass
17321733

1733-
parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
1734+
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
17341735

1735-
file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
1736-
file_schema = file_schema or table.schema()
1737-
arrow_file_schema = schema_to_pyarrow(file_schema)
1736+
file_path = f'{table_metadata.location}/data/{task.generate_data_file_filename("parquet")}'
1737+
schema = table_metadata.schema()
1738+
arrow_file_schema = schema_to_pyarrow(schema)
17381739

1739-
fo = table.io.new_output(file_path)
1740+
fo = io.new_output(file_path)
17401741
row_group_size = PropertyUtil.property_as_int(
1741-
properties=table.properties,
1742+
properties=table_metadata.properties,
17421743
property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
17431744
default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
17441745
)
@@ -1757,16 +1758,16 @@ def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[S
17571758
# sort_order_id=task.sort_order_id,
17581759
sort_order_id=None,
17591760
# Just copy these from the table for now
1760-
spec_id=table.spec().spec_id,
1761+
spec_id=table_metadata.default_spec_id,
17611762
equality_ids=None,
17621763
key_metadata=None,
17631764
)
17641765

17651766
fill_parquet_file_metadata(
17661767
data_file=data_file,
17671768
parquet_metadata=writer.writer.metadata,
1768-
stats_columns=compute_statistics_plan(file_schema, table.properties),
1769-
parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
1769+
stats_columns=compute_statistics_plan(schema, table_metadata.properties),
1770+
parquet_column_mapping=parquet_path_to_id_mapping(schema),
17701771
)
17711772
return iter([data_file])
17721773

0 commit comments

Comments
 (0)