Skip to content

Commit

Permalink
Add ADLS support for PyArrow
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Jan 17, 2025
1 parent e8e2c91 commit 62986bc
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 189 deletions.
13 changes: 13 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
)
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
ADLS_ACCOUNT_NAME,
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
Expand Down Expand Up @@ -366,6 +368,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
elif scheme in {"file"}:
return self._initialize_local_fs()

elif scheme in {"abfs", "abfss"}:
return self._initialize_adls_fs()

else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

Expand Down Expand Up @@ -476,6 +481,14 @@ def _initialize_gcs_fs(self) -> FileSystem:

return GcsFileSystem(**gcs_kwargs)

def _initialize_adls_fs(self) -> FileSystem:
    """Create the PyArrow filesystem used for the ``abfs``/``abfss`` schemes.

    The account name and account key are looked up in ``self.properties``
    under ``ADLS_ACCOUNT_NAME`` and ``ADLS_ACCOUNT_KEY``. A property that is
    not set is forwarded to the filesystem as ``None``.

    Returns:
        An ``AzureFileSystem`` configured with those two settings.
    """
    # Imported inside the method rather than at module level.
    from pyarrow.fs import AzureFileSystem

    adls_kwargs = {
        "account_name": self.properties.get(ADLS_ACCOUNT_NAME),
        "account_key": self.properties.get(ADLS_ACCOUNT_KEY),
    }
    return AzureFileSystem(**adls_kwargs)

def _initialize_local_fs(self) -> FileSystem:
    """Create the filesystem used for the ``file`` scheme.

    Returns:
        A new ``PyArrowLocalFileSystem`` instance.
    """
    local_fs = PyArrowLocalFileSystem()
    return local_fs

Expand Down
38 changes: 35 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import BoundReference
from pyiceberg.io import (
ADLS_ACCOUNT_NAME,
ADLS_CONNECTION_STRING,
GCS_PROJECT_ID,
GCS_SERVICE_HOST,
GCS_TOKEN,
GCS_TOKEN_EXPIRES_AT_MS,
FileIO,
fsspec,
load_file_io,
pyarrow,
)
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.manifest import DataFile, FileFormat
Expand Down Expand Up @@ -2078,25 +2082,53 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
    """Yield an fsspec-backed file-io bound to a freshly created ``tests`` container.

    The endpoint, account name and account key are read from the
    ``--adls.endpoint``, ``--adls.account-name`` and ``--adls.account-key``
    pytest options and combined into a connection string.

    Args:
        request: The pytest fixture request, used to read the command-line options.

    Yields:
        An ``FsspecFileIO`` configured with the connection string and account name.
    """
    from azure.storage.blob import BlobServiceClient

    azurite_url = request.config.getoption("--adls.endpoint")
    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        ADLS_CONNECTION_STRING: azurite_connection_string,
        ADLS_ACCOUNT_NAME: azurite_account_name,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)

    try:
        # Start from an empty container: drop whatever an earlier run left behind.
        if list(bbs.list_containers(name_starts_with="tests")):
            bbs.delete_container("tests")

        bbs.create_container("tests")
        yield fsspec.FsspecFileIO(properties=properties)
        bbs.delete_container("tests")
    finally:
        # Release the client even when container setup or cleanup raises.
        bbs.close()


@pytest.fixture
def adls_pyarrow_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
    """Yield a PyArrow-backed file-io bound to a freshly created ``tests`` container.

    The endpoint, account name and account key are read from the
    ``--adls.endpoint``, ``--adls.account-name`` and ``--adls.account-key``
    pytest options. The connection string is used for the blob client that
    manages the container; the file-io itself also receives the account name
    and account key.

    Args:
        request: The pytest fixture request, used to read the command-line options.

    Yields:
        A ``PyArrowFileIO`` configured with the ADLS properties.
    """
    from azure.storage.blob import BlobServiceClient

    from pyiceberg.io import ADLS_ACCOUNT_KEY

    azurite_url = request.config.getoption("--adls.endpoint")
    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        ADLS_CONNECTION_STRING: azurite_connection_string,
        ADLS_ACCOUNT_NAME: azurite_account_name,
        # The PyArrow ADLS filesystem is built from the account name and the
        # account key properties, so the key has to be supplied as well.
        ADLS_ACCOUNT_KEY: azurite_account_key,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)

    try:
        # Start from an empty container: drop whatever an earlier run left behind.
        if list(bbs.list_containers(name_starts_with="tests")):
            bbs.delete_container("tests")

        bbs.create_container("tests")
        yield pyarrow.PyArrowFileIO(properties=properties)
        bbs.delete_container("tests")
    finally:
        # Release the client even when container setup or cleanup raises.
        bbs.close()


