
Commit 3e288a8: Better filesystem selection

1 parent 16dc69e

File tree

  • src/nested_pandas/nestedframe/io.py
  • tests/nested_pandas/nestedframe/test_io.py

2 files changed: +119 -22 lines

src/nested_pandas/nestedframe/io.py

Lines changed: 59 additions & 21 deletions
@@ -1,7 +1,7 @@
 # typing.Self and "|" union syntax don't exist in Python 3.9
 from __future__ import annotations
 
-from collections.abc import Sequence
+from pathlib import Path
 
 import pandas as pd
 import pyarrow as pa
@@ -33,10 +33,11 @@ def read_parquet(
 
     Parameters
     ----------
-    data: str, Upath, or file-like object
+    data: str, list of str, Path, UPath, or file-like object
         Path to the data or a file-like object. If a string is passed, it can be a single file name,
         directory name, or a remote path (e.g., HTTP/HTTPS or S3). If a file-like object is passed,
-        it must support the `read` method.
+        it must support the `read` method. You can also pass the `filesystem` argument with
+        a `pyarrow.fs` object, which will be passed to `pyarrow.parquet.read_table()`.
     columns : list, default=None
         If not None, only these columns will be read from the file.
     reject_nesting: list or str, default=None
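
The updated docstring describes the new call pattern: `data` may be a path string, a list of paths, a `Path`/`UPath`, or a file-like object, and an explicit `pyarrow.fs` filesystem is forwarded to `pyarrow.parquet.read_table()`. A minimal sketch of the filesystem case, with a hypothetical bucket and key:

    import pyarrow.fs
    from nested_pandas import read_parquet

    # Hypothetical bucket/key; the filesystem object is passed straight through to
    # pyarrow.parquet.read_table(), so `data` is a bare bucket/key path.
    s3 = pyarrow.fs.S3FileSystem(anonymous=True)
    nf = read_parquet("my-bucket/catalog/objects.parquet", filesystem=s3)
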
@@ -93,26 +94,13 @@ def read_parquet(
         reject_nesting = [reject_nesting]
 
     # First load through pyarrow
-    # Check if `data` is a file-like object or a sequence
-    if hasattr(data, "read") or (
-        isinstance(data, Sequence) and not isinstance(data, str | bytes | bytearray)
-    ):
-        # If `data` is a file-like object or a sequence, pass it directly to pyarrow
+    # If `filesystem` is specified - use it
+    if kwargs.get("filesystem") is not None:
         table = pq.read_table(data, columns=columns, **kwargs)
+    # Otherwise convert with a special function
     else:
-        # Try creating pyarrow-native filesystem
-        try:
-            fs, path = pa.fs.FileSystem.from_uri(data)
-        except (TypeError, pa.ArrowInvalid):
-            # Otherwise, treat `data` as an URI for fsspec-supported silesystem and use UPath
-            upath = UPath(data)
-            # Use smaller block size for better performance
-            if upath.protocol in ("http", "https"):
-                upath = UPath(upath, block_size=FSSPEC_BLOCK_SIZE)
-            path = upath.path
-            fs = upath.fs
-        filesystem = kwargs.pop("filesystem", fs)
-        table = pq.read_table(path, columns=columns, filesystem=filesystem, **kwargs)
+        data, filesystem = _transform_read_parquet_data_arg(data)
+        table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
 
     # Resolve partial loading of nested structures
     # Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
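
When no `filesystem` is given, every other `data` form is routed through the helper added below before reaching `pyarrow.parquet.read_table()`. A rough sketch of calls that resolve through that branch, using hypothetical local file names:

    from pathlib import Path
    from upath import UPath
    from nested_pandas import read_parquet

    # All paths are hypothetical; each value is resolved by _transform_read_parquet_data_arg.
    nf = read_parquet(Path("data/objects.parquet"))                  # pathlib.Path
    nf = read_parquet(["data/part0.parquet", "data/part1.parquet"])  # list of local paths
    nf = read_parquet(UPath("data/objects.parquet"))                 # fsspec-backed UPath
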
@@ -172,6 +160,56 @@ def read_parquet(
     return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)
 
 
+def _transform_read_parquet_data_arg(data):
+    """Transform the `data` argument of read_parquet into pq.read_table's `source` and `filesystem`"""
+    # Check if a list, run the function recursively and check that filesystems are all the same
+    if isinstance(data, list):
+        paths = []
+        first_fs = None
+        for i, d in enumerate(data):
+            path, fs = _transform_read_parquet_data_arg(d)
+            paths.append(path)
+            if i == 0:
+                first_fs = fs
+            elif fs != first_fs:
+                raise ValueError(
+                    f"All filesystems in the list should be the same, first fs: {first_fs}, {i + 1} fs: {fs}"
+                )
+        return paths, first_fs
+    # Check if a file-like object
+    if hasattr(data, "read"):
+        return data, None
+    # Check if `data` is a UPath and use its filesystem
+    if isinstance(data, UPath):
+        return data.path, data.fs
+    # Check if `data` is a Path
+    if isinstance(data, Path):
+        return data, None
+    # It should be a string now
+    if not isinstance(data, str):
+        raise TypeError("data must be a file-like object, Path, UPath, list, or str")
+
+    # Try creating a pyarrow-native filesystem, assuming that `data` is a URI
+    try:
+        fs, path = pa.fs.FileSystem.from_uri(data)
+    # If the conversion failed, continue below
+    except (TypeError, pa.ArrowInvalid):
+        pass
+    # If it succeeded, use the pyarrow-native filesystem
+    else:
+        return path, fs
+
+    # Otherwise, treat `data` as a URI or a local path
+    upath = UPath(data)
+    # If it is a local path, let pyarrow use its default local filesystem
+    if upath.protocol == "":
+        return upath.path, None
+    # If HTTP, change the default UPath object to use a smaller block size
+    if upath.protocol in ("http", "https"):
+        upath = UPath(upath, block_size=FSSPEC_BLOCK_SIZE)
+    return upath.path, upath.fs
+
+
 def from_pyarrow(
     table: pa.Table,
     reject_nesting: list[str] | str | None = None,
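
Taken together, the helper maps each supported `data` value onto a `(source, filesystem)` pair for `pyarrow.parquet.read_table()`. A sketch of representative results, using hypothetical relative paths (the tests below exercise the same branches):

    from pathlib import Path
    from nested_pandas.nestedframe.io import _transform_read_parquet_data_arg

    # A relative local string or Path keeps pyarrow's default local filesystem (None).
    _transform_read_parquet_data_arg("data/objects.parquet")        # ("data/objects.parquet", None)
    _transform_read_parquet_data_arg(Path("data/objects.parquet"))  # (Path("data/objects.parquet"), None)

    # An S3 URI is resolved by pyarrow.fs.FileSystem.from_uri() into a bucket/key path
    # plus a pyarrow S3FileSystem instance.
    _transform_read_parquet_data_arg("s3://my-bucket/objects.parquet")

    # A list is handled recursively; every entry must resolve to the same filesystem.
    _transform_read_parquet_data_arg(["data/a.parquet", "data/b.parquet"])  # ([...], None)
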

tests/nested_pandas/nestedframe/test_io.py

Lines changed: 60 additions & 1 deletion
@@ -1,13 +1,18 @@
+import io
 import os
 import tempfile
+from pathlib import Path
 
+import fsspec.implementations.http
+import fsspec.implementations.local
 import pandas as pd
 import pyarrow as pa
+import pyarrow.fs
 import pyarrow.parquet as pq
 import pytest
 from nested_pandas import NestedFrame, read_parquet
 from nested_pandas.datasets import generate_data
-from nested_pandas.nestedframe.io import from_pyarrow
+from nested_pandas.nestedframe.io import _transform_read_parquet_data_arg, from_pyarrow
 from pandas.testing import assert_frame_equal
 from upath import UPath
 
@@ -338,3 +343,57 @@ def test_read_parquet_list_autocast():
     assert len(nf["c"].nest.to_flat()) == 9
     assert nf["d"].nest.fields == ["d"]
     assert len(nf["d"].nest.to_flat()) == 9
+
+
+def test__transform_read_parquet_data_arg():
+    """Testing _transform_read_parquet_data_arg"""
+    with open("tests/test_data/nested.parquet", "rb") as f:
+        parquet_bytes = f.read()
+    io_bytes = io.BytesIO(parquet_bytes)
+    assert _transform_read_parquet_data_arg(io_bytes) == (io_bytes, None)
+
+    local_path = "tests/test_data/nested.parquet"
+    with open(local_path, "rb") as f:
+        assert _transform_read_parquet_data_arg(f) == (f, None)
+    with open(Path(local_path), "rb") as f:
+        assert _transform_read_parquet_data_arg(f) == (f, None)
+    with Path(local_path).open("rb") as f:
+        assert _transform_read_parquet_data_arg(f) == (f, None)
+    with UPath(local_path).open("rb") as f:
+        assert _transform_read_parquet_data_arg(f) == (f, None)
+
+    assert _transform_read_parquet_data_arg(local_path) == (local_path, None)
+
+    assert _transform_read_parquet_data_arg(Path(local_path)) == (Path(local_path), None)
+
+    local_upath = UPath(local_path)
+    assert _transform_read_parquet_data_arg(local_upath) == (local_path, local_upath.fs)
+
+    s3_path = "s3://nasa-irsa-euclid-q1/contributed/q1/merged_objects/hats/euclid_q1_merged_objects-hats/dataset/Norder=3/Dir=0/Npix=334/part0.snappy.parquet"
+    path, fs = _transform_read_parquet_data_arg(s3_path)
+    assert f"s3://{path}" == s3_path
+    assert isinstance(fs, pa.fs.S3FileSystem)
+
+    https_path = "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet"
+    path, fs = _transform_read_parquet_data_arg(https_path)
+    assert path == https_path
+    assert isinstance(fs, fsspec.implementations.http.HTTPFileSystem)
+
+    with pytest.raises(TypeError):
+        _transform_read_parquet_data_arg(123)
+
+    local_paths = list(Path("tests/test_data").glob("*.parquet"))
+    assert _transform_read_parquet_data_arg(local_paths) == (local_paths, None)
+
+    local_upaths = list(UPath("tests/test_data").glob("*.parquet"))
+    paths, fs = _transform_read_parquet_data_arg(local_upaths)
+    assert paths == [up.path for up in local_upaths]
+    assert isinstance(fs, fsspec.implementations.local.LocalFileSystem)
+
+    with pytest.raises(ValueError):
+        _transform_read_parquet_data_arg(
+            [
+                "tests/test_data",
+                "https://data.lsdb.io/hats/gaia_dr3/gaia/dataset/Norder=2/Dir=0/Npix=0.parquet",
+            ]
+        )
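
The list-of-UPaths assertion above leans on the helper's rule that every list entry must resolve to the same filesystem. For fsspec-backed UPaths this holds because fsspec caches filesystem instances by protocol and storage options, so identically configured paths compare equal. A minimal sketch of that assumption (second file name hypothetical):

    from upath import UPath

    a = UPath("tests/test_data/nested.parquet")
    b = UPath("tests/test_data/other.parquet")  # hypothetical sibling file
    # Both resolve to the same cached LocalFileSystem, so the helper's
    # `fs != first_fs` check passes for a list of local UPaths.
    assert a.fs == b.fs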
