Skip to content

Commit

Permalink
create_table write metadata file
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinjqliu committed Jan 31, 2024
1 parent 67c028a commit 96ba8de
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 69 deletions.
10 changes: 6 additions & 4 deletions pyiceberg/catalog/in_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE
from pyiceberg.io import WAREHOUSE, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Expand Down Expand Up @@ -94,8 +94,7 @@ def create_table(
if not location:
location = f'{self._warehouse_location}/{"/".join(identifier)}'

metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json'

metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
schema=schema,
partition_spec=partition_spec,
Expand All @@ -104,11 +103,14 @@ def create_table(
properties=properties,
table_uuid=table_uuid,
)
io = load_file_io({**self.properties, **properties}, location=location)
self._write_metadata(metadata, io, metadata_location)

table = Table(
identifier=identifier,
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(properties=metadata.properties, location=metadata_location),
io=io,
catalog=self,
)
self.__tables[identifier] = table
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/cli/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None:
Console().print(output_table)

def text(self, response: str) -> None:
Console().print(response)
Console(soft_wrap=True).print(response)

def schema(self, schema: Schema) -> None:
output_table = self._table
Expand Down
16 changes: 13 additions & 3 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
)
TEST_TABLE_LOCATION = "protocol://some/location"
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
Expand Down Expand Up @@ -123,13 +122,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None:
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table


def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
    """Check that an explicit ``location`` passed to ``create_table`` becomes the table's location.

    The override path is built beneath the catalog's own warehouse location.
    """
    override_location = f"{catalog._warehouse_location}/new_location"
    created = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        location=override_location,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
        properties=TEST_TABLE_PROPERTIES,
    )
    # The catalog must return the same table on a subsequent lookup ...
    loaded = catalog.load_table(TEST_TABLE_IDENTIFIER)
    assert loaded == created
    # ... and the table must report the overridden location, not the default one.
    assert created.location() == override_location


@pytest.mark.parametrize(
"schema,expected",
[
Expand All @@ -151,7 +162,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=pyarrow_schema_simple_without_ids,
location=TEST_TABLE_LOCATION,
properties=TEST_TABLE_PROPERTIES,
)
assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table
Expand Down
69 changes: 8 additions & 61 deletions tests/cli/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import datetime
import os
import uuid
from pathlib import PosixPath
from unittest.mock import MagicMock

import pytest
from click.testing import CliRunner
from pytest_mock import MockFixture

from pyiceberg.catalog.in_memory import DEFAULT_WAREHOUSE_LOCATION
from pyiceberg.cli.console import run
from pyiceberg.io import WAREHOUSE
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -52,8 +53,10 @@ def env_vars(mocker: MockFixture) -> None:


@pytest.fixture(name="catalog")
def fixture_catalog(mocker: MockFixture) -> InMemoryCatalog:
in_memory_catalog = InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"})
def fixture_catalog(mocker: MockFixture, tmp_path: PosixPath) -> InMemoryCatalog:
in_memory_catalog = InMemoryCatalog(
"test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"}
)
mocker.patch("pyiceberg.cli.console.load_catalog", return_value=in_memory_catalog)
return in_memory_catalog

Expand All @@ -79,7 +82,6 @@ def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None:
NestedField(2, "y", LongType(), doc="comment"),
NestedField(3, "z", LongType()),
)
TEST_TABLE_LOCATION = "s3://bucket/test/location"
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"}
TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c")
Expand All @@ -101,7 +103,6 @@ def test_list_namespace(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -138,7 +139,6 @@ def test_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) -> No
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)
Expand Down Expand Up @@ -182,7 +182,6 @@ def test_schema(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -211,7 +210,6 @@ def test_spec(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -240,7 +238,6 @@ def test_uuid(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)
Expand All @@ -266,25 +263,10 @@ def test_location(catalog: InMemoryCatalog) -> None:
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

runner = CliRunner()
result = runner.invoke(run, ["location", "default.my_table"])
assert result.exit_code == 0
assert result.output == f"""{DEFAULT_WAREHOUSE_LOCATION}/default/my_table\n"""


def test_location_override(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

runner = CliRunner()
result = runner.invoke(run, ["location", "default.my_table"])
assert result.exit_code == 0
assert result.output == f"""{TEST_TABLE_LOCATION}\n"""
assert result.output == f"""{catalog._warehouse_location}/default/my_table\n"""


def test_location_does_not_exists(catalog: InMemoryCatalog) -> None:
Expand All @@ -300,7 +282,6 @@ def test_drop_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -341,7 +322,6 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand All @@ -364,7 +344,6 @@ def test_properties_get_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand All @@ -379,7 +358,6 @@ def test_properties_get_table_specific_property(catalog: InMemoryCatalog) -> Non
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand All @@ -394,7 +372,6 @@ def test_properties_get_table_specific_property_that_doesnt_exist(catalog: InMem
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -463,7 +440,6 @@ def test_properties_set_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -504,7 +480,6 @@ def test_properties_remove_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand All @@ -519,7 +494,6 @@ def test_properties_remove_table_property_does_not_exists(catalog: InMemoryCatal
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -551,7 +525,6 @@ def test_json_list_namespace(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -584,7 +557,6 @@ def test_json_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None)
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)
Expand Down Expand Up @@ -614,7 +586,6 @@ def test_json_schema(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand All @@ -640,7 +611,6 @@ def test_json_spec(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand All @@ -663,7 +633,6 @@ def test_json_uuid(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
table_uuid=TEST_TABLE_UUID,
)
Expand Down Expand Up @@ -693,21 +662,7 @@ def test_json_location(catalog: InMemoryCatalog) -> None:
runner = CliRunner()
result = runner.invoke(run, ["--output=json", "location", "default.my_table"])
assert result.exit_code == 0
assert result.output == f'"{DEFAULT_WAREHOUSE_LOCATION}/default/my_table"\n'


def test_json_location_override(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

runner = CliRunner()
result = runner.invoke(run, ["--output=json", "location", "default.my_table"])
assert result.exit_code == 0
assert result.output == f'"{TEST_TABLE_LOCATION}"\n'
assert result.output == f'"{catalog._warehouse_location}/default/my_table"\n'


def test_json_location_does_not_exists(catalog: InMemoryCatalog) -> None:
Expand All @@ -723,7 +678,6 @@ def test_json_drop_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand Down Expand Up @@ -764,7 +718,6 @@ def test_json_rename_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

Expand All @@ -787,7 +740,6 @@ def test_json_properties_get_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand All @@ -802,7 +754,6 @@ def test_json_properties_get_table_specific_property(catalog: InMemoryCatalog) -
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand All @@ -817,7 +768,6 @@ def test_json_properties_get_table_specific_property_that_doesnt_exist(catalog:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -891,7 +841,6 @@ def test_json_properties_set_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand Down Expand Up @@ -937,7 +886,6 @@ def test_json_properties_remove_table(catalog: InMemoryCatalog) -> None:
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand All @@ -952,7 +900,6 @@ def test_json_properties_remove_table_property_does_not_exists(catalog: InMemory
catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
location=TEST_TABLE_LOCATION,
partition_spec=TEST_TABLE_PARTITION_SPEC,
properties=TEST_TABLE_PROPERTIES,
)
Expand Down

0 comments on commit 96ba8de

Please sign in to comment.