diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d288e4f2f1..1a63c75bc5 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -83,6 +83,8 @@ ) from pyiceberg.expressions.visitors import visit as boolean_expression_visit from pyiceberg.io import ( + ADLS_ACCOUNT_KEY, + ADLS_ACCOUNT_NAME, AWS_ACCESS_KEY_ID, AWS_REGION, AWS_ROLE_ARN, @@ -366,6 +368,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste elif scheme in {"file"}: return self._initialize_local_fs() + elif scheme in {"abfs", "abfss"}: + return self._initialize_adls_fs() + else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") @@ -476,6 +481,14 @@ def _initialize_gcs_fs(self) -> FileSystem: return GcsFileSystem(**gcs_kwargs) + def _initialize_adls_fs(self) -> FileSystem: + from pyarrow.fs import AzureFileSystem + + return AzureFileSystem( + account_name=self.properties.get(ADLS_ACCOUNT_NAME), + account_key=self.properties.get(ADLS_ACCOUNT_KEY), + ) + def _initialize_local_fs(self) -> FileSystem: return PyArrowLocalFileSystem() diff --git a/tests/conftest.py b/tests/conftest.py index c8dde01563..b07ebfc148 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,6 +42,7 @@ Generator, List, Optional, + Tuple, ) import boto3 @@ -53,10 +54,13 @@ from pyiceberg.catalog.noop import NoopCatalog from pyiceberg.expressions import BoundReference from pyiceberg.io import ( + ADLS_ACCOUNT_NAME, + ADLS_CONNECTION_STRING, GCS_PROJECT_ID, GCS_SERVICE_HOST, GCS_TOKEN, GCS_TOKEN_EXPIRES_AT_MS, + FileIO, fsspec, load_file_io, ) @@ -90,6 +94,7 @@ if TYPE_CHECKING: import pyarrow as pa + from azure.storage.blob import BlobServiceClient from moto.server import ThreadedMotoServer # type: ignore from pyspark.sql import SparkSession @@ -2077,24 +2082,54 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No yield boto3.client("dynamodb", region_name="us-east-1") -@pytest.fixture -def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]: - from azure.storage.blob import BlobServiceClient - +def _get_account_name_and_connection_string(request: pytest.FixtureRequest) -> Tuple[str, str]: azurite_url = request.config.getoption("--adls.endpoint") azurite_account_name = request.config.getoption("--adls.account-name") azurite_account_key = request.config.getoption("--adls.account-key") azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};" - properties = { - "adls.connection-string": azurite_connection_string, - "adls.account-name": azurite_account_name, - } + + return azurite_account_name, azurite_connection_string + + +def _setup_blob(azurite_connection_string: str) -> "BlobServiceClient": + from azure.storage.blob import BlobServiceClient bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string) + + # Recreate container if needed + if list(bbs.list_containers(name_starts_with="tests")): + bbs.delete_container("tests") bbs.create_container("tests") - yield fsspec.FsspecFileIO(properties=properties) - bbs.delete_container("tests") - bbs.close() + + return bbs + + +@pytest.fixture +def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]: + azurite_account_name, azurite_connection_string = _get_account_name_and_connection_string(request) + + with _setup_blob(azurite_connection_string): + yield FsspecFileIO( + properties={ + ADLS_CONNECTION_STRING: azurite_connection_string, + ADLS_ACCOUNT_NAME: azurite_account_name, + } + ) + + +@pytest.fixture +def adls_pyarrow_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]: + from pyiceberg.io.pyarrow import PyArrowFileIO + + azurite_account_name, azurite_connection_string = _get_account_name_and_connection_string(request) + + with _setup_blob(azurite_connection_string): + yield PyArrowFileIO( + properties={ + ADLS_CONNECTION_STRING: azurite_connection_string, + ADLS_ACCOUNT_NAME: azurite_account_name, + } + ) @pytest.fixture(scope="session") diff --git a/tests/io/test_adls.py b/tests/io/test_adls.py new file mode 100644 index 0000000000..e8364241f0 --- /dev/null +++ b/tests/io/test_adls.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pickle +import uuid + +import pytest +from pytest_lazyfixture import lazy_fixture + +from pyiceberg.io import FileIO +from pyiceberg.io.fsspec import FsspecFileIO +from pyiceberg.io.pyarrow import PyArrowFileIO + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_new_input_file_adls(fileio: FileIO) -> None: + """Test creating a new input file from a file-io""" + filename = str(uuid.uuid4()) + + input_file = fileio.new_input(f"abfss://tests/{filename}") + assert input_file.location == f"abfss://tests/{filename}" + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_new_abfss_output_file_adls(fileio: FsspecFileIO) -> None: + """Test creating a new output file from a file-io""" + filename = str(uuid.uuid4()) + + output_file = fileio.new_output(f"abfss://tests/{filename}") + assert output_file.location == f"abfss://tests/{filename}" + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_write_and_read_file_adls(fileio: FileIO) -> None: + """Test writing and reading a file using FsspecInputFile and FsspecOutputFile""" + filename = str(uuid.uuid4()) + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + with output_file.create() as f: + f.write(b"foo") + + input_file = fileio.new_input(f"abfss://tests/{filename}") + assert input_file.open().read() == b"foo" + + fileio.delete(input_file) + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_getting_length_of_file_adls(fileio: FileIO) -> None: + """Test getting the length of aInputFile and FsspecOutputFile""" + filename = str(uuid.uuid4()) + + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + with output_file.create() as f: + f.write(b"foobar") + + assert len(output_file) == 6 + + input_file = fileio.new_input(location=f"abfss://tests/{filename}") + assert len(input_file) == 6 + + fileio.delete(output_file) + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_file_tell_adls(fileio: FileIO) -> None: + """Test finding cursor position for a file-io file""" + + filename = str(uuid.uuid4()) + + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + with output_file.create() as write_file: + write_file.write(b"foobar") + + input_file = fileio.new_input(location=f"abfss://tests/{filename}") + f = input_file.open() + + f.seek(0) + assert f.tell() == 0 + f.seek(1) + assert f.tell() == 1 + f.seek(3) + assert f.tell() == 3 + f.seek(0) + assert f.tell() == 0 + + fileio.delete(f"abfss://tests/{filename}") + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_read_specified_bytes_for_file_adls(fileio: FileIO) -> None: + """Test reading a specified number of bytes from a file-io file""" + + filename = str(uuid.uuid4()) + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + with output_file.create() as write_file: + write_file.write(b"foo") + + input_file = fileio.new_input(location=f"abfss://tests/{filename}") + f = input_file.open() + + f.seek(0) + assert b"f" == f.read(1) + f.seek(0) + assert b"fo" == f.read(2) + f.seek(1) + assert b"o" == f.read(1) + f.seek(1) + assert b"oo" == f.read(2) + f.seek(0) + assert b"foo" == f.read(999) # test reading amount larger than entire content length + + fileio.delete(input_file) + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_raise_on_opening_file_not_found_adls(fileio: FileIO) -> None: + """Test that a input file raises appropriately when the adls file is not found""" + + filename = str(uuid.uuid4()) + input_file = fileio.new_input(location=f"abfss://tests/{filename}") + with pytest.raises(FileNotFoundError) as exc_info: + input_file.open().read() + + assert filename in str(exc_info.value) + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_checking_if_a_file_exists_adls(fileio: FileIO) -> None: + """Test checking if a file exists""" + + non_existent_file = fileio.new_input(location="abfss://tests/does-not-exist.txt") + assert not non_existent_file.exists() + + filename = str(uuid.uuid4()) + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + assert not output_file.exists() + with output_file.create() as f: + f.write(b"foo") + + existing_input_file = fileio.new_input(location=f"abfss://tests/{filename}") + assert existing_input_file.exists() + + existing_output_file = fileio.new_output(location=f"abfss://tests/{filename}") + assert existing_output_file.exists() + + fileio.delete(existing_output_file) + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_closing_a_file_adls(fileio: FileIO) -> None: + """Test closing an output file and input file""" + filename = str(uuid.uuid4()) + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + with output_file.create() as write_file: + write_file.write(b"foo") + assert not write_file.closed # type: ignore + assert write_file.closed # type: ignore + + input_file = fileio.new_input(location=f"abfss://tests/{filename}") + f = input_file.open() + assert not f.closed # type: ignore + f.close() + assert f.closed # type: ignore + + fileio.delete(f"abfss://tests/{filename}") + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_converting_an_outputfile_to_an_inputfile_adls(fileio: FileIO) -> None: + """Test converting an output file to an input file""" + filename = str(uuid.uuid4()) + output_file = fileio.new_output(location=f"abfss://tests/{filename}") + input_file = output_file.to_input_file() + assert input_file.location == output_file.location + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_writing_avro_file_adls(fileio: FileIO, generated_manifest_entry_file: str) -> None: + """Test that bytes match when reading a local avro file and then reading it again""" + filename = str(uuid.uuid4()) + with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f: + b1 = f.read() + with fileio.new_output(location=f"abfss://tests/{filename}").create() as out_f: + out_f.write(b1) + with fileio.new_input(location=f"abfss://tests/{filename}").open() as in_f: + b2 = in_f.read() + assert b1 == b2 # Check that bytes of read from local avro file match bytes written to adls + + fileio.delete(f"abfss://tests/{filename}") + + +@pytest.mark.adls +@pytest.mark.parametrize( + "fileio", + [lazy_fixture("adls_fsspec_fileio"), lazy_fixture("adls_pyarrow_fileio")], +) +def test_pickle_round_trip_adls(fileio: FileIO) -> None: + _test_pickle_round_trip(fileio, "abfss://tests/foo.txt") + + +def _test_pickle_round_trip(fileio: FileIO, location: str) -> None: + serialized_file_io = pickle.dumps(fileio) + deserialized_file_io = pickle.loads(serialized_file_io) + output_file = deserialized_file_io.new_output(location) + with output_file.create() as f: + f.write(b"foo") + + input_file = deserialized_file_io.new_input(location) + with input_file.open() as f: + data = f.read() + assert data == b"foo" + assert len(input_file) == 3 + deserialized_file_io.delete(location) diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py index 64dc68b9ca..8a782f78fe 100644 --- a/tests/io/test_fsspec.py +++ b/tests/io/test_fsspec.py @@ -290,192 +290,6 @@ def test_fsspec_unified_session_properties() -> None: ) -@pytest.mark.adls -def test_fsspec_new_input_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test creating a new input file from an fsspec file-io""" - filename = str(uuid.uuid4()) - - input_file = adls_fsspec_fileio.new_input(f"abfss://tests/{filename}") - - assert isinstance(input_file, fsspec.FsspecInputFile) - assert input_file.location == f"abfss://tests/{filename}" - - -@pytest.mark.adls -def test_fsspec_new_abfss_output_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test creating a new output file from an fsspec file-io""" - filename = str(uuid.uuid4()) - - output_file = adls_fsspec_fileio.new_output(f"abfss://tests/{filename}") - - assert isinstance(output_file, fsspec.FsspecOutputFile) - assert output_file.location == f"abfss://tests/{filename}" - - -@pytest.mark.adls -def test_fsspec_write_and_read_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test writing and reading a file using FsspecInputFile and FsspecOutputFile""" - filename = str(uuid.uuid4()) - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - with output_file.create() as f: - f.write(b"foo") - - input_file = adls_fsspec_fileio.new_input(f"abfss://tests/{filename}") - assert input_file.open().read() == b"foo" - - adls_fsspec_fileio.delete(input_file) - - -@pytest.mark.adls -def test_fsspec_getting_length_of_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test getting the length of an FsspecInputFile and FsspecOutputFile""" - filename = str(uuid.uuid4()) - - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - with output_file.create() as f: - f.write(b"foobar") - - assert len(output_file) == 6 - - input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}") - assert len(input_file) == 6 - - adls_fsspec_fileio.delete(output_file) - - -@pytest.mark.adls -def test_fsspec_file_tell_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test finding cursor position for an fsspec file-io file""" - - filename = str(uuid.uuid4()) - - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - with output_file.create() as write_file: - write_file.write(b"foobar") - - input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}") - f = input_file.open() - - f.seek(0) - assert f.tell() == 0 - f.seek(1) - assert f.tell() == 1 - f.seek(3) - assert f.tell() == 3 - f.seek(0) - assert f.tell() == 0 - - adls_fsspec_fileio.delete(f"abfss://tests/{filename}") - - -@pytest.mark.adls -def test_fsspec_read_specified_bytes_for_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test reading a specified number of bytes from an fsspec file-io file""" - - filename = str(uuid.uuid4()) - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - with output_file.create() as write_file: - write_file.write(b"foo") - - input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}") - f = input_file.open() - - f.seek(0) - assert b"f" == f.read(1) - f.seek(0) - assert b"fo" == f.read(2) - f.seek(1) - assert b"o" == f.read(1) - f.seek(1) - assert b"oo" == f.read(2) - f.seek(0) - assert b"foo" == f.read(999) # test reading amount larger than entire content length - - adls_fsspec_fileio.delete(input_file) - - -@pytest.mark.adls -def test_fsspec_raise_on_opening_file_not_found_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test that an fsspec input file raises appropriately when the adls file is not found""" - - filename = str(uuid.uuid4()) - input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}") - with pytest.raises(FileNotFoundError) as exc_info: - input_file.open().read() - - assert filename in str(exc_info.value) - - -@pytest.mark.adls -def test_checking_if_a_file_exists_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test checking if a file exists""" - - non_existent_file = adls_fsspec_fileio.new_input(location="abfss://tests/does-not-exist.txt") - assert not non_existent_file.exists() - - filename = str(uuid.uuid4()) - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - assert not output_file.exists() - with output_file.create() as f: - f.write(b"foo") - - existing_input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}") - assert existing_input_file.exists() - - existing_output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - assert existing_output_file.exists() - - adls_fsspec_fileio.delete(existing_output_file) - - -@pytest.mark.adls -def test_closing_a_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test closing an output file and input file""" - filename = str(uuid.uuid4()) - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - with output_file.create() as write_file: - write_file.write(b"foo") - assert not write_file.closed # type: ignore - assert write_file.closed # type: ignore - - input_file = adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}") - f = input_file.open() - assert not f.closed # type: ignore - f.close() - assert f.closed # type: ignore - - adls_fsspec_fileio.delete(f"abfss://tests/{filename}") - - -@pytest.mark.adls -def test_fsspec_converting_an_outputfile_to_an_inputfile_adls(adls_fsspec_fileio: FsspecFileIO) -> None: - """Test converting an output file to an input file""" - filename = str(uuid.uuid4()) - output_file = adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}") - input_file = output_file.to_input_file() - assert input_file.location == output_file.location - - -@pytest.mark.adls -def test_writing_avro_file_adls(generated_manifest_entry_file: str, adls_fsspec_fileio: FsspecFileIO) -> None: - """Test that bytes match when reading a local avro file, writing it using fsspec file-io, and then reading it again""" - filename = str(uuid.uuid4()) - with PyArrowFileIO().new_input(location=generated_manifest_entry_file).open() as f: - b1 = f.read() - with adls_fsspec_fileio.new_output(location=f"abfss://tests/{filename}").create() as out_f: - out_f.write(b1) - with adls_fsspec_fileio.new_input(location=f"abfss://tests/{filename}").open() as in_f: - b2 = in_f.read() - assert b1 == b2 # Check that bytes of read from local avro file match bytes written to adls - - adls_fsspec_fileio.delete(f"abfss://tests/{filename}") - - -@pytest.mark.adls -def test_fsspec_pickle_round_trip_aldfs(adls_fsspec_fileio: FsspecFileIO) -> None: - _test_fsspec_pickle_round_trip(adls_fsspec_fileio, "abfss://tests/foo.txt") - - @pytest.mark.gcs def test_fsspec_new_input_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None: """Test creating a new input file from a fsspec file-io"""