Skip to content

Commit

Permalink
Merge branch 'main' into honah_aws_configs
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Jul 14, 2024
2 parents ce5b604 + 3f44dfe commit a67f549
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 96 deletions.
10 changes: 10 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ catalog:
region_name: <REGION_NAME>
```

<!-- markdown-link-check-disable -->

| Key | Example | Description |
| ----------------- | ------------------------------------ | ------------------------------------------------------------------------------- |
| glue.id | 111111111111 | Configure the 12-digit ID of the Glue Catalog |
| glue.skip-archive | true | Configure whether to skip the archival of older table versions. Default to true |
| glue.endpoint | https://glue.us-east-1.amazonaws.com | Configure an alternative endpoint of the Glue service for GlueCatalog to access |

<!-- markdown-link-check-enable-->

## DynamoDB Catalog

If you want to use AWS DynamoDB as the catalog, you can use the last two ways to configure the pyiceberg and refer
Expand Down
15 changes: 15 additions & 0 deletions mkdocs/docs/how-to-release.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ The guide to release PyIceberg.

The first step is to publish a release candidate (RC) and publish it to the public for testing and validation. Once the vote has passed on the RC, the RC turns into the new release.

## Preparing for a release

Before running the release candidate, we want to remove any APIs that were marked for removal under the @deprecated tag for this release.

For example, the API with the following deprecation tag should be removed when preparing for the 0.2.0 release.

```python