@pytest.fixture(scope="session")
def empty_home_dir_path(tmp_path_factory: pytest.TempPathFactory) -> str:
home_path = str(tmp_path_factory.mktemp("home"))
Expand Down
270 changes: 270 additions & 0 deletions tests/io/test_adls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pickle
import uuid

import pytest
from pytest_lazyfixture import lazy_fixture

from pyiceberg.io import FileIO
from pyiceberg.io.fsspec import FsspecFileIO
from pyiceberg.io.pyarrow import PyArrowFileIO


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_new_input_file_adls(fileio: FileIO) -> None:
    """An input file created by the file-io reports the location it was given"""
    filename = str(uuid.uuid4())
    location = f"abfss://tests/{filename}"

    assert fileio.new_input(location).location == location


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_new_abfss_output_file_adls(fileio: FileIO) -> None:
    """Test creating a new output file from a file-io"""
    # Annotated as FileIO because the test is parametrized with both the
    # fsspec and the PyArrow implementation.
    filename = str(uuid.uuid4())

    output_file = fileio.new_output(f"abfss://tests/{filename}")
    assert output_file.location == f"abfss://tests/{filename}"


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_write_and_read_file_adls(fileio: FileIO) -> None:
    """Test writing a file through a file-io and reading the same bytes back"""
    filename = str(uuid.uuid4())
    output_file = fileio.new_output(location=f"abfss://tests/{filename}")
    with output_file.create() as f:
        f.write(b"foo")

    input_file = fileio.new_input(f"abfss://tests/{filename}")
    # Read inside a context manager so the stream is closed before the delete.
    with input_file.open() as in_f:
        assert in_f.read() == b"foo"

    fileio.delete(input_file)


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_getting_length_of_file_adls(fileio: FileIO) -> None:
    """Test that both the output file and the input file report the written length"""
    filename = str(uuid.uuid4())
    location = f"abfss://tests/{filename}"

    output_file = fileio.new_output(location=location)
    with output_file.create() as stream:
        stream.write(b"foobar")

    # Six bytes were written, so both views of the file must report six.
    assert len(output_file) == 6
    assert len(fileio.new_input(location=location)) == 6

    fileio.delete(output_file)


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_file_tell_adls(fileio: FileIO) -> None:
    """Test finding cursor position for a file-io file"""

    filename = str(uuid.uuid4())

    output_file = fileio.new_output(location=f"abfss://tests/{filename}")
    with output_file.create() as write_file:
        write_file.write(b"foobar")

    input_file = fileio.new_input(location=f"abfss://tests/{filename}")
    # The context manager closes the stream before the file is deleted.
    with input_file.open() as f:
        # After each seek, tell() must report the position that was requested.
        for position in (0, 1, 3, 0):
            f.seek(position)
            assert f.tell() == position

    fileio.delete(f"abfss://tests/{filename}")


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_read_specified_bytes_for_file_adls(fileio: FileIO) -> None:
    """Test reading a specified number of bytes from a file-io file"""

    filename = str(uuid.uuid4())
    output_file = fileio.new_output(location=f"abfss://tests/{filename}")
    with output_file.create() as write_file:
        write_file.write(b"foo")

    input_file = fileio.new_input(location=f"abfss://tests/{filename}")
    # Each case is (seek offset, bytes requested, bytes expected).
    cases = (
        (0, 1, b"f"),
        (0, 2, b"fo"),
        (1, 1, b"o"),
        (1, 2, b"oo"),
        (0, 999, b"foo"),  # test reading amount larger than entire content length
    )
    # The context manager closes the stream before the file is deleted.
    with input_file.open() as f:
        for offset, size, expected in cases:
            f.seek(offset)
            assert expected == f.read(size)

    fileio.delete(input_file)


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_raise_on_opening_file_not_found_adls(fileio: FileIO) -> None:
    """Test that reading a missing adls file raises FileNotFoundError naming the file"""

    filename = str(uuid.uuid4())
    missing_file = fileio.new_input(location=f"abfss://tests/{filename}")

    with pytest.raises(FileNotFoundError) as exc_info:
        missing_file.open().read()

    # The error message has to mention the file that could not be found.
    error_message = str(exc_info.value)
    assert filename in error_message


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_checking_if_a_file_exists_adls(fileio: FileIO) -> None:
    """Test that exists() is false before a file is written and true afterwards"""

    assert not fileio.new_input(location="abfss://tests/does-not-exist.txt").exists()

    filename = str(uuid.uuid4())
    location = f"abfss://tests/{filename}"

    output_file = fileio.new_output(location=location)
    assert not output_file.exists()
    with output_file.create() as stream:
        stream.write(b"foo")

    # Fresh input and output handles must both see the file that was written.
    assert fileio.new_input(location=location).exists()

    existing_output_file = fileio.new_output(location=location)
    assert existing_output_file.exists()

    fileio.delete(existing_output_file)


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_closing_a_file_adls(fileio: FileIO) -> None:
    """Test that output and input streams report ``closed`` correctly"""
    filename = str(uuid.uuid4())
    location = f"abfss://tests/{filename}"

    # The output stream is open inside the ``with`` body and closed after it.
    with fileio.new_output(location=location).create() as write_file:
        write_file.write(b"foo")
        assert not write_file.closed  # type: ignore
    assert write_file.closed  # type: ignore

    # The input stream is closed explicitly, since close() is what is under test.
    f = fileio.new_input(location=location).open()
    assert not f.closed  # type: ignore
    f.close()
    assert f.closed  # type: ignore

    fileio.delete(location)


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_converting_an_outputfile_to_an_inputfile_adls(fileio: FileIO) -> None:
    """Test that an input file derived from an output file keeps its location"""
    filename = str(uuid.uuid4())
    output_file = fileio.new_output(location=f"abfss://tests/{filename}")

    converted = output_file.to_input_file()

    assert converted.location == output_file.location


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_writing_avro_file_adls(fileio: FileIO, generated_manifest_entry_file: str) -> None:
    """Test that a local avro file survives a write to adls and a read back unchanged"""
    filename = str(uuid.uuid4())
    location = f"abfss://tests/{filename}"

    # Bytes of the locally generated avro file.
    with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f:
        b1 = f.read()

    with fileio.new_output(location=location).create() as out_f:
        out_f.write(b1)

    with fileio.new_input(location=location).open() as in_f:
        b2 = in_f.read()

    # The bytes read back from adls must match the bytes read from the local file.
    assert b1 == b2

    fileio.delete(location)


@pytest.mark.adls
@pytest.mark.parametrize(
    "fileio",
    [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")],
)
def test_pickle_round_trip_adls(fileio: FileIO) -> None:
    """Test that a pickled and restored file-io still works against adls"""
    location = "abfss://tests/foo.txt"
    _test_pickle_round_trip(fileio, location)


def _test_pickle_round_trip(fileio: FileIO, location: str) -> None:
    """Pickle and unpickle ``fileio``, then write, read and delete ``location`` with the copy.

    Args:
        fileio: The file-io to serialize and restore.
        location: Location of the file written, read back and deleted.
    """
    restored_file_io = pickle.loads(pickle.dumps(fileio))

    with restored_file_io.new_output(location).create() as writer:
        writer.write(b"foo")

    input_file = restored_file_io.new_input(location)
    with input_file.open() as reader:
        data = reader.read()

    assert data == b"foo"
    assert len(input_file) == 3
    restored_file_io.delete(location)
Loading

0 comments on commit 62986bc

Please sign in to comment.