Add ADLS support for PyArrow #1532

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
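For readers skimming the diff, a minimal, hypothetical usage sketch of what this change enables. The property keys `adls.account-name` and `adls.account-key` are the existing ADLS keys in `pyiceberg.io`; the account, key, and path values are placeholders, and the abfss URI only resolves if the installed PyArrow build ships Azure filesystem support.

from pyiceberg.io.pyarrow import PyArrowFileIO

# Hypothetical sketch: with this PR, abfs/abfss locations are routed to
# pyarrow.fs.AzureFileSystem using the ADLS properties below.
io = PyArrowFileIO(
    properties={
        "adls.account-name": "devstoreaccount1",  # placeholder account
        "adls.account-key": "<account-key>",      # placeholder key
    }
)

# Read a file back through the PyArrow-backed FileIO; the location is illustrative.
input_file = io.new_input("abfss://tests/some/path/metadata.json")
with input_file.open() as stream:
    payload = stream.read()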
13 changes: 13 additions & 0 deletions pyiceberg/io/pyarrow.py
@@ -83,6 +83,8 @@
 )
 from pyiceberg.expressions.visitors import visit as boolean_expression_visit
 from pyiceberg.io import (
+    ADLS_ACCOUNT_KEY,
+    ADLS_ACCOUNT_NAME,
     AWS_ACCESS_KEY_ID,
     AWS_REGION,
     AWS_ROLE_ARN,
@@ -366,6 +368,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         elif scheme in {"file"}:
             return self._initialize_local_fs()
 
+        elif scheme in {"abfs", "abfss"}:
+            return self._initialize_adls_fs()
+
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
@@ -476,6 +481,14 @@ def _initialize_gcs_fs(self) -> FileSystem:
 
         return GcsFileSystem(**gcs_kwargs)
 
+    def _initialize_adls_fs(self) -> FileSystem:
+        from pyarrow.fs import AzureFileSystem
+
+        return AzureFileSystem(
+            account_name=self.properties.get(ADLS_ACCOUNT_NAME),
+            account_key=self.properties.get(ADLS_ACCOUNT_KEY),
+        )
+
     def _initialize_local_fs(self) -> FileSystem:
         return PyArrowLocalFileSystem()
 
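For reference, a standalone sketch of the PyArrow call that the new `_initialize_adls_fs` helper delegates to. It assumes a PyArrow build/version that exposes `pyarrow.fs.AzureFileSystem`; the credentials and container name below are placeholders (for example, Azurite's development account).

from pyarrow import fs

# Sketch only: construct the Azure filesystem directly, mirroring what
# _initialize_adls_fs does with the "adls.account-name"/"adls.account-key" properties.
azure_fs = fs.AzureFileSystem(
    account_name="devstoreaccount1",  # placeholder / Azurite dev account
    account_key="<account-key>",      # placeholder key
)

# List the top level of a hypothetical "tests" container to confirm connectivity.
for info in azure_fs.get_file_info(fs.FileSelector("tests", recursive=False)):
    print(info.path, info.type)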
57 changes: 46 additions & 11 deletions tests/conftest.py
@@ -42,6 +42,7 @@
     Generator,
     List,
     Optional,
+    Tuple,
 )
 
 import boto3
@@ -53,10 +54,13 @@
 from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.expressions import BoundReference
 from pyiceberg.io import (
+    ADLS_ACCOUNT_NAME,
+    ADLS_CONNECTION_STRING,
     GCS_PROJECT_ID,
     GCS_SERVICE_HOST,
     GCS_TOKEN,
     GCS_TOKEN_EXPIRES_AT_MS,
+    FileIO,
     fsspec,
     load_file_io,
 )
@@ -90,6 +94,7 @@
 
 if TYPE_CHECKING:
     import pyarrow as pa
+    from azure.storage.blob import BlobServiceClient
     from moto.server import ThreadedMotoServer  # type: ignore
     from pyspark.sql import SparkSession
 
@@ -2077,24 +2082,54 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, None]:
     yield boto3.client("dynamodb", region_name="us-east-1")
 
 
-@pytest.fixture
-def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
-    from azure.storage.blob import BlobServiceClient
-
+def _get_account_name_and_connection_string(request: pytest.FixtureRequest) -> Tuple[str, str]:
     azurite_url = request.config.getoption("--adls.endpoint")
     azurite_account_name = request.config.getoption("--adls.account-name")
     azurite_account_key = request.config.getoption("--adls.account-key")
     azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
-    properties = {
-        "adls.connection-string": azurite_connection_string,
-        "adls.account-name": azurite_account_name,
-    }
 
+    return azurite_account_name, azurite_connection_string
+
+
+def _setup_blob(azurite_connection_string: str) -> "BlobServiceClient":
+    from azure.storage.blob import BlobServiceClient
+
     bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
+
+    # Recreate container if needed
+    if list(bbs.list_containers(name_starts_with="tests")):
+        bbs.delete_container("tests")
     bbs.create_container("tests")
-    yield fsspec.FsspecFileIO(properties=properties)
-    bbs.delete_container("tests")
-    bbs.close()
+
+    return bbs
+
+
+@pytest.fixture
+def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
+    azurite_account_name, azurite_connection_string = _get_account_name_and_connection_string(request)
+
+    with _setup_blob(azurite_connection_string):
+        yield FsspecFileIO(
+            properties={
+                ADLS_CONNECTION_STRING: azurite_connection_string,
+                ADLS_ACCOUNT_NAME: azurite_account_name,
+            }
+        )
+
+
+@pytest.fixture
+def adls_pyarrow_fileio(request: pytest.FixtureRequest) -> Generator[FileIO, None, None]:
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+
+    azurite_account_name, azurite_connection_string = _get_account_name_and_connection_string(request)
+
+    with _setup_blob(azurite_connection_string):
+        yield PyArrowFileIO(
+            properties={
+                ADLS_CONNECTION_STRING: azurite_connection_string,
+                ADLS_ACCOUNT_NAME: azurite_account_name,
+            }
+        )
 
 
 @pytest.fixture(scope="session")
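Finally, a hedged sketch of how a test might consume the new `adls_pyarrow_fileio` fixture. The test name, marker usage, and object path are illustrative, and it assumes an Azurite endpoint is reachable through the `--adls.*` pytest options that the fixture reads.

import pytest

from pyiceberg.io import FileIO


@pytest.mark.adls
def test_pyarrow_adls_round_trip(adls_pyarrow_fileio: FileIO) -> None:
    # Write a small payload through the PyArrow-backed FileIO, then read it back.
    location = "abfss://tests/round-trip/data.bin"
    output_file = adls_pyarrow_fileio.new_output(location)
    with output_file.create(overwrite=True) as out:
        out.write(b"hello adls")

    input_file = adls_pyarrow_fileio.new_input(location)
    with input_file.open() as inp:
        assert inp.read() == b"hello adls"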