Skip to content

Commit

Permalink
Code tidy-up
Browse files Browse the repository at this point in the history
  • Loading branch information
jiakai-li committed Jan 6, 2025
1 parent bc2adc7 commit 4b83fc0
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@
T = TypeVar("T")


class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
    """Local filesystem whose output streams create missing parent directories first."""

    def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
        """Open ``path`` for writing after creating its parent directory tree.

        Any extra positional and keyword arguments are forwarded unchanged to
        the parent class's ``open_output_stream``.
        """
        # LocalFileSystem requires the parent directories to exist before an
        # output stream can be opened, so create them (recursively) up front.
        parent_dir = os.path.dirname(path)
        self.create_dir(parent_dir, recursive=True)
        return super().open_output_stream(path, *args, **kwargs)


class PyArrowFile(InputFile, OutputFile):
"""A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.
Expand Down Expand Up @@ -346,24 +353,24 @@ def parse_location(location: str) -> Tuple[str, str, str]:
def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
"""Initialize FileSystem for different scheme."""
if scheme in {"oss"}:
return self._initialize_oss_fs(scheme, netloc)
return self._initialize_oss_fs()

elif scheme in {"s3", "s3a", "s3n"}:
return self._initialize_s3_fs(scheme, netloc)
return self._initialize_s3_fs(netloc)

elif scheme in ("hdfs", "viewfs"):
elif scheme in {"hdfs", "viewfs"}:
return self._initialize_hdfs_fs(scheme, netloc)

elif scheme in {"gs", "gcs"}:
return self._initialize_gcs_fs(scheme, netloc)
return self._initialize_gcs_fs()

elif scheme in {"file"}:
return self._initialize_local_fs(scheme, netloc)
return self._initialize_local_fs()

else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
def _initialize_oss_fs(self) -> FileSystem:
from pyarrow.fs import S3FileSystem

client_kwargs: Dict[str, Any] = {
Expand Down Expand Up @@ -391,7 +398,7 @@ def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:

return S3FileSystem(**client_kwargs)

def _initialize_s3_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
from pyarrow.fs import S3FileSystem, resolve_s3_region

# Resolve region from netloc(bucket), fallback to user-provided region
Expand Down Expand Up @@ -453,7 +460,7 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:

return HadoopFileSystem(**hdfs_kwargs)

def _initialize_gcs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
def _initialize_gcs_fs(self) -> FileSystem:
from pyarrow.fs import GcsFileSystem

gcs_kwargs: Dict[str, Any] = {}
Expand All @@ -476,13 +483,7 @@ def _initialize_gcs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:

return GcsFileSystem(**gcs_kwargs)

def _initialize_local_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
# In LocalFileSystem, parent directories must be first created before opening an output stream
self.create_dir(os.path.dirname(path), recursive=True)
return super().open_output_stream(path, *args, **kwargs)

def _initialize_local_fs(self) -> FileSystem:
return PyArrowLocalFileSystem()

def new_input(self, location: str) -> PyArrowFile:
Expand Down

0 comments on commit 4b83fc0

Please sign in to comment.