Skip to content

Commit

Permalink
docstrings (apache#1189)
Browse files Browse the repository at this point in the history
  • Loading branch information
sungwy authored Sep 20, 2024
1 parent b8b2f66 commit 1d9570d
Showing 1 changed file with 101 additions and 5 deletions.
106 changes: 101 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
return self

def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE) -> DataScan:
"""Minimal data scan the table with the current state of the transaction."""
"""Minimal data scan of the table with the current state of the transaction."""
return DataScan(
table_metadata=self.table_metadata,
io=self._table.io,
Expand Down Expand Up @@ -681,6 +681,8 @@ def commit_transaction(self) -> Table:


class CreateTableTransaction(Transaction):
"""A transaction that involves the creation of a new table."""

def _initial_changes(self, table_metadata: TableMetadata) -> None:
"""Set the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction."""
self._updates += (
Expand Down Expand Up @@ -749,17 +751,23 @@ class TableIdentifier(IcebergBaseModel):


class CommitTableRequest(IcebergBaseModel):
    """A pydantic BaseModel for a table commit request.

    Bundles the identifier of the table being committed to with the
    requirements and updates that make up the commit.
    """

    # Identifier of the table the commit applies to; declared with a bare
    # Field(), i.e. no default value is supplied here.
    identifier: TableIdentifier = Field()
    # Requirements attached to the commit; defaults to an empty tuple.
    requirements: Tuple[TableRequirement, ...] = Field(default_factory=tuple)
    # Updates carried by the commit; defaults to an empty tuple.
    updates: Tuple[TableUpdate, ...] = Field(default_factory=tuple)


class CommitTableResponse(IcebergBaseModel):
    """A pydantic BaseModel for a table commit response.

    Carries the table metadata together with the location of the metadata
    file.
    """

    # Table metadata carried by the response.
    metadata: TableMetadata
    # Location of the metadata file; exposed under the alias "metadata-location".
    metadata_location: str = Field(alias="metadata-location")


class Table:
"""An Iceberg table."""

_identifier: Identifier = Field()
metadata: TableMetadata
metadata_location: str = Field()
Expand All @@ -785,11 +793,19 @@ def transaction(self) -> Transaction:

@property
def inspect(self) -> InspectTable:
"""Return the InspectTable object to browse the table metadata."""
"""Return the InspectTable object to browse the table metadata.
Returns:
InspectTable object based on this Table.
"""
return InspectTable(self)

def refresh(self) -> Table:
"""Refresh the current table metadata."""
"""Refresh the current table metadata.
Returns:
An updated instance of the same Iceberg table
"""
fresh = self.catalog.load_table(self._identifier)
self.metadata = fresh.metadata
self.io = fresh.io
Expand All @@ -798,7 +814,11 @@ def refresh(self) -> Table:

@property
def identifier(self) -> Identifier:
"""Return the identifier of this table."""
"""Return the identifier of this table.
Returns:
An Identifier tuple of the table name
"""
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
Expand All @@ -807,7 +827,11 @@ def identifier(self) -> Identifier:
return (self.catalog.name,) + self._identifier

def name(self) -> Identifier:
"""Return the identifier of this table."""
"""Return the identifier of this table.
Returns:
An Identifier tuple of the table name
"""
return self.identifier

def scan(
Expand All @@ -819,6 +843,35 @@ def scan(
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
) -> DataScan:
"""Fetch a DataScan based on the table's current metadata.
The data scan can be used to project the table's data
that matches the provided row_filter onto the table's
current schema.
Args:
row_filter:
A string or BooleanExpression that describes the
desired rows
selected_fields:
A tuple of strings representing the column names
to return in the output dataframe.
case_sensitive:
If True, column matching is case sensitive.
snapshot_id:
Optional Snapshot ID to time travel to. If None,
scans the table as of the current snapshot ID.
options:
Additional Table properties as a dictionary of
string key value pairs to use for this scan.
limit:
An integer representing the number of rows to
return in the scan result. If None, fetches all
matching rows.
Returns:
A DataScan based on the table's current metadata.
"""
return DataScan(
table_metadata=self.metadata,
io=self.io,
Expand Down Expand Up @@ -1212,6 +1265,8 @@ class ScanTask(ABC):

@dataclass(init=False)
class FileScanTask(ScanTask):
"""Task representing a data file and its corresponding delete files."""

file: DataFile
delete_files: Set[DataFile]
start: int
Expand All @@ -1236,6 +1291,11 @@ def _open_manifest(
partition_filter: Callable[[DataFile], bool],
metrics_evaluator: Callable[[DataFile], bool],
) -> List[ManifestEntry]:
"""Open a manifest file and return matching manifest entries.
Returns:
A list of ManifestEntry that matches the provided filters.
"""
return [
manifest_entry
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
Expand Down Expand Up @@ -1395,13 +1455,30 @@ def plan_files(self) -> Iterable[FileScanTask]:
]

def to_arrow(self) -> pa.Table:
    """Materialize this DataScan as a single Arrow table.

    The read is eager: all rows are loaded into memory at once. An
    ``ArrowScan`` is built from this scan's table metadata, IO, projection,
    row filter, case-sensitivity flag and limit, and is then asked to read
    the files returned by ``plan_files()``.

    Returns:
        pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
    """
    # Function-scope import, as in the original implementation.
    from pyiceberg.io.pyarrow import ArrowScan

    arrow_scan = ArrowScan(
        self.table_metadata,
        self.io,
        self.projection(),
        self.row_filter,
        self.case_sensitive,
        self.limit,
    )
    return arrow_scan.to_table(self.plan_files())

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.
For large results, using a RecordBatchReader requires less memory than
loading an Arrow Table for the same DataScan, because record batches
are read one at a time.
Returns:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
which can be used to read a stream of record batches one by one.
"""
import pyarrow as pa

from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
Expand All @@ -1417,9 +1494,19 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
)

def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
    """Materialize this scan as a Pandas DataFrame.

    The scan is first read eagerly through ``to_arrow()``; the resulting
    object's ``to_pandas`` method then performs the conversion.

    Args:
        **kwargs: Forwarded unchanged to the ``to_pandas`` call on the
            Arrow result.

    Returns:
        pd.DataFrame: Materialized Pandas DataFrame from the Iceberg table
    """
    arrow_table = self.to_arrow()
    return arrow_table.to_pandas(**kwargs)

def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
"""Shorthand for loading the Iceberg Table in DuckDB.
Returns:
DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
"""
import duckdb

con = connection or duckdb.connect(database=":memory:")
Expand All @@ -1428,13 +1515,20 @@ def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] =
return con

def to_ray(self) -> ray.data.dataset.Dataset:
    """Materialize this scan as a Ray Dataset.

    The scan is read eagerly through ``to_arrow()`` and the result is handed
    to ``ray.data.from_arrow``.

    Returns:
        ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
    """
    # Function-scope import, as in the original implementation.
    import ray

    # Resolve the converter before reading, matching the original
    # evaluation order (attribute lookup first, then the Arrow read).
    from_arrow = ray.data.from_arrow
    return from_arrow(self.to_arrow())


@dataclass(frozen=True)
class WriteTask:
"""Task with the parameters for writing a DataFile."""

write_uuid: uuid.UUID
task_id: int
schema: Schema
Expand All @@ -1457,6 +1551,8 @@ def generate_data_file_path(self, extension: str) -> str:

@dataclass(frozen=True)
class AddFileTask:
"""Task with the parameters for adding a Parquet file as a DataFile."""

file_path: str
partition_field_value: Record

Expand Down

0 comments on commit 1d9570d

Please sign in to comment.