diff --git a/pypaimon/api/table_read.py b/pypaimon/api/table_read.py index 9fcb78c..60b31e7 100644 --- a/pypaimon/api/table_read.py +++ b/pypaimon/api/table_read.py @@ -18,12 +18,14 @@ import pandas as pd import pyarrow as pa -import ray from abc import ABC, abstractmethod -from duckdb.duckdb import DuckDBPyConnection from pypaimon.api import Split -from typing import List, Optional +from typing import List, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + import ray + from duckdb.duckdb import DuckDBPyConnection class TableRead(ABC): @@ -46,9 +48,9 @@ def to_duckdb( self, splits: List[Split], table_name: str, - connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + connection: Optional["DuckDBPyConnection"] = None) -> "DuckDBPyConnection": """Convert splits into an in-memory DuckDB table which can be queried.""" @abstractmethod - def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: + def to_ray(self, splits: List[Split]) -> "ray.data.dataset.Dataset": """Convert splits into a Ray dataset format.""" diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py index 17c6eda..ce90bc5 100644 --- a/pypaimon/py4j/java_implementation.py +++ b/pypaimon/py4j/java_implementation.py @@ -18,19 +18,20 @@ # pypaimon.api implementation based on Java code & py4j lib -import duckdb import pandas as pd import pyarrow as pa -import ray -from duckdb.duckdb import DuckDBPyConnection from pypaimon.py4j.java_gateway import get_gateway from pypaimon.py4j.util import java_utils, constants from pypaimon.api import \ (catalog, table, read_builder, table_scan, split, table_read, write_builder, table_write, commit_message, table_commit, Schema, predicate) -from typing import List, Iterator, Optional, Any +from typing import List, Iterator, Optional, Any, TYPE_CHECKING + +if TYPE_CHECKING: + import ray + from duckdb.duckdb import DuckDBPyConnection class Catalog(catalog.Catalog): @@ -171,12 +172,16 @@ def to_duckdb( self, splits: List[Split], table_name: str, - connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + connection: Optional["DuckDBPyConnection"] = None) -> "DuckDBPyConnection": + import duckdb + con = connection or duckdb.connect(database=":memory:") con.register(table_name, self.to_arrow(splits)) return con - def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: + def to_ray(self, splits: List[Split]) -> "ray.data.dataset.Dataset": + import ray + return ray.data.from_arrow(self.to_arrow(splits)) def _init(self):