Skip to content

Commit

Permalink
Add ALDS support for PyArrow
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Jan 17, 2025
1 parent e8e2c91 commit ae16b3d
Show file tree
Hide file tree
Showing 4 changed files with 329 additions and 197 deletions.
13 changes: 13 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
)
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
ADLS_ACCOUNT_NAME,
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
Expand Down Expand Up @@ -366,6 +368,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
elif scheme in {"file"}:
return self._initialize_local_fs()

elif scheme in {"abfs", "abfss"}:
return self._initialize_adls_fs()

else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

Expand Down Expand Up @@ -476,6 +481,14 @@ def _initialize_gcs_fs(self) -> FileSystem:

return GcsFileSystem(**gcs_kwargs)

def _initialize_adls_fs(self) -> FileSystem:
from pyarrow.fs import AzureFileSystem

return AzureFileSystem(
account_name=self.properties.get(ADLS_ACCOUNT_NAME),
account_key=self.properties.get(ADLS_ACCOUNT_KEY),
)

def _initialize_local_fs(self) -> FileSystem:
return PyArrowLocalFileSystem()

Expand Down
57 changes: 46 additions & 11 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
Generator,
List,
Optional,
Tuple,
)

import boto3
Expand All @@ -53,10 +54,13 @@
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import BoundReference
from pyiceberg.io import (
ADLS_ACCOUNT_NAME,
ADLS_CONNECTION_STRING,
GCS_PROJECT_ID,
GCS_SERVICE_HOST,
GCS_TOKEN,
GCS_TOKEN_EXPIRES_AT_MS,
FileIO,
fsspec,
load_file_io,
)
Expand Down Expand Up @@ -90,6 +94,7 @@

if TYPE_CHECKING:
import pyarrow as pa
from azure.storage.blob import BlobServiceClient
from moto.server import ThreadedMotoServer # type: ignore
from pyspark.sql import SparkSession

Expand Down Expand Up @@ -2077,24 +2082,54 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No
yield boto3.client("dynamodb", region_name="us-east-1")


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
from azure.storage.blob import BlobServiceClient

def _get_account_name_and_connection_string(request: pytest.FixtureRequest) -> Tuple[str, str]:
azurite_url = request.config.getoption("--adls.endpoint")
azurite_account_name = request.config.getoption("--adls.account-name")
azurite_account_key = request.config.getoption("--adls.account-key")
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
properties = {
"adls.connection-string": azurite_connection_string,
"adls.account-name": azurite_account_name,
}

return azurite_account_name, azurite_connection_string


def _setup_blob(azurite_connection_string: str) -> "BlobServiceClient":
from azure.storage.blob import BlobServiceClient

bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)

# Recreate container if needed
if list(bbs.list_containers(name_starts_with="tests")):
bbs.delete_container("tests")
bbs.create_container("tests")
yield fsspec.FsspecFileIO(properties=properties)
bbs.delete_container("tests")
bbs.close()

return bbs


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
azurite_account_name, azurite_connection_string = _get_account_name_and_connection_string(request)

with _setup_blob(azurite_connection_string):
yield FsspecFileIO(
properties={
ADLS_CONNECTION_STRING: azurite_connection_string,
ADLS_ACCOUNT_NAME: azurite_account_name,
}
)


@pytest.fixture
def adls_pyarrow_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
from pyiceberg.io.pyarrow import PyArrowFileIO

azurite_account_name, azurite_connection_string = _get_account_name_and_connection_string(request)

with _setup_blob(azurite_connection_string):
yield PyArrowFileIO(
properties={
ADLS_CONNECTION_STRING: azurite_connection_string,
ADLS_ACCOUNT_NAME: azurite_account_name,
}
)


@pytest.fixture(scope="session")
Expand Down
Loading

0 comments on commit ae16b3d

Please sign in to comment.