Skip to content

#49 Implement Python native write with PyArrow #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions pypaimon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,38 @@
# limitations under the License.
#################################################################################

from .api import Schema
from .py4j import Catalog
from .py4j import CommitMessage
from .py4j import Predicate
from .py4j import PredicateBuilder
from .py4j import ReadBuilder
from .py4j import RowType
from .py4j import Split
from .py4j import Table
from .py4j import BatchTableCommit
from .py4j import TableRead
from .py4j import TableScan
from .py4j import Plan
from .py4j import BatchTableWrite
from .py4j import BatchWriteBuilder

__all__ = [
'Schema',
'Catalog',
'CommitMessage',
'Predicate',
'PredicateBuilder',
'ReadBuilder',
'RowType',
'Split',
'Table',
'BatchTableCommit',
'TableRead',
'TableScan',
'Plan',
'BatchTableWrite',
'BatchWriteBuilder'
]
# from .api import Schema
# from .api import Database
# from .py4j import Catalog
# from .py4j import CommitMessage
# from .py4j import Predicate
# from .py4j import PredicateBuilder
# from .py4j import ReadBuilder
# from .py4j import RowType
# from .py4j import Split
# from .py4j import Table
# from .py4j import BatchTableCommit
# from .py4j import TableRead
# from .py4j import TableScan
# from .py4j import Plan
# from .py4j import BatchTableWrite
# from .py4j import BatchWriteBuilder
#
# __all__ = [
# 'Schema',
# 'Database',
# 'Catalog',
# 'CommitMessage',
# 'Predicate',
# 'PredicateBuilder',
# 'ReadBuilder',
# 'RowType',
# 'Split',
# 'Table',
# 'BatchTableCommit',
# 'TableRead',
# 'TableScan',
# 'Plan',
# 'BatchTableWrite',
# 'BatchWriteBuilder'
# ]
4 changes: 3 additions & 1 deletion pypaimon/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
from .table_commit import BatchTableCommit
from .table_write import BatchTableWrite
from .write_builder import BatchWriteBuilder
from .table import Table, Schema
from .schema import Schema
from .table import Table
from .database import Database
from .catalog import Catalog

__all__ = [
Expand Down
7 changes: 3 additions & 4 deletions pypaimon/api/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from abc import ABC, abstractmethod
from typing import Optional
from pypaimon.api import Table, Schema
from pypaimon.api import Table, Schema, Database


class Catalog(ABC):
Expand All @@ -27,10 +27,9 @@ class Catalog(ABC):
metadata such as database/table from a paimon catalog.
"""

@staticmethod
@abstractmethod
def create(catalog_options: dict) -> 'Catalog':
"""Create catalog from configuration."""
def get_database(self, name: str) -> 'Database':
"""Get paimon database identified by the given name."""

@abstractmethod
def get_table(self, identifier: str) -> Table:
Expand Down
18 changes: 18 additions & 0 deletions pypaimon/api/catalog_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pypaimon.api.catalog import Catalog


class CatalogFactory:
    """Factory that builds a concrete catalog from a dictionary of options."""

    @staticmethod
    def create(catalog_options: dict) -> Catalog:
        """Instantiate the catalog whose identifier matches the options.

        The identifier is read from ``catalog_options`` under
        ``CatalogOptions.METASTORE`` and falls back to ``"filesystem"`` when
        that key is absent. The direct subclasses of ``AbstractCatalog`` are
        searched in order and the first one whose ``identifier()`` equals the
        requested value is constructed with ``catalog_options``.

        Raises:
            ValueError: if no direct subclass reports the requested identifier.
        """
        # The concrete catalog modules are imported here only for their side
        # effect of defining the subclasses that __subclasses__() reports.
        from pypaimon.pynative.catalog.catalog_option import CatalogOptions
        from pypaimon.pynative.catalog.abstract_catalog import AbstractCatalog
        from pypaimon.pynative.catalog.filesystem_catalog import FileSystemCatalog  # noqa: F401
        from pypaimon.pynative.catalog.hive_catalog import HiveCatalog  # noqa: F401

        wanted = catalog_options.get(CatalogOptions.METASTORE, "filesystem")
        catalog_cls = next(
            (candidate for candidate in AbstractCatalog.__subclasses__()
             if candidate.identifier() == wanted),
            None,
        )
        if catalog_cls is None:
            raise ValueError(f"Unknown catalog identifier: {wanted}")
        return catalog_cls(catalog_options)
6 changes: 5 additions & 1 deletion pypaimon/api/commit_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
# limitations under the License.
#################################################################################

from abc import ABC
from abc import ABC, abstractmethod


class CommitMessage(ABC):
    """Commit message collected from writer."""

    @abstractmethod
    def is_empty(self):
        """Report whether this commit message is empty.

        NOTE(review): what counts as "empty" and the exact return type are
        defined by the concrete implementations, not here -- presumably a
        truthy value means there is nothing to commit; confirm against them.
        """
28 changes: 28 additions & 0 deletions pypaimon/api/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from typing import Optional


class Database:
    """Plain value holder describing a database.

    The constructor arguments are stored unchanged on the instance; nothing
    is validated or copied.
    """

    def __init__(self, name: str, properties: dict, comment: Optional[str] = None):
        """Record the database's name, its properties and an optional comment."""
        self.name, self.properties, self.comment = name, properties, comment
37 changes: 37 additions & 0 deletions pypaimon/api/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

import pyarrow as pa

from typing import Optional, List


class Schema:
    """Schema of a table.

    A plain value holder: every constructor argument is stored unchanged on
    the instance, and arguments left out stay ``None``.
    """

    def __init__(
        self,
        pa_schema: pa.Schema,
        partition_keys: Optional[List[str]] = None,
        primary_keys: Optional[List[str]] = None,
        options: Optional[dict] = None,
        comment: Optional[str] = None,
    ):
        """Record the PyArrow schema together with the optional table metadata."""
        # The PyArrow schema is the only required argument.
        self.pa_schema = pa_schema
        # Optional key lists; None (not an empty list) when not supplied.
        self.partition_keys, self.primary_keys = partition_keys, primary_keys
        # Optional options dict and free-form comment.
        self.options, self.comment = options, comment
19 changes: 0 additions & 19 deletions pypaimon/api/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
# limitations under the License.
#################################################################################

import pyarrow as pa

from abc import ABC, abstractmethod
from pypaimon.api import ReadBuilder, BatchWriteBuilder
from typing import Optional, List


class Table(ABC):
Expand All @@ -33,19 +30,3 @@ def new_read_builder(self) -> ReadBuilder:
@abstractmethod
def new_batch_write_builder(self) -> BatchWriteBuilder:
"""Returns a builder for building batch table write and table commit."""


class Schema:
    """Schema of a table.

    A plain value holder: every constructor argument is stored unchanged on
    the instance, and arguments left out stay ``None``.
    """

    def __init__(
        self,
        pa_schema: pa.Schema,
        partition_keys: Optional[List[str]] = None,
        primary_keys: Optional[List[str]] = None,
        options: Optional[dict] = None,
        comment: Optional[str] = None,
    ):
        """Record the PyArrow schema together with the optional table metadata."""
        # The PyArrow schema is the only required argument.
        self.pa_schema = pa_schema
        # Optional key lists; None (not an empty list) when not supplied.
        self.partition_keys, self.primary_keys = partition_keys, primary_keys
        # Optional options dict and free-form comment.
        self.options, self.comment = options, comment
6 changes: 5 additions & 1 deletion pypaimon/py4j/java_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from pypaimon.pynative.common.exception import PyNativeNotImplementedError
from pypaimon.pynative.common.predicate import PyNativePredicate
from pypaimon.pynative.common.row.internal_row import InternalRow
from pypaimon.pynative.reader.row.internal_row import InternalRow
from pypaimon.pynative.util.reader_converter import ReaderConverter

if TYPE_CHECKING:
Expand All @@ -43,6 +43,10 @@

class Catalog(catalog.Catalog):

@staticmethod
def identifier() -> str:
pass # TODO

def __init__(self, j_catalog, catalog_options: dict):
self._j_catalog = j_catalog
self._catalog_options = catalog_options
Expand Down
Empty file.
114 changes: 114 additions & 0 deletions pypaimon/pynative/catalog/abstract_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################

from abc import abstractmethod
from pathlib import Path
from typing import Optional

from pypaimon.api import Schema, Table
from pypaimon.api import Catalog
from pypaimon.pynative.common.exception import PyNativeNotImplementedError
from pypaimon.pynative.catalog.catalog_constant import CatalogConstants
from pypaimon.pynative.catalog.catalog_exception import DatabaseNotExistException, DatabaseAlreadyExistException, \
TableAlreadyExistException, TableNotExistException
from pypaimon.pynative.catalog.catalog_option import CatalogOptions
from pypaimon.pynative.common.file_io import FileIO
from pypaimon.pynative.common.identifier import TableIdentifier
from pypaimon.pynative.table.core_option import CoreOptions


class AbstractCatalog(Catalog):
    """Base class for catalogs that keep databases and tables under a warehouse path.

    It implements the shared create/get flow and path layout; concrete
    catalogs supply the abstract hooks declared below (and ``get_database``,
    which this class calls but does not define).
    """

    def __init__(self, catalog_options: dict):
        """Remember the options and set up file access for the warehouse.

        Raises:
            ValueError: if ``CatalogOptions.WAREHOUSE`` is not a key of
                ``catalog_options``.
        """
        if CatalogOptions.WAREHOUSE not in catalog_options:
            raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE}' path must be set")
        self.warehouse = Path(catalog_options.get(CatalogOptions.WAREHOUSE))
        self.catalog_options = catalog_options
        self.file_io = FileIO(self.warehouse, self.catalog_options)

    @staticmethod
    @abstractmethod
    def identifier() -> str:
        """Catalog Identifier"""

    @abstractmethod
    def allow_custom_table_path(self) -> bool:
        """Allow Custom Table Path"""

    @abstractmethod
    def create_database_impl(self, name: str, properties: Optional[dict] = None):
        """Create DataBase Implementation"""

    @abstractmethod
    def create_table_impl(self, table_identifier: TableIdentifier, schema: 'Schema'):
        """Create Table Implementation"""

    @abstractmethod
    def get_table_schema(self, table_identifier: TableIdentifier):
        """Get Table Schema"""

    @abstractmethod
    def lock_factory(self):
        """Lock Factory"""

    @abstractmethod
    def metastore_client_factory(self):
        """MetaStore Client Factory"""

    def get_table(self, identifier: str) -> Table:
        """Parse ``identifier`` into a ``TableIdentifier`` and load that table."""
        return self.get_table_impl(TableIdentifier(identifier))

    def get_table_impl(self, table_identifier: TableIdentifier) -> Table:
        """Build the table object from its location and schema.

        Raises:
            PyNativeNotImplementedError: if ``CoreOptions.SCAN_FALLBACK_BRANCH``
                is present in the catalog options.
        """
        from pypaimon.pynative.table.file_store_table import FileStoreTableFactory

        if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
            raise PyNativeNotImplementedError(CoreOptions.SCAN_FALLBACK_BRANCH)

        table_path = self.get_table_location(table_identifier)
        table_schema = self.get_table_schema(table_identifier)
        return FileStoreTableFactory.create(self.file_io, table_identifier, table_path, table_schema)

    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
        """Create the database ``name`` unless it already exists.

        Raises:
            DatabaseAlreadyExistException: if the database exists and
                ``ignore_if_exists`` is false.
        """
        # Only the existence probe is guarded, so the "already exists" error
        # raised below can never be mistaken for the probe's own failure.
        try:
            self.get_database(name)
        except DatabaseNotExistException:
            self.create_database_impl(name, properties)
        else:
            if not ignore_if_exists:
                raise DatabaseAlreadyExistException(name)

    def create_table(self, identifier: str, schema: 'Schema', ignore_if_exists: bool):
        """Create the table ``identifier`` with ``schema`` unless it already exists.

        Returns:
            Whatever ``create_table_impl`` returns when the table is created,
            or ``None`` when the table already exists and ``ignore_if_exists``
            is true.

        Raises:
            ValueError: if the schema options enable ``CoreOptions.AUTO_CREATE``,
                or set ``CoreOptions.PATH`` while the catalog does not allow
                custom table paths.
            TableAlreadyExistException: if the table exists and
                ``ignore_if_exists`` is false.
            PyNativeNotImplementedError: if the table has to be created and
                the schema options request a ``CoreOptions.TYPE`` other than
                ``"table"``.
        """
        options = schema.options
        if options and options.get(CoreOptions.AUTO_CREATE):
            raise ValueError(f"The value of {CoreOptions.AUTO_CREATE} property should be False.")
        if options and CoreOptions.PATH in options and not self.allow_custom_table_path():
            raise ValueError("The current catalog does not support specifying the table path when creating a table.")

        table_identifier = TableIdentifier(identifier)
        # Called only for its side effect: get_database is expected to raise
        # when the database is missing, and that error propagates to the caller.
        self.get_database(table_identifier.get_database_name())
        try:
            self.get_table_impl(table_identifier)
        except TableNotExistException:
            if options and CoreOptions.TYPE in options and options.get(CoreOptions.TYPE) != "table":
                raise PyNativeNotImplementedError(f"Table Type {options.get(CoreOptions.TYPE)}")
            return self.create_table_impl(table_identifier, schema)
        else:
            if not ignore_if_exists:
                raise TableAlreadyExistException(identifier)
            return None

    def get_database_path(self, name):
        """Return ``<warehouse>/<name><DB_SUFFIX>`` for the database ``name``."""
        return self.warehouse / f"{name}{CatalogConstants.DB_SUFFIX}"

    def get_table_location(self, table_identifier: TableIdentifier):
        """Return the table's directory: the database path joined with the table name."""
        return self.get_database_path(table_identifier.get_database_name()) / table_identifier.get_table_name()
17 changes: 17 additions & 0 deletions pypaimon/pynative/catalog/catalog_constant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from enum import Enum


class CatalogConstants(str, Enum):
    """String-valued constants used by the catalog code.

    Members mix in ``str``, and ``__str__`` is overridden to return the raw
    value, so converting or interpolating a member yields its value rather
    than the ``ClassName.MEMBER`` form.
    """

    SYSTEM_TABLE_SPLITTER = "$"
    SYSTEM_BRANCH_PREFIX = "branch-"

    COMMENT_PROP = "comment"
    OWNER_PROP = "owner"

    DEFAULT_DATABASE = "default"
    DB_SUFFIX = ".db"
    DB_LOCATION_PROP = "location"

    def __str__(self):
        """Return the member's underlying string value."""
        return self.value
Loading