diff --git a/fsspec/implementations/dirfs.py b/fsspec/implementations/dirfs.py
index c0623b82f..af0c39785 100644
--- a/fsspec/implementations/dirfs.py
+++ b/fsspec/implementations/dirfs.py
@@ -12,6 +12,49 @@ class DirFileSystem(AsyncFileSystem):
 
     protocol = "dir"
 
+    # ----------------------------------------------------------------
+    # Transaction delegation: the wrapped filesystem owns all transaction
+    # state, so each entry point below simply forwards to ``self.fs``.
+    @property
+    def transaction_type(self):
+        """Transaction class of the wrapped filesystem."""
+        return self.fs.transaction_type
+
+    @property
+    def transaction(self):
+        """
+        Delegate `with fs.transaction:` to the underlying filesystem
+        so that dir:// writes participate in the base FS's transaction.
+        """
+        return self.fs.transaction
+
+    def start_transaction(self):
+        """
+        Start a transaction on the base filesystem (non-context version).
+
+        Returns whatever the base filesystem's ``start_transaction`` returns,
+        rather than discarding it.
+        """
+        return self.fs.start_transaction()
+
+    def end_transaction(self):
+        """End the transaction on the base filesystem (non-context version)."""
+        # ``self.transaction`` *is* the base filesystem's transaction, so
+        # ending it there is the whole job; no second completion is needed.
+        self.fs.end_transaction()
+
+    def invalidate_cache(self, path=None):
+        """
+        Discard any cached directory information.
+
+        ``path`` is relative to this filesystem's root, so it is joined onto
+        the root before being handed to the base filesystem; ``None`` is
+        passed through unchanged.
+        """
+        self.fs.invalidate_cache(None if path is None else self._join(path))
+        super().invalidate_cache(path)
+    # ----------------------------------------------------------------
+
     def __init__(
         self,
         path=None,
diff --git a/fsspec/implementations/tests/test_cached.py b/fsspec/implementations/tests/test_cached.py
index c9222d5b5..f1d1f8476 100644
--- a/fsspec/implementations/tests/test_cached.py
+++ b/fsspec/implementations/tests/test_cached.py
@@ -19,7 +19,10 @@
     LocalTempFile,
     WholeFileCacheFileSystem,
 )
-from fsspec.implementations.local import make_path_posix
+from fsspec.implementations.local import (
+    LocalFileSystem,
+    make_path_posix,
+)
 from fsspec.implementations.zip import ZipFileSystem
 from fsspec.tests.conftest import win
 
@@ -1337,3 +1340,44 @@ def test_filecache_write(tmpdir, m):
 
     assert m.cat(fn) == data.encode()
     assert fs.cat(fn) == data.encode()
+
+
+def test_cachingfs_transaction_missing_propagation(tmpdir):
+    """Characterise CachingFileSystem: its transaction is not propagated."""
+    # Setup temp directories
+    storage_dir = str(tmpdir / "storage")
+    cache_dir = str(tmpdir / "cache")
+    os.mkdir(storage_dir)
+    os.mkdir(cache_dir)
+
+    # Create a local filesystem and wrap it with CachingFileSystem
+    base = LocalFileSystem()
+    cachefs = CachingFileSystem(
+        fs=base,
+        cache_storage=cache_dir,
+        cache_check=0,
+        same_names=True,
+    )
+
+    # Test file path
+    test_file = os.path.join(storage_dir, "cache_transaction_test.txt")
+
+    # Before transaction, both filesystems should have _intrans=False
+    assert not base._intrans
+    assert not cachefs._intrans
+
+    # Enter transaction and write file
+    with cachefs.transaction:
+        # CachingFileSystem's transaction flag is set
+        assert cachefs._intrans
+
+        # But the base filesystem's transaction flag is not - this is the bug
+        assert not base._intrans, "Base filesystem unexpectedly joined transaction"
+
+        # Write to file
+        with cachefs.open(test_file, "wb") as f:
+            f.write(b"cached data")
+
+        # Check if file exists on disk - bug: it will exist immediately since
+        # transaction context was not propagated
+        assert os.path.exists(test_file), "File was not written during transaction"
diff --git a/fsspec/implementations/tests/test_dirfs.py b/fsspec/implementations/tests/test_dirfs.py
index f58969406..98c64a44a 100644
--- a/fsspec/implementations/tests/test_dirfs.py
+++ b/fsspec/implementations/tests/test_dirfs.py
@@ -1,5 +1,8 @@
+import os
+
 import pytest
 
 from fsspec.asyn import AsyncFileSystem
 from fsspec.implementations.dirfs import DirFileSystem
+from fsspec.implementations.local import LocalFileSystem
 from fsspec.spec import AbstractFileSystem
@@ -615,3 +618,42 @@ def test_from_url(m):
     assert fs.ls("", False) == ["file"]
     assert fs.ls("", True)[0]["name"] == "file"
     assert fs.cat("file") == b"data"
+
+
+def test_dirfs_transaction_propagation(tmpdir):
+    # Setup
+    path = str(tmpdir)
+    base = LocalFileSystem()
+    fs = DirFileSystem(path=path, fs=base)
+
+    # Test file path
+    test_file = "transaction_test.txt"
+    full_path = os.path.join(path, test_file)
+
+    # Before transaction, both filesystems should have _intrans=False
+    assert not base._intrans
+    assert not fs._intrans
+
+    # Enter transaction and write file
+    with fs.transaction:
+        # DirFileSystem hands out the base filesystem's own transaction, so
+        # entering it must switch the base filesystem into transaction mode
+        assert fs.transaction is base.transaction
+        assert base._intrans, "Base filesystem transaction flag not set"
+
+        # Write to file
+        with fs.open(test_file, "wb") as f:
+            f.write(b"hello world")
+
+        # Check if file exists on disk - it should not until transaction commits
+        assert not os.path.exists(
+            full_path
+        ), "File exists during transaction, not deferred to temp file"
+
+    # After transaction commits, file should exist
+    assert os.path.exists(full_path), "File not created after transaction commit"
+    assert not base._intrans, "Base filesystem still in transaction after exit"
+
+    # Verify content
+    with open(full_path, "rb") as f:
+        assert f.read() == b"hello world"