
[bug?] cannot run integration test #1162

Closed
kevinjqliu opened this issue Sep 11, 2024 · 18 comments · Fixed by #1187

Comments

@kevinjqliu
Contributor

Apache Iceberg version

main (development)

Please describe the bug 🐞

On Mac, is anyone having issues running make test-integration on the current main branch?

I'm having issues but not sure if it's due to my local environment.

kevinjqliu changed the title from "[bug?]" to "[bug?] cannot run integration test" on Sep 11, 2024
@soumya-ghosh
Contributor

soumya-ghosh commented Sep 11, 2024

Been facing the same issue on my Mac (Python 3.11) since yesterday, when I rebased the latest main branch onto my dev branch. Just realized that tests are not running on the main branch itself.
I did make clean and make test-integration-rebuild and now I'm seeing more errors than before.
Although I did run some of those integration tests through the IDE and they run fine.

Error message - OSError: When initiating multiple part upload for key 'default/arrow_table_v1_with_multiple_partitions/metadata/snap-6162354366671903431-0-18a934c0-df15-4f4f-acec-3aedfe585765.avro' in bucket 'warehouse': AWS Error NETWORK_CONNECTION during CreateMultipartUpload operation: curlCode: 43, A libcurl function was given a bad argument

@kevinjqliu
Contributor Author

kevinjqliu commented Sep 11, 2024

Thanks for confirming. I see the same issue https://gist.github.com/kevinjqliu/c8310b6253beab52cce93391df03bfe4

And only for commits at and after 1971fcfe0875eeb200dbcb66f385e504cfad6609
https://github.com/apache/iceberg-python/commits/main/

The commit before (b5933756b5b488ec51cd56d5984731b6cc347f2b) does not have this issue

The CI integration tests are fine. And @sungwy confirmed that running the integration tests via codespace also works

@kevinjqliu
Contributor Author

Full pytest report here: https://gist.github.com/kevinjqliu/a0e8e2199bd8064757eb2b40409e0794

Here's the breakdown of the errors:

171 × pyarrow/error.pxi:92: OSError, of which:
  * 87 in write_parquet (pyiceberg/io/pyarrow.py:2426)
  * 83 in write_manifest_list

write_parquet stack trace:

pyiceberg/io/pyarrow.py:2426: in write_parquet
    with fo.create(overwrite=True) as fos:
pyiceberg/io/pyarrow.py:311: in create
    output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
pyarrow/_fs.pyx:887: in pyarrow._fs.FileSystem.open_output_stream
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???

write_manifest_list stack trace:

pyiceberg/table/update/snapshot.py:253: in _commit
    with write_manifest_list(
pyiceberg/manifest.py:924: in __enter__
    self._writer.__enter__()
pyiceberg/avro/file.py:258: in __enter__
    self.output_stream = self.output_file.create(overwrite=True)
pyiceberg/io/pyarrow.py:311: in create
    output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
pyarrow/_fs.pyx:887: in pyarrow._fs.FileSystem.open_output_stream
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???

@kevinjqliu
Contributor Author

One realization is that the manifest cache is implemented as a global cache.

@lru_cache
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
    """Return the manifests from the manifest list."""
    file = io.new_input(manifest_list)
    return list(read_manifest_list(file))

And kept in memory until eviction.

And since ManifestFile has an IO component, I think the file descriptors are kept alive and never cleaned up.

Why does this only affect M1 Mac? I have no idea.
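To illustrate the concern, here is a minimal, generic sketch (plain Python, not pyiceberg code) of how a module-level @lru_cache keeps its cached values, and anything they reference, strongly reachable until eviction or an explicit cache_clear():

```python
from functools import lru_cache


class Resource:
    """Stand-in for an object holding an open file handle."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


@lru_cache(maxsize=128)
def load(name: str) -> Resource:
    # The returned Resource is strongly referenced by the module-level cache,
    # so it is never garbage collected (nor closed) until it is evicted or
    # load.cache_clear() is called -- even if no caller keeps a reference.
    return Resource(name)


load("snap-1.avro")
assert load.cache_info().currsize == 1  # still alive inside the global cache
```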

@kevinjqliu
Contributor Author

#1185 reimplements the manifest cache as specific to the Snapshot instance, so the cache can be cleaned up once the Snapshot is GC'ed.

This also fits the feature request in #595 to scope the cache to a specific Snapshot.
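A minimal sketch of what an instance-scoped cache could look like (illustrative only; the real Snapshot is a pydantic model and #1185 may differ in details): the cached manifests live on the Snapshot object, so they are released together with the Snapshot.

```python
from typing import List, Optional

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list


class SnapshotSketch:
    """Illustrative stand-in for pyiceberg's Snapshot (a pydantic model in reality)."""

    def __init__(self, manifest_list: Optional[str] = None) -> None:
        self.manifest_list = manifest_list
        self._manifests_cache: Optional[List[ManifestFile]] = None

    def manifests(self, io: FileIO) -> List[ManifestFile]:
        # Cache on the instance: the list lives only as long as this Snapshot
        # and is collected with it, unlike a module-level lru_cache.
        if self._manifests_cache is None:
            if self.manifest_list is None:
                return []
            file = io.new_input(self.manifest_list)
            self._manifests_cache = list(read_manifest_list(file))
        return self._manifests_cache
```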

@sungwy
Collaborator

sungwy commented Sep 19, 2024

And since ManifestFile has an IO component, I think the file descriptors are kept alive and never cleaned up.

Hmmm, I don't think ManifestFile has an IO component. A FileIO is only used as an input parameter to one of the class methods:

class ManifestFile(Record):
    __slots__ = (
        "manifest_path",
        "manifest_length",
        "partition_spec_id",
        "content",
        "sequence_number",
        "min_sequence_number",
        "added_snapshot_id",
        "added_files_count",
        "existing_files_count",
        "deleted_files_count",
        "added_rows_count",
        "existing_rows_count",
        "deleted_rows_count",
        "partitions",
        "key_metadata",
    )
    manifest_path: str
    manifest_length: int
    partition_spec_id: int
    content: ManifestContent
    sequence_number: int
    min_sequence_number: int
    added_snapshot_id: int
    added_files_count: Optional[int]
    existing_files_count: Optional[int]
    deleted_files_count: Optional[int]
    added_rows_count: Optional[int]
    existing_rows_count: Optional[int]
    deleted_rows_count: Optional[int]
    partitions: Optional[List[PartitionFieldSummary]]
    key_metadata: Optional[bytes]

    def __init__(self, *data: Any, **named_data: Any) -> None:
        super().__init__(*data, **{"struct": MANIFEST_LIST_FILE_STRUCTS[DEFAULT_READ_VERSION], **named_data})

    def has_added_files(self) -> bool:
        return self.added_files_count is None or self.added_files_count > 0

    def has_existing_files(self) -> bool:
        return self.existing_files_count is None or self.existing_files_count > 0

    def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]:
        """
        Read the manifest entries from the manifest file.

        Args:
            io: The FileIO to fetch the file.
            discard_deleted: Filter on live entries.

        Returns:
            An Iterator of manifest entries.
        """
        input_file = io.new_input(self.manifest_path)
        with AvroFile[ManifestEntry](
            input_file,
            MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
            read_types={-1: ManifestEntry, 2: DataFile},
            read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
        ) as reader:
            return [
                _inherit_from_manifest(entry, self)
                for entry in reader
                if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
            ]

@kevinjqliu
Contributor Author

I should be more specific. Reading (and caching) a ManifestFile object (or a list of them) ultimately keeps a bunch of PyArrowFile objects alive (any time io.new_input is called).

In PyArrowFile, the open and create methods do not automatically clean up resources:

def open(self, seekable: bool = True) -> InputStream:
    """Open the location using a PyArrow FileSystem inferred from the location.

    Args:
        seekable: If the stream should support seek, or if it is consumed sequential.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at `self.location`.

    Raises:
        FileNotFoundError: If the file at self.location does not exist.
        PermissionError: If the file at self.location cannot be accessed due to a permission error such as
            an AWS error code 15.
    """
    try:
        if seekable:
            input_file = self._filesystem.open_input_file(self._path)
        else:
            input_file = self._filesystem.open_input_stream(self._path, buffer_size=self._buffer_size)
    except FileNotFoundError:
        raise
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 2 or "Path does not exist" in str(e):
            raise FileNotFoundError(f"Cannot open file, does not exist: {self.location}") from e
        elif e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot open file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return input_file

def create(self, overwrite: bool = False) -> OutputStream:
    """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

    Args:
        overwrite (bool): Whether to overwrite the file if it already exists.

    Returns:
        pyarrow.lib.NativeFile: A NativeFile instance for the file located at self.location.

    Raises:
        FileExistsError: If the file already exists at `self.location` and `overwrite` is False.

    Note:
        This retrieves a pyarrow NativeFile by opening an output stream. If overwrite is set to False,
        a check is first performed to verify that the file does not exist. This is not thread-safe and
        a possibility does exist that the file can be created by a concurrent process after the existence
        check yet before the output stream is created. In such a case, the default pyarrow behavior will
        truncate the contents of the existing file when opening the output stream.
    """
    try:
        if not overwrite and self.exists() is True:
            raise FileExistsError(f"Cannot create file, already exists: {self.location}")
        output_file = self._filesystem.open_output_stream(self._path, buffer_size=self._buffer_size)
    except PermissionError:
        raise
    except OSError as e:
        if e.errno == 13 or "AWS Error [code 15]" in str(e):
            raise PermissionError(f"Cannot create file, access denied: {self.location}") from e
        raise  # pragma: no cover - If some other kind of OSError, raise the raw error
    return output_file

I think this was part of the issue, since I had to increase the system file descriptor limit.
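For what it's worth, the streams returned by open() and create() are pyarrow NativeFile objects and support the context-manager protocol, so closing them is left to the caller. A hedged sketch of the caller-side pattern (the location is made up for illustration):

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO()

# Hypothetical location, used only for illustration.
location = "s3://warehouse/default/example/metadata/example.avro"

# The NativeFile returned by create()/open() supports `with`, which guarantees
# the underlying descriptor is released as soon as the block exits.
with io.new_output(location).create(overwrite=True) as out_stream:
    out_stream.write(b"example bytes")

with io.new_input(location).open() as in_stream:
    data = in_stream.read()
```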

@sungwy
Collaborator

sungwy commented Sep 19, 2024

That's a very interesting observation 👀 I'm so curious to understand what's happening. If that's in fact what's happening, I think your proposed solution sounds promising...

I'm still trying to understand this issue thoroughly.

In the lru_cache'd _manifests we are caching a list of ManifestFiles, but in fact we are only reading a single manifest list and deserializing each of its records as ManifestFile Records. So the only PyArrowFile that I'd expect to be left 'open' according to this theory would be the one for the manifest_list:

def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
    """
    Read the manifests from the manifest list.

    Args:
        input_file: The input file where the stream can be read from.

    Returns:
        An iterator of ManifestFiles that are part of the list.
    """
    with AvroFile[ManifestFile](
        input_file,
        MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
        read_types={-1: ManifestFile, 508: PartitionFieldSummary},
        read_enums={517: ManifestContent},
    ) as reader:
        yield from reader
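As a side note on generator semantics (general Python behavior, not a claim about the root cause here): because read_manifest_list yields from inside a with block, the underlying AvroFile stays open while the generator is suspended and only closes once the generator is exhausted, which the list(...) call in _manifests does immediately. A minimal sketch:

```python
from contextlib import contextmanager
from typing import Iterator


class FakeFile:
    """Stand-in for an input stream whose lifetime we want to observe."""

    def __init__(self) -> None:
        self.closed = False


@contextmanager
def open_fake_file() -> Iterator[FakeFile]:
    f = FakeFile()
    try:
        yield f
    finally:
        f.closed = True  # runs once the generator below is exhausted or closed


def read_records() -> Iterator[int]:
    with open_fake_file() as f:
        for i in range(3):
            yield i  # the file stays open while the generator is suspended here


gen = read_records()
next(gen)            # generator suspended inside the `with`; file still open
records = list(gen)  # exhausting it triggers the `finally`, closing the file
```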

@kevinjqliu
Contributor Author

I took a step back and realized the fundamental issue was the newly introduced cache.

Without the cache, everything works fine.
With the cache, things break.

Going a layer deeper, this probably means the bug only affects cache hits, since cache misses just recompute.
So the failure scenario is a cache hit that returns the wrong value.

Fundamentally, there are a couple of issues with the function definition:

@lru_cache
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
    """Return the manifests from the manifest list."""
    file = io.new_input(manifest_list)
    return list(read_manifest_list(file))

First, the cache key is both io and manifest_list, whereas we want the key to be just manifest_list.
Second, the result is a list, which can be mutated, leading to wrong results.

Here's an example to showcase how the different cache keys behave:

cache = {}

def _manifests(io: FileIO, manifest_list: str, snapshot: Snapshot) -> List[ManifestFile]:
    """Return the manifests from the manifest list."""
    # key = (manifest_list, )  # works
    # key = (manifest_list, io)  # fails
    key = (manifest_list, snapshot)  # works
    if key in cache:
        return cache[key]
    cache[key] = list(read_manifest_list(io.new_input(manifest_list)))
    return cache[key]

Without digging into exactly where it breaks, or why only on M1 Macs, there are two potential solutions:

  1. Move the manifest cache to the Snapshot instance
  2. Use the cachetools library to specify manifest_list as the only cache key (see Stack Overflow); a rough sketch of this option follows below
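A rough sketch of option 2, assuming the cachetools library (this is not necessarily the exact change in #1187): hashkey restricts the cache key to manifest_list, so two FileIO instances pointing at the same manifest list share one cache entry.

```python
from typing import List

from cachetools import LRUCache, cached
from cachetools.keys import hashkey

from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> List[ManifestFile]:
    """Return the manifests from the manifest list, cached by location only."""
    file = io.new_input(manifest_list)
    return list(read_manifest_list(file))
```

Returning a tuple instead of a list would also address the second concern above, since a tuple cannot be mutated by callers.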

@kevinjqliu
Contributor Author

#1185 uses a Snapshot-instance-specific cache.
#1187 keeps the global cache but uses the cachetools library to specify the cache key.

I recommend #1187.

@antonioalegria

Hi, I'm having the same issue. Is there a workaround I can use as a client? Can you do a new point release with the fix? Thanks!

@kevinjqliu
Contributor Author

kevinjqliu commented Oct 2, 2024

@antonioalegria This is currently fixed in the main branch, which we use to run integration tests.
How are you hitting this issue? We can do a hotfix release if needed

I see your comment here as well: #1187 (comment)

@antonioalegria

I'm hitting this exact same issue as described in this comment (#1162 (comment)) when running Polars on an Iceberg table I have in S3.

@kevinjqliu
Contributor Author

@antonioalegria thanks for the context. Does the current main branch resolve this issue? It already incorporated the fix

I'll see if we can do a hotfix release

@antonioalegria

Thanks!

@sungwy
Collaborator

sungwy commented Oct 11, 2024

Hi folks, I'm going through a career transition, so my apologies for arriving at this discussion a little late.

@antonioalegria @kevinjqliu I don't think the buggy manifest caching feature was ever released. Here's the version of the manifests implementation that was released in 0.7.1:

class Snapshot(IcebergBaseModel):
    snapshot_id: int = Field(alias="snapshot-id")
    parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
    sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER)
    timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000))
    manifest_list: Optional[str] = Field(
        alias="manifest-list", description="Location of the snapshot's manifest list file", default=None
    )
    summary: Optional[Summary] = Field(default=None)
    schema_id: Optional[int] = Field(alias="schema-id", default=None)

    def __str__(self) -> str:
        """Return the string representation of the Snapshot class."""
        operation = f"{self.summary.operation}: " if self.summary else ""
        parent_id = f", parent_id={self.parent_snapshot_id}" if self.parent_snapshot_id else ""
        schema_id = f", schema_id={self.schema_id}" if self.schema_id is not None else ""
        result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
        return result_str

    def manifests(self, io: FileIO) -> List[ManifestFile]:
        if self.manifest_list is not None:
            file = io.new_input(self.manifest_list)
            return list(read_manifest_list(file))
        return []

Are we sure we are seeing the exact same error? Would you mind sharing the exception trace, @antonioalegria?

@antonioalegria

Well, maybe the bug is different, but I'm getting it when loading a metadata file via scan_iceberg in Polars.

It only happens with certain objects:

When reading information for key '' in bucket '': AWS Error NETWORK_CONNECTION during HeadObject operation: curlCode: 6, Couldn't resolve host name
Traceback (most recent call last):

File "", line 351, in scan_table
return pl.scan_iceberg(location)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/polars/io/iceberg.py", line 143, in scan_iceberg
source = StaticTable.from_metadata(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/pyiceberg/table/init.py", line 1693, in from_metadata
metadata = FromInputFile.table_metadata(file)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/pyiceberg/serializers.py", line 113, in table_metadata
with input_file.open() as input_stream:
^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.12/site-packages/pyiceberg/io/pyarrow.py", line 270, in open
input_file = self._filesystem.open_input_file(self._path)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "pyarrow/_fs.pyx", line 789, in pyarrow._fs.FileSystem.open_input_file
File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
OSError: When reading information for key '' in bucket '': AWS Error NETWORK_CONNECTION during HeadObject operation: curlCode: 6, Couldn't resolve host name

See related issue: apache/arrow#40539

@kevinjqliu
Contributor Author

@antonioalegria I think that's a different issue:

Couldn't resolve host name

versus the original issue:

AWS Error NETWORK_CONNECTION during CreateMultipartUpload operation: curlCode: 43, A libcurl function was given a bad argument