@deprecated(
deprecated_in="0.1.0",
removed_in="0.2.0",
help_message="Please use load_something_else() instead",
)
```

## Running a release candidate

Make sure that the version is correct in `pyproject.toml` and `pyiceberg/__init__.py`. Correct means that it reflects the version that you want to release.
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@
GLUE_SKIP_ARCHIVE = "glue.skip-archive"
GLUE_SKIP_ARCHIVE_DEFAULT = True

# Configure an alternative endpoint of the Glue service for GlueCatalog to access.
# This could be used to use GlueCatalog with any glue-compatible metastore service that has a different endpoint
GLUE_CATALOG_ENDPOINT = "glue.endpoint"

ICEBERG_FIELD_ID = "iceberg.field.id"
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
ICEBERG_FIELD_CURRENT = "iceberg.field.current"
Expand Down Expand Up @@ -313,7 +317,7 @@ def __init__(self, name: str, **properties: Any):
properties, GLUE_SESSION_TOKEN, AWS_SESSION_TOKEN, DEPRECATED_SESSION_TOKEN
),
)
self.glue: GlueClient = session.client("glue")
self.glue: GlueClient = session.client("glue", endpoint_url=properties.get(GLUE_CATALOG_ENDPOINT))

if glue_catalog_id := properties.get(GLUE_ID):
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)
Expand Down
74 changes: 56 additions & 18 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import millis_to_datetime
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string

Expand All @@ -178,6 +179,7 @@
MAP_KEY_NAME = "key"
MAP_VALUE_NAME = "value"
DOC = "doc"
UTC_ALIASES = {"UTC", "+00:00", "Etc/UTC", "Z"}

T = TypeVar("T")

Expand Down Expand Up @@ -943,7 +945,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
else:
raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")

if primitive.tz == "UTC" or primitive.tz == "+00:00":
if primitive.tz in UTC_ALIASES:
return TimestamptzType()
elif primitive.tz is None:
return TimestampType()
Expand Down Expand Up @@ -1079,7 +1081,7 @@ def _task_to_record_batches(
arrow_table = pa.Table.from_batches([batch])
arrow_table = arrow_table.filter(pyarrow_filter)
batch = arrow_table.to_batches()[0]
yield to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True)
yield _to_requested_schema(projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True)
current_index += len(batch)


Expand Down Expand Up @@ -1284,7 +1286,24 @@ def project_batches(
total_row_count += len(batch)


def to_requested_schema(
@deprecated(
deprecated_in="0.7.0",
removed_in="0.8.0",
help_message="The public API for 'to_requested_schema' is deprecated and is replaced by '_to_requested_schema'",
)
def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: pa.Table) -> pa.Table:
struct_array = visit_with_partner(requested_schema, table, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema))

arrays = []
fields = []
for pos, field in enumerate(requested_schema.fields):
array = struct_array.field(pos)
arrays.append(array)
fields.append(pa.field(field.name, array.type, field.optional))
return pa.Table.from_arrays(arrays, schema=pa.schema(fields))


def _to_requested_schema(
requested_schema: Schema,
file_schema: Schema,
batch: pa.RecordBatch,
Expand All @@ -1302,31 +1321,49 @@ def to_requested_schema(


class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
file_schema: Schema
_file_schema: Schema
_include_field_ids: bool
_downcast_ns_timestamp_to_us: bool

def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None:
self.file_schema = file_schema
self._file_schema = file_schema
self._include_field_ids = include_field_ids
self.downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us

def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
file_field = self.file_schema.find_field(field.field_id)
file_field = self._file_schema.find_field(field.field_id)

if field.field_type.is_primitive:
if field.field_type != file_field.field_type:
return values.cast(
schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids)
)
elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
# Downcasting of nanoseconds to microseconds
if (
pa.types.is_timestamp(target_type)
and target_type.unit == "us"
and pa.types.is_timestamp(values.type)
and values.type.unit == "ns"
):
return values.cast(target_type, safe=False)
if field.field_type == TimestampType():
# Downcasting of nanoseconds to microseconds
if (
pa.types.is_timestamp(target_type)
and not target_type.tz
and pa.types.is_timestamp(values.type)
and not values.type.tz
):
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
elif target_type.unit == "us" and values.type.unit in {"s", "ms"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
elif field.field_type == TimestamptzType():
if (
pa.types.is_timestamp(target_type)
and target_type.tz == "UTC"
and pa.types.is_timestamp(values.type)
and values.type.tz in UTC_ALIASES
):
if target_type.unit == "us" and values.type.unit == "ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
elif target_type.unit == "us" and values.type.unit in {"s", "ms", "us"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from {values.type} to {target_type}")
return values

def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
Expand Down Expand Up @@ -1421,6 +1458,8 @@ def field_partner(self, partner_struct: Optional[pa.Array], field_id: int, _: st

if isinstance(partner_struct, pa.StructArray):
return partner_struct.field(name)
elif isinstance(partner_struct, pa.Table):
return partner_struct.column(name).combine_chunks()
elif isinstance(partner_struct, pa.RecordBatch):
return partner_struct.column(name)
else:
Expand Down Expand Up @@ -1882,6 +1921,7 @@ def data_file_statistics_from_parquet_metadata(

col_aggs = {}

invalidate_col: Set[int] = set()
for r in range(parquet_metadata.num_row_groups):
# References:
# https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232
Expand All @@ -1897,8 +1937,6 @@ def data_file_statistics_from_parquet_metadata(
else:
split_offsets.append(data_offset)

invalidate_col: Set[int] = set()

for pos in range(parquet_metadata.num_columns):
column = row_group.column(pos)
field_id = parquet_column_mapping[column.path_in_schema]
Expand Down Expand Up @@ -1977,7 +2015,7 @@ def write_parquet(task: WriteTask) -> DataFile:

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
batches = [
to_requested_schema(
_to_requested_schema(
requested_schema=file_schema,
file_schema=table_schema,
batch=batch,
Expand Down
8 changes: 0 additions & 8 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,6 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
_check_schema_compatible(
self._table.schema(), other_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)
# cast if the two schemas are compatible but not equal
table_arrow_schema = self._table.schema().as_arrow()
if table_arrow_schema != df.schema:
df = df.cast(table_arrow_schema)

manifest_merge_enabled = PropertyUtil.property_as_bool(
self.table_metadata.properties,
Expand Down Expand Up @@ -552,10 +548,6 @@ def overwrite(
_check_schema_compatible(
self._table.schema(), other_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)
# cast if the two schemas are compatible but not equal
table_arrow_schema = self._table.schema().as_arrow()
if table_arrow_schema != df.schema:
df = df.cast(table_arrow_schema)

self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)

Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def transform_dict_value_to_str(dict: Dict[str, Any]) -> Dict[str, str]:
for key, value in dict.items():
if value is None:
raise ValueError(f"None type is not a supported value in properties: {key}")
return {k: str(v) for k, v in dict.items()}
return {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in dict.items()}


def _parse_decimal_type(decimal: Any) -> Tuple[int, int]:
Expand Down
8 changes: 5 additions & 3 deletions tests/catalog/integration_test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from botocore.exceptions import ClientError

from pyiceberg.catalog import Catalog, MetastoreCatalog
from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.catalog.glue import GLUE_CATALOG_ENDPOINT, GlueCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
Expand All @@ -36,7 +36,7 @@
from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_s3_path
from tests.conftest import clean_up, get_bucket_name, get_glue_endpoint, get_s3_path

# The number of tables/databases used in list_table/namespace test
LIST_TEST_NUMBER = 2
Expand All @@ -51,7 +51,9 @@ def fixture_glue_client() -> boto3.client:
@pytest.fixture(name="test_catalog", scope="module")
def fixture_test_catalog() -> Generator[Catalog, None, None]:
"""Configure the pre- and post-setting of aws integration test."""
test_catalog = GlueCatalog(CATALOG_NAME, warehouse=get_s3_path(get_bucket_name()))
test_catalog = GlueCatalog(
CATALOG_NAME, **{"warehouse": get_s3_path(get_bucket_name()), GLUE_CATALOG_ENDPOINT: get_glue_endpoint()}
)
yield test_catalog
clean_up(test_catalog)

Expand Down
10 changes: 10 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,3 +925,13 @@ def test_register_table_with_given_location(
table = test_catalog.register_table(identifier, location)
assert table.identifier == (catalog_name,) + identifier
assert test_catalog.table_exists(identifier) is True


@mock_aws
def test_glue_endpoint_override(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None:
catalog_name = "glue"
test_endpoint = "https://test-endpoint"
test_catalog = GlueCatalog(
catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}", "glue.endpoint": test_endpoint}
)
assert test_catalog.glue.meta.endpoint_url == test_endpoint
9 changes: 6 additions & 3 deletions tests/cli/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None:
NestedField(3, "z", LongType(), required=True),
)
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"}
TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728", "write.parquet.bloom-filter-enabled.column.x": True}
TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c")
TEST_TIMESTAMP = 1602638573874
MOCK_ENVIRONMENT = {"PYICEBERG_CATALOG__PRODUCTION__URI": "test://doesnotexist"}
Expand Down Expand Up @@ -367,7 +367,10 @@ def test_properties_get_table(catalog: InMemoryCatalog) -> None:
runner = CliRunner()
result = runner.invoke(run, ["properties", "get", "table", "default.my_table"])
assert result.exit_code == 0
assert result.output == "read.split.target.size 134217728\n"
assert (
result.output
== "read.split.target.size 134217728\nwrite.parquet.bloom-filter-enabled.column.x true \n"
)


def test_properties_get_table_specific_property(catalog: InMemoryCatalog) -> None:
Expand Down Expand Up @@ -763,7 +766,7 @@ def test_json_properties_get_table(catalog: InMemoryCatalog) -> None:
runner = CliRunner()
result = runner.invoke(run, ["--output=json", "properties", "get", "table", "default.my_table"])
assert result.exit_code == 0
assert result.output == """{"read.split.target.size": "134217728"}\n"""
assert result.output == """{"read.split.target.size": "134217728", "write.parquet.bloom-filter-enabled.column.x": "true"}\n"""


def test_json_properties_get_table_specific_property(catalog: InMemoryCatalog) -> None:
Expand Down
Loading

0 comments on commit a67f549

Please sign in to comment.