From 17f58a1013ca4f93463c5d74b32c8c9cfb3cb03e Mon Sep 17 00:00:00 2001
From: Roman Leventov
Date: Thu, 20 Feb 2025 01:23:52 +0700
Subject: [PATCH 1/7] SQLite-based read-write lock

---
 src/filelock/_read_write.py | 74 +++++++++++++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)
 create mode 100644 src/filelock/_read_write.py

diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py
new file mode 100644
index 00000000..8de272cb
--- /dev/null
+++ b/src/filelock/_read_write.py
@@ -0,0 +1,74 @@
+import os
+import sqlite3
+import threading
+
+from _error import Timeout
+from filelock._api import BaseFileLock
+
+class _SQLiteLock(BaseFileLock):
+    def __init__(self, lock_file: str | os.PathLike[str], timeout: float = -1, blocking: bool = True):
+        super().__init__(lock_file, timeout, blocking)
+        self.procLock = threading.Lock()
+        self.con = sqlite3.connect(self._context.lock_file, check_same_thread=False)
+        # Redundant unless there are "rogue" processes that open the db
+        # and switch the db to journal_mode=WAL.
+        # Using the legacy journal mode rather than more modern WAL mode because,
+        # apparently, in WAL mode it's impossible to enforce that read transactions
+        # (started with BEGIN TRANSACTION) are blocked if a concurrent write transaction,
+        # even EXCLUSIVE, is in progress, unless the read transactions actually read
+        # any pages modified by the write transaction. But in the legacy journal mode,
+        # it seems, it's possible to do this read-write locking without table data
+        # modification at each exclusive lock.
+        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
+        self.con.execute('PRAGMA journal_mode=DELETE;')
+        self.cur = None
+
+    def _release(self):
+        with self.procLock:
+            if self.cur is None:
+                return  # Nothing to release
+            try:
+                self.cur.execute('ROLLBACK TRANSACTION;')
+            except sqlite3.ProgrammingError:
+                pass  # Already rolled back or transaction not active
+            finally:
+                self.cur.close()
+                self.cur = None
+
+class WriteLock(_SQLiteLock):
+    def _acquire(self) -> None:
+        timeout_ms = int(self._context.timeout*1000) if self._context.blocking else 0
+        with self.procLock:
+            if self.cur is not None:
+                return
+            self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,))
+            try:
+                self.cur = self.con.execute('BEGIN EXCLUSIVE TRANSACTION;')
+            except sqlite3.OperationalError as e:
+                if 'database is locked' not in str(e):
+                    raise  # Re-raise unexpected errors
+                raise Timeout(self._context.lock_file)
+
+class ReadLock(_SQLiteLock):
+    def _acquire(self):
+        timeout_ms = int(self._context.timeout * 1000) if self._context.blocking else 0
+        with self.procLock:
+            if self.cur is not None:
+                return
+            self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,))
+            cur = None  # Initialize cur to avoid potential UnboundLocalError
+            try:
+                cur = self.con.execute('BEGIN TRANSACTION;')
+                # BEGIN doesn't itself acquire a SHARED lock on the db, that is needed for
+                # effective exclusion with writeLock(). A SELECT is needed.
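+                # Any cheap query works here: sqlite_schema (the alias of sqlite_master
+                # since SQLite 3.33) exists in every database, so no user tables are needed.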
+                cur.execute('SELECT name from sqlite_schema LIMIT 1;')
+                self.cur = cur
+            except sqlite3.OperationalError as e:
+                if 'database is locked' not in str(e):
+                    raise  # Re-raise unexpected errors
+                if cur is not None:
+                    cur.close()
+                raise Timeout(self._context.lock_file)
+
+

From 71f4501c20ae170437037865ae8c3aac637662e5 Mon Sep 17 00:00:00 2001
From: Roman Leventov
Date: Sat, 22 Feb 2025 17:23:35 +0700
Subject: [PATCH 2/7] Refactor, fix bugs

---
 src/filelock/_read_write.py | 276 ++++++++++++++++++++++++++++--------
 1 file changed, 219 insertions(+), 57 deletions(-)

diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py
index 8de272cb..4557e61d 100644
--- a/src/filelock/_read_write.py
+++ b/src/filelock/_read_write.py
@@ -1,17 +1,85 @@
 import os
 import sqlite3
 import threading
-
+import logging
 from _error import Timeout
-from filelock._api import BaseFileLock
-
-class _SQLiteLock(BaseFileLock):
-    def __init__(self, lock_file: str | os.PathLike[str], timeout: float = -1, blocking: bool = True):
-        super().__init__(lock_file, timeout, blocking)
-        self.procLock = threading.Lock()
-        self.con = sqlite3.connect(self._context.lock_file, check_same_thread=False)
-        # Redundant unless there are "rogue" processes that open the db
-        # and switch the db to journal_mode=WAL.
+from filelock._api import AcquireReturnProxy, BaseFileLock
+from typing import Literal, Any
+from contextlib import contextmanager
+from weakref import WeakValueDictionary
+
+_LOGGER = logging.getLogger("filelock")
+
+# PRAGMA busy_timeout=N delegates to https://www.sqlite.org/c3ref/busy_timeout.html,
+# which accepts an int argument, which has the maximum value of 2_147_483_647 on 32-bit
+# systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days.
+_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1
+
+def timeout_for_sqlite(timeout: float = -1, blocking: bool = True) -> int:
+    if blocking is False:
+        return 0
+    if timeout == -1:
+        return _MAX_SQLITE_TIMEOUT_MS
+    if timeout < 0:
+        raise ValueError("timeout must be a non-negative number or -1")
+
+    assert timeout >= 0
+    timeout_ms = int(timeout * 1000)
+    if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0:
+        _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS)
+        return _MAX_SQLITE_TIMEOUT_MS
+    return timeout_ms
+
+
+class _ReadWriteLockMeta(type):
+    """Metaclass that redirects instance creation to get_lock() when is_singleton=True."""
+    def __call__(cls, lock_file: str | os.PathLike[str],
+                 timeout: float = -1, blocking: bool = True,
+                 is_singleton: bool = True, *args: Any, **kwargs: Any) -> "ReadWriteLock":
+        if is_singleton:
+            return cls.get_lock(lock_file, timeout, blocking)
+        return super().__call__(lock_file, timeout, blocking, is_singleton, *args, **kwargs)
+
+
+class ReadWriteLock(metaclass=_ReadWriteLockMeta):
+    # Singleton storage and its lock.
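+    # A WeakValueDictionary lets unused lock objects be garbage-collected: the
+    # map does not keep a lock alive once callers drop all references to it.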
+    _instances = WeakValueDictionary()
+    _instances_lock = threading.Lock()
+
+    @classmethod
+    def get_lock(cls, lock_file: str | os.PathLike[str],
+                 timeout: float = -1, blocking: bool = True) -> "ReadWriteLock":
+        """Return the one-and-only ReadWriteLock for a given file."""
+        normalized = os.path.abspath(lock_file)
+        with cls._instances_lock:
+            if normalized not in cls._instances:
+                cls._instances[normalized] = cls(lock_file, timeout, blocking)
+            instance = cls._instances[normalized]
+            if instance.timeout != timeout or instance.blocking != blocking:
+                raise ValueError("Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s", instance.timeout, instance.blocking, timeout, blocking)
+            return instance
+
+    def __init__(
+        self,
+        lock_file: str | os.PathLike[str],
+        timeout: float = -1,
+        blocking: bool = True,
+        is_singleton: bool = True,
+    ) -> None:
+        self.lock_file = lock_file
+        self.timeout = timeout
+        self.blocking = blocking
+        # _transaction_lock is for the SQLite transaction work.
+        self._transaction_lock = threading.Lock()
+        # _internal_lock protects the short critical sections that update _lock_level
+        # and rollback the transaction in release().
+        self._internal_lock = threading.Lock()
+        self._lock_level = 0  # Reentrance counter.
+        # _current_mode holds the active lock mode ("read" or "write") or None if no lock is held.
+        self._current_mode: Literal["read", "write", None] = None
+        # _lock_level is the reentrance counter.
+        self._lock_level = 0
+        self.con = sqlite3.connect(self.lock_file, check_same_thread=False)
         # Using the legacy journal mode rather than more modern WAL mode because,
         # apparently, in WAL mode it's impossible to enforce that read transactions
         # (started with BEGIN TRANSACTION) are blocked if a concurrent write transaction,
         # even EXCLUSIVE, is in progress, unless the read transactions actually read
@@ -20,55 +88,149 @@ def __init__(self, lock_file: str | os.PathLike[str], timeout: float = -1, block
         # it seems, it's possible to do this read-write locking without table data
         # modification at each exclusive lock.
         # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
-        self.con.execute('PRAGMA journal_mode=DELETE;')
-        self.cur = None
-
-    def _release(self):
-        with self.procLock:
-            if self.cur is None:
-                return  # Nothing to release
-            try:
-                self.cur.execute('ROLLBACK TRANSACTION;')
-            except sqlite3.ProgrammingError:
-                pass  # Already rolled back or transaction not active
-            finally:
-                self.cur.close()
-                self.cur = None
-
-class WriteLock(_SQLiteLock):
-    def _acquire(self) -> None:
-        timeout_ms = int(self._context.timeout*1000) if self._context.blocking else 0
-        with self.procLock:
-            if self.cur is not None:
-                return
+        # "MEMORY" journal mode is fine because no actual writes to the DB are happening in write-lock
+        # acquire, so crashes cannot adversely affect the DB. Even journal_mode=OFF would probably
+        # be fine, too, but the SQLite documentation says that ROLLBACK becomes *undefined behaviour*
+        # with journal_mode=OFF which sounds scarier.
+        self.con.execute('PRAGMA journal_mode=MEMORY;')
+
+    def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy:
+        """Acquire a read lock. If a lock is already held, it must be a read lock.
+        Upgrading from read to write is prohibited."""
+        with self._internal_lock:
+            if self._lock_level > 0:
+                # Must already be in read mode.
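+                # (Re-entering only bumps the counter under _internal_lock;
+                # no SQLite statements run on this fast path.)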
+ if self._current_mode != "read": + raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)") + self._lock_level += 1 + return AcquireReturnProxy(lock=self) + + timeout_ms = timeout_for_sqlite(timeout, blocking) + + # Acquire the transaction lock so that the (possibly blocking) SQLite work + # happens without conflicting with other threads' transaction work. + if not self._transaction_lock.acquire(blocking, timeout): + raise Timeout(self.lock_file) + try: + # Double-check: another thread might have completed acquisition meanwhile. + with self._internal_lock: + if self._lock_level > 0: + # Must already be in read mode. + if self._current_mode != "read": + raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)") + self._lock_level += 1 + return AcquireReturnProxy(lock=self) + self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,)) - try: - self.cur = self.con.execute('BEGIN EXCLUSIVE TRANSACTION;') - except sqlite3.OperationalError as e: - if 'database is locked' not in str(e): - raise # Re-raise unexpected errors - raise Timeout(self._context.lock_file) - -class ReadLock(_SQLiteLock): - def _acquire(self): - timeout_ms = int(self._context.timeout * 1000) if self._context.blocking else 0 - with self.procLock: - if self.cur is not None: - return + self.con.execute('BEGIN TRANSACTION;') + # Need to make SELECT to compel SQLite to actually acquire a SHARED db lock. + # See https://www.sqlite.org/lockingv3.html#transaction_control + self.con.execute('SELECT name from sqlite_schema LIMIT 1;') + + with self._internal_lock: + self._current_mode = "read" + self._lock_level = 1 + + return AcquireReturnProxy(lock=self) + + except sqlite3.OperationalError as e: + if 'database is locked' not in str(e): + raise # Re-raise unexpected errors. + raise Timeout(self.lock_file) + finally: + self._transaction_lock.release() + + def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: + """Acquire a write lock. If a lock is already held, it must be a write lock. + Upgrading from read to write is prohibited.""" + with self._internal_lock: + if self._lock_level > 0: + if self._current_mode != "write": + raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)") + self._lock_level += 1 + return AcquireReturnProxy(lock=self) + + timeout_ms = timeout_for_sqlite(timeout, blocking) + if not self._transaction_lock.acquire(blocking, timeout): + raise Timeout(self.lock_file) + try: + # Double-check: another thread might have completed acquisition meanwhile. + with self._internal_lock: + if self._lock_level > 0: + if self._current_mode != "write": + raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)") + self._lock_level += 1 + return AcquireReturnProxy(lock=self) + self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,)) - cur = None # Initialize cur to avoid potential UnboundLocalError - try: - cur = self.con.execute('BEGIN TRANSACTION;') - # BEGIN doesn't itself acquire a SHARED lock on the db, that is needed for - # effective exclusion with writeLock(). A SELECT is needed. 
-                cur.execute('SELECT name from sqlite_schema LIMIT 1;')
-                self.cur = cur
-            except sqlite3.OperationalError as e:
-                if 'database is locked' not in str(e):
-                    raise  # Re-raise unexpected errors
-                if cur is not None:
-                    cur.close()
-                raise Timeout(self._context.lock_file)
+            self.con.execute('BEGIN EXCLUSIVE TRANSACTION;')
+
+            with self._internal_lock:
+                self._current_mode = "write"
+                self._lock_level = 1
+
+            return AcquireReturnProxy(lock=self)
+
+        except sqlite3.OperationalError as e:
+            if 'database is locked' not in str(e):
+                raise  # Re-raise if it is an unexpected error.
+            raise Timeout(self.lock_file)
+        finally:
+            self._transaction_lock.release()
+
+    def release(self, force: bool = False) -> None:
+        with self._internal_lock:
+            if self._lock_level == 0:
+                if force:
+                    return
+                raise RuntimeError("Cannot release a lock that is not held")
+            if force:
+                self._lock_level = 0
+            else:
+                self._lock_level -= 1
+            if self._lock_level == 0:
+                # Clear current mode and rollback the SQLite transaction.
+                self._current_mode = None
+                # Unless there are bugs in this code, sqlite3.ProgrammingError
+                # must not be raised here, that is, the transaction should have been
+                # started in acquire().
+                self.con.rollback()
+
+    # ----- Context Manager Protocol -----
+    # (We provide two context managers as helpers.)
+
+    @contextmanager
+    def read_lock(self, timeout: float | None = None,
+                  blocking: bool | None = None):
+        """Context manager for acquiring a read lock.
+        Attempts to upgrade to write lock are disallowed."""
+        if timeout is None:
+            timeout = self.timeout
+        if blocking is None:
+            blocking = self.blocking
+        self.acquire_read(timeout, blocking)
+        try:
+            yield
+        finally:
+            self.release()
+
+    @contextmanager
+    def write_lock(self, timeout: float | None = None,
+                   blocking: bool | None = None):
+        """Context manager for acquiring a write lock.
+        Acquiring read locks on the same file while holding a write lock is prohibited."""
+        if timeout is None:
+            timeout = self.timeout
+        if blocking is None:
+            blocking = self.blocking
+        self.acquire_write(timeout, blocking)
+        try:
+            yield
+        finally:
+            self.release()
+
+    def __del__(self) -> None:
+        """Called when the lock object is deleted."""
+        self.release(force=True)

From 8938c15b3ac6e3e90b65df535e102c0d6378be65 Mon Sep 17 00:00:00 2001
From: Roman Leventov
Date: Mon, 24 Feb 2025 13:48:18 +0700
Subject: [PATCH 3/7] Reduce timeout from _transaction_lock.acquire() wait

---
 src/filelock/_read_write.py | 49 +++++++++++++++++++++++++++++++------
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py
index 4557e61d..f0c06b02 100644
--- a/src/filelock/_read_write.py
+++ b/src/filelock/_read_write.py
@@ -2,6 +2,7 @@
 import sqlite3
 import threading
 import logging
+import time
 from _error import Timeout
 from filelock._api import AcquireReturnProxy, BaseFileLock
 from typing import Literal, Any
 from contextlib import contextmanager
@@ -15,15 +16,23 @@
 # systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days.
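+# (For scale: 2**31 - 1 ms is roughly 24.8 days, while 2_000_000_000 ms is roughly
+# 23.1 days, so the chosen cap stays safely below the 32-bit limit.)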
_MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1 -def timeout_for_sqlite(timeout: float = -1, blocking: bool = True) -> int: +def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> int: if blocking is False: return 0 + if timeout == -1: return _MAX_SQLITE_TIMEOUT_MS + if timeout < 0: raise ValueError("timeout must be a non-negative number or -1") + if timeout > 0: + timeout = timeout - already_waited + if timeout < 0: + timeout = 0 + assert timeout >= 0 + timeout_ms = int(timeout * 1000) if timeout_ms > _MAX_SQLITE_TIMEOUT_MS or timeout_ms < 0: _LOGGER.warning("timeout %s is too large for SQLite, using %s ms instead", timeout, _MAX_SQLITE_TIMEOUT_MS) @@ -97,16 +106,22 @@ def __init__( def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: """Acquire a read lock. If a lock is already held, it must be a read lock. Upgrading from read to write is prohibited.""" + + # Attempt to re-enter already held lock. with self._internal_lock: if self._lock_level > 0: # Must already be in read mode. if self._current_mode != "read": - raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a write lock (downgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) timeout_ms = timeout_for_sqlite(timeout, blocking) + start_time = time.perf_counter() # Acquire the transaction lock so that the (possibly blocking) SQLite work # happens without conflicting with other threads' transaction work. if not self._transaction_lock.acquire(blocking, timeout): @@ -115,11 +130,16 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet # Double-check: another thread might have completed acquisition meanwhile. with self._internal_lock: if self._lock_level > 0: - # Must already be in read mode. if self._current_mode != "read": - raise RuntimeError("Cannot acquire read lock when a write lock is held (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a write lock (downgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) + + waited = time.perf_counter() - start_time + timeout_ms = timeout_for_sqlite(timeout, blocking, waited) self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,)) self.con.execute('BEGIN TRANSACTION;') @@ -143,14 +163,21 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: """Acquire a write lock. If a lock is already held, it must be a write lock. Upgrading from read to write is prohibited.""" + + # Attempt to re-enter already held lock. with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a read lock (upgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) - timeout_ms = timeout_for_sqlite(timeout, blocking) + start_time = time.perf_counter() + # Acquire the transaction lock so that the (possibly blocking) SQLite work + # happens without conflicting with other threads' transaction work. 
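+        # (threading.Lock.acquire(blocking, timeout) treats timeout=-1 as "wait
+        # forever", matching the -1 convention used throughout this module.)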
if not self._transaction_lock.acquire(blocking, timeout): raise Timeout(self.lock_file) try: @@ -158,10 +185,16 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError("Cannot acquire write lock: already holding a read lock (no upgrade allowed)") + raise RuntimeError( + f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a read lock (upgrade not allowed)" + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) + waited = time.perf_counter() - start_time + timeout_ms = timeout_for_sqlite(timeout, blocking, waited) + self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,)) self.con.execute('BEGIN EXCLUSIVE TRANSACTION;') @@ -183,7 +216,7 @@ def release(self, force: bool = False) -> None: if self._lock_level == 0: if force: return - raise RuntimeError("Cannot release a lock that is not held") + raise RuntimeError(f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held") if force: self._lock_level = 0 else: From 6b52214d9c26f17a90079be16b3bbc52ad7c0c37 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Tue, 4 Mar 2025 20:08:57 +0700 Subject: [PATCH 4/7] Add tests, move journal_mode=MEMORY pragma into transaction blocks --- src/filelock/__init__.py | 2 + src/filelock/_read_write.py | 69 ++- tests/test_read_write.py | 884 ++++++++++++++++++++++++++++++++++++ 3 files changed, 931 insertions(+), 24 deletions(-) create mode 100644 tests/test_read_write.py diff --git a/src/filelock/__init__.py b/src/filelock/__init__.py index c9d8c5b8..0ebdecd5 100644 --- a/src/filelock/__init__.py +++ b/src/filelock/__init__.py @@ -14,6 +14,7 @@ from ._api import AcquireReturnProxy, BaseFileLock from ._error import Timeout +from ._read_write import ReadWriteLock from ._soft import SoftFileLock from ._unix import UnixFileLock, has_fcntl from ._windows import WindowsFileLock @@ -62,6 +63,7 @@ "BaseAsyncFileLock", "BaseFileLock", "FileLock", + "ReadWriteLock", "SoftFileLock", "Timeout", "UnixFileLock", diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py index f0c06b02..8c6ef548 100644 --- a/src/filelock/_read_write.py +++ b/src/filelock/_read_write.py @@ -3,7 +3,7 @@ import threading import logging import time -from _error import Timeout +from ._error import Timeout from filelock._api import AcquireReturnProxy, BaseFileLock from typing import Literal, Any from contextlib import contextmanager @@ -62,8 +62,12 @@ def get_lock(cls, lock_file: str | os.PathLike[str], normalized = os.path.abspath(lock_file) with cls._instances_lock: if normalized not in cls._instances: - cls._instances[normalized] = cls(lock_file, timeout, blocking) - instance = cls._instances[normalized] + # Create the instance with a strong reference first + instance = super(_ReadWriteLockMeta, cls).__call__(lock_file, timeout, blocking, is_singleton=False) + cls._instances[normalized] = instance + else: + instance = cls._instances[normalized] + if instance.timeout != timeout or instance.blocking != blocking: raise ValueError("Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s", instance.timeout, instance.blocking, timeout, blocking) return instance @@ -89,19 +93,6 @@ def __init__( # _lock_level is the reentrance counter. 
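        # Note: self.con below is shared by every thread that uses this lock
        # (check_same_thread=False); _transaction_lock serializes its use.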
        self._lock_level = 0
        self.con = sqlite3.connect(self.lock_file, check_same_thread=False)
-        # Using the legacy journal mode rather than more modern WAL mode because,
-        # apparently, in WAL mode it's impossible to enforce that read transactions
-        # (started with BEGIN TRANSACTION) are blocked if a concurrent write transaction,
-        # even EXCLUSIVE, is in progress, unless the read transactions actually read
-        # any pages modified by the write transaction. But in the legacy journal mode,
-        # it seems, it's possible to do this read-write locking without table data
-        # modification at each exclusive lock.
-        # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
-        # "MEMORY" journal mode is fine because no actual writes to the DB are happening in write-lock
-        # acquire, so crashes cannot adversely affect the DB. Even journal_mode=OFF would probably
-        # be fine, too, but the SQLite documentation says that ROLLBACK becomes *undefined behaviour*
-        # with journal_mode=OFF which sounds scarier.
-        self.con.execute('PRAGMA journal_mode=MEMORY;')

    def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy:
        """Acquire a read lock. If a lock is already held, it must be a read lock.
        Upgrading from read to write is prohibited."""
@@ -119,8 +110,6 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
            self._lock_level += 1
            return AcquireReturnProxy(lock=self)

-        timeout_ms = timeout_for_sqlite(timeout, blocking)
-
        start_time = time.perf_counter()
        # Acquire the transaction lock so that the (possibly blocking) SQLite work
        # happens without conflicting with other threads' transaction work.
@@ -140,8 +129,31 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
            waited = time.perf_counter() - start_time
            timeout_ms = timeout_for_sqlite(timeout, blocking, waited)
-
-            self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,))
+            self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms)
+            # WHY journal_mode=MEMORY?
+            # Using the legacy journal mode rather than more modern WAL mode because,
+            # apparently, in WAL mode it's impossible to enforce that read transactions
+            # (started with BEGIN TRANSACTION) are blocked if a concurrent write transaction,
+            # even EXCLUSIVE, is in progress, unless the read transactions actually read
+            # any pages modified by the write transaction. But in the legacy journal mode,
+            # it seems, it's possible to do this read-write locking without table data
+            # modification at each exclusive lock.
+            # See https://sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
+            # "MEMORY" journal mode is fine because no actual writes to the DB are happening in write-lock
+            # acquire, so crashes cannot adversely affect the DB. Even journal_mode=OFF would probably
+            # be fine, too, but the SQLite documentation says that ROLLBACK becomes *undefined behaviour*
+            # with journal_mode=OFF which sounds scarier.
+            #
+            # WHY SETTING THIS PRAGMA HERE RATHER THAN IN ReadWriteLock.__init__()?
+            # Because setting this pragma may block on the database if it is locked at the moment,
+            # so we must set this pragma *after* `PRAGMA busy_timeout` above.
+            self.con.execute('PRAGMA journal_mode=MEMORY;')
+            # Recompute the remaining timeout after the potentially blocking pragma
+            # statement above.
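+            # (If the journal_mode pragma itself had to wait and hit busy_timeout,
+            # SQLite raises "database is locked", handled by the except clause below.)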
+            waited = time.perf_counter() - start_time
+            timeout_ms_2 = timeout_for_sqlite(timeout, blocking, waited)
+            if timeout_ms_2 != timeout_ms:
+                self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms_2)
            self.con.execute('BEGIN TRANSACTION;')
            # Need to make SELECT to compel SQLite to actually acquire a SHARED db lock.
            # See https://www.sqlite.org/lockingv3.html#transaction_control
@@ -194,8 +206,17 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe
            waited = time.perf_counter() - start_time
            timeout_ms = timeout_for_sqlite(timeout, blocking, waited)
-
-            self.con.execute('PRAGMA busy_timeout=?;', (timeout_ms,))
+            self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms)
+            # For explanations of both why we use journal_mode=MEMORY and why we set
+            # this pragma here rather than in ReadWriteLock.__init__(), see the comments
+            # in acquire_read().
+            self.con.execute('PRAGMA journal_mode=MEMORY;')
+            # Recompute the remaining timeout after the potentially blocking pragma
+            # statement above.
+            waited = time.perf_counter() - start_time
+            timeout_ms_2 = timeout_for_sqlite(timeout, blocking, waited)
+            if timeout_ms_2 != timeout_ms:
+                self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms_2)
            self.con.execute('BEGIN EXCLUSIVE TRANSACTION;')

            with self._internal_lock:
@@ -206,7 +227,7 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe
        except sqlite3.OperationalError as e:
            if 'database is locked' not in str(e):
-                raise  # Re-raise if it is an unexpected error.
+                raise e  # Re-raise unexpected errors.
            raise Timeout(self.lock_file)
        finally:
            self._transaction_lock.release()
@@ -226,7 +247,7 @@ def release(self, force: bool = False) -> None:
                self._current_mode = None
                # Unless there are bugs in this code, sqlite3.ProgrammingError
                # must not be raised here, that is, the transaction should have been
-                # started in acquire().
+                # started in acquire_read() or acquire_write().
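+                # (Connection.rollback() is harmless when no transaction is active,
+                # so a forced release cannot raise from here.)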
self.con.rollback() # ----- Context Manager Protocol ----- diff --git a/tests/test_read_write.py b/tests/test_read_write.py new file mode 100644 index 00000000..6133d80e --- /dev/null +++ b/tests/test_read_write.py @@ -0,0 +1,884 @@ +from __future__ import annotations + +import time +import multiprocessing as mp +from contextlib import contextmanager +import pytest + +from filelock import Timeout +from filelock._read_write import ReadWriteLock + + +# Helper function to run in a separate process to acquire a read lock +def acquire_read_lock(lock_file, acquired_event, release_event=None, + timeout=-1, blocking=True, ready_event=None): + # Get error queue from current process if available + current_process = mp.current_process() + error_queue = getattr(current_process, '_error_queue', None) + + if ready_event: + ready_event.wait(timeout=10) + + try: + lock = ReadWriteLock(lock_file, timeout=timeout, blocking=blocking) + with lock.read_lock(): + acquired_event.set() + if release_event: + # Wait for signal to release if provided + release_event.wait(timeout=10) + else: + # Hold the lock for a short time + time.sleep(0.5) + return True + except Exception as e: + import traceback + error_msg = f"Read lock process error: {e}\n{traceback.format_exc()}" + print(error_msg) + if error_queue: + error_queue.put(error_msg) + return False + + +# Helper function to run in a separate process to acquire a write lock +def acquire_write_lock(lock_file, acquired_event, release_event=None, + timeout=-1, blocking=True, ready_event=None): + if ready_event: + ready_event.wait(timeout=10) + + try: + lock = ReadWriteLock(lock_file, timeout=timeout, blocking=blocking) + with lock.write_lock(): + acquired_event.set() + if release_event: + # Wait for signal to release if provided + release_event.wait(timeout=10) + else: + # Hold the lock for a short time + time.sleep(0.5) + return True + except Timeout: + return False + except Exception as e: + print(f"Write lock process error: {e}") + return False + + +# Helper function to try upgrading a read lock to a write lock (should fail) +def try_upgrade_lock(lock_file, read_acquired_event, upgrade_attempted_event, upgrade_result): + lock = ReadWriteLock(lock_file) + try: + with lock.read_lock(): + read_acquired_event.set() + time.sleep(0.2) # Ensure the read lock is established + + # Now try to acquire a write lock (should fail) + upgrade_attempted_event.set() + try: + with lock.write_lock(timeout=0.5): + upgrade_result.value = 1 # Succeeded (shouldn't happen) + except RuntimeError: + upgrade_result.value = 0 # Expected failure + except Timeout: + upgrade_result.value = 2 # Timeout (unexpected) + except Exception: + upgrade_result.value = 3 # Other error + except Exception as e: + print(f"Upgrade lock process error: {e}") + upgrade_result.value = 4 + + +# Helper function to try downgrading a write lock to a read lock (should fail) +def try_downgrade_lock(lock_file, write_acquired_event, downgrade_attempted_event, downgrade_result): + lock = ReadWriteLock(lock_file) + try: + with lock.write_lock(): + write_acquired_event.set() + time.sleep(0.2) # Ensure the write lock is established + + # Now try to acquire a read lock (should fail) + downgrade_attempted_event.set() + try: + with lock.read_lock(timeout=0.5): + downgrade_result.value = 1 # Succeeded (shouldn't happen) + except RuntimeError: + downgrade_result.value = 0 # Expected failure + except Timeout: + downgrade_result.value = 2 # Timeout (unexpected) + except Exception: + downgrade_result.value = 3 # Other error + except 
Exception as e: + print(f"Downgrade lock process error: {e}") + downgrade_result.value = 4 + + +@contextmanager +def cleanup_processes(processes): + error_queue = mp.Queue() + for p in processes: + # Store the queue in process for later retrieval + p._error_queue = error_queue + + try: + yield error_queue + finally: + # Check for errors before terminating processes + try: + while True: + error = error_queue.get(block=False) + print(f"Subprocess error: {error}") + except mp.queues.Empty: + pass + + # Cleanup processes + for p in processes: + if p.is_alive(): + p.terminate() + p.join(timeout=1) + + +@pytest.fixture +def lock_file(tmp_path): + return str(tmp_path / "test_lock.db") + + +@pytest.mark.timeout(20) +def test_read_locks_are_shared(lock_file): + """Test that multiple processes can acquire read locks simultaneously.""" + # Create shared events + read1_acquired = mp.Event() + read2_acquired = mp.Event() + + # Start two processes that acquire read locks + p1 = mp.Process(target=acquire_read_lock, args=(lock_file, read1_acquired)) + p2 = mp.Process(target=acquire_read_lock, args=(lock_file, read2_acquired)) + + with cleanup_processes([p1, p2]): + p1.start() + time.sleep(0.1) # Give p1 a chance to start acquiring + p2.start() + + # Both processes should be able to acquire read locks + assert read1_acquired.wait(timeout=2), f"First read lock not acquired on {lock_file}" + assert read2_acquired.wait(timeout=2), f"Second read lock not acquired on {lock_file}" + + # Wait for processes to finish + p1.join(timeout=2) + p2.join(timeout=2) + assert not p1.is_alive(), "Process 1 did not exit cleanly" + assert not p2.is_alive(), "Process 2 did not exit cleanly" + + +@pytest.mark.timeout(20) +def test_write_lock_excludes_other_write_locks(lock_file): + """Test that a write lock prevents other processes from acquiring write locks.""" + # Create shared events + write1_acquired = mp.Event() + release_write1 = mp.Event() + write2_acquired = mp.Event() + + # Start first process to acquire write lock + p1 = mp.Process(target=acquire_write_lock, + args=(lock_file, write1_acquired, release_write1)) + + # Second process will try to acquire with a short timeout + # We'll restart it after the first process releases the lock + p2 = mp.Process(target=acquire_write_lock, + args=(lock_file, write2_acquired, None, 0.5, True)) + + with cleanup_processes([p1]): + p1.start() + assert write1_acquired.wait(timeout=2), "First write lock not acquired" + + # Second process should not be able to acquire write lock + with cleanup_processes([p2]): + p2.start() + assert not write2_acquired.wait(timeout=1), "Second write lock should not be acquired" + + # Release first write lock + release_write1.set() + p1.join(timeout=2) + assert not p1.is_alive(), "Process 1 did not exit cleanly" + + # Create a new process to try acquiring the lock now that it's released + write2_acquired.clear() # Reset the event + p3 = mp.Process(target=acquire_write_lock, + args=(lock_file, write2_acquired, None)) + + with cleanup_processes([p3]): + p3.start() + # Now the new process should be able to acquire the lock + assert write2_acquired.wait(timeout=2), "Second write lock not acquired after first released" + p3.join(timeout=2) + assert not p3.is_alive(), "Process 3 did not exit cleanly" + + +@pytest.mark.timeout(20) +def test_write_lock_excludes_read_locks(lock_file): + """Test that a write lock prevents other processes from acquiring read locks.""" + # Create shared events + write_acquired = mp.Event() + release_write = mp.Event() + read_acquired 
= mp.Event() + read_started = mp.Event() # New event to signal when read attempt starts + + # Start process to acquire write lock + p1 = mp.Process(target=acquire_write_lock, + args=(lock_file, write_acquired, release_write)) + + # Start process to try to acquire read lock with no timeout + # Use a ready_event to control when the read lock attempt should start + p2 = mp.Process(target=acquire_read_lock, + args=(lock_file, read_acquired, None, -1, True, read_started)) + + with cleanup_processes([p1, p2]): + p1.start() + assert write_acquired.wait(timeout=2), "Write lock not acquired" + + # Start the read process but don't signal it to begin acquiring yet + p2.start() + + # Now signal p2 to attempt acquiring the read lock + read_started.set() + + # Wait a short time - read lock should NOT be acquired while write lock is held + time.sleep(2) + assert not read_acquired.is_set(), "Read lock should not be acquired while write lock held" + + # Release write lock + release_write.set() + p1.join(timeout=2) + + # Now read process should be able to acquire the lock + assert read_acquired.wait(timeout=2), "Read lock not acquired after write released" + + p2.join(timeout=2) + assert not p2.is_alive(), "Process 2 did not exit cleanly" + + +@pytest.mark.timeout(20) +def test_read_lock_excludes_write_locks(lock_file): + """Test that read locks prevent other processes from acquiring write locks.""" + # Create shared events + read_acquired = mp.Event() + release_read = mp.Event() + write_acquired = mp.Event() + write_started = mp.Event() # New event to signal when write attempt starts + + # Start process to acquire read lock + p1 = mp.Process(target=acquire_read_lock, + args=(lock_file, read_acquired, release_read)) + + # Start process to try to acquire write lock with no timeout + # Use a ready_event to control when the write lock attempt should start + p2 = mp.Process(target=acquire_write_lock, + args=(lock_file, write_acquired, None, -1, True, write_started)) + + with cleanup_processes([p1, p2]): + p1.start() + assert read_acquired.wait(timeout=2), "Read lock not acquired" + + # Start the write process but don't signal it to begin acquiring yet + p2.start() + + # Now signal p2 to attempt acquiring the write lock + write_started.set() + + # Wait a short time - write lock should NOT be acquired while read lock is held + time.sleep(2) + assert not write_acquired.is_set(), "Write lock should not be acquired while read lock held" + + # Release read lock + release_read.set() + p1.join(timeout=2) + + # Now write process should be able to acquire the lock + assert write_acquired.wait(timeout=2), "Write lock not acquired after read released" + + p2.join(timeout=2) + assert not p2.is_alive(), "Process 2 did not exit cleanly" + + +# Move this function to module level (before the test functions) +def chain_reader(idx, lock_file, release_count, + forward_wait, backward_wait, forward_set, backward_set): + # Wait for signal to start acquiring + forward_wait.wait(timeout=10) + + try: + lock = ReadWriteLock(lock_file) + with lock.read_lock(): + if idx > 0: + # Don't make all read locks set off immediately via the forward_set + # chain. + time.sleep(2) + + # Signal next reader to start if not the last one + if forward_set is not None: + forward_set.set() + + if idx == 0: + # Hold off releasing the write lock process (backward_set is writer_ready at idx=0) + time.sleep(1) + + backward_set.set() + + # Wait for a signal from the next read to release, so that there is + # always a read lock holding. 
            # A non-starving write lock from another process should, in fact,
            # cause this backward_wait to time out.
            backward_wait.wait(timeout=10)

            # Increment the release counter before releasing
            with release_count.get_lock():
                release_count.value += 1

    except Exception as e:
        print(f"Chain reader {idx} error: {e}")


@pytest.mark.timeout(40)
def test_write_non_starvation(lock_file):
    """Test that write locks can eventually be acquired even with continuous read locks.

    Creates a chain of reader processes where the writer starts after the first reader
    acquires a lock. The writer should be able to acquire its lock before the entire
    reader chain has finished, demonstrating non-starvation.
    """
    NUM_READERS = 7

    # Create events for coordination
    chain_forward = [mp.Event() for _ in range(NUM_READERS)]  # Signal to start acquiring
    chain_backward = [mp.Event() for _ in range(NUM_READERS)]  # Signal to release
    writer_ready = mp.Event()
    writer_acquired = mp.Event()

    # Shared counter to track how many readers have released
    release_count = mp.Value('i', 0)

    # Create reader processes
    readers = []
    for i in range(NUM_READERS):
        forward_set = chain_forward[i+1] if i < NUM_READERS - 1 else None
        backward_set = chain_backward[i-1] if i > 0 else writer_ready
        reader = mp.Process(
            target=chain_reader,
            args=(i, lock_file, release_count, chain_forward[i], chain_backward[i],
                  forward_set, backward_set)
        )
        readers.append(reader)

    # Create writer process that will try to acquire after first reader is established
    writer = mp.Process(
        target=acquire_write_lock,
        args=(lock_file, writer_acquired, None, 20, True, writer_ready)
    )

    with cleanup_processes(readers + [writer]):
        # Start all reader processes (they'll wait for their start signal)
        for reader in readers:
            reader.start()

        # Signal the first reader to start
        chain_forward[0].set()

        # Wait a bit for the first reader to acquire and signal the writer
        assert writer_ready.wait(timeout=10), "First reader did not acquire lock"

        # Start the writer process (it will wait for writer_ready event)
        writer.start()

        assert writer_acquired.wait(timeout=22), "Writer couldn't acquire lock - possible starvation"

        with release_count.get_lock():
            read_releases = release_count.value

        assert read_releases < 3, (
            f"Writer acquired after {read_releases} readers released - this indicates starvation"
        )

        # Wait for writer to finish
        writer.join(timeout=2)
        assert not writer.is_alive(), "Writer did not exit cleanly"

        # Let the last reader release
        chain_backward[-1].set()

        # Wait for all readers to finish
        for idx, reader in enumerate(readers):
            reader.join(timeout=3)
            assert not reader.is_alive(), f"Reader {idx} did not exit cleanly"


def try_upgrade_lock(lock_file, read_acquired, upgrade_attempted, upgrade_result):
    lock = ReadWriteLock(lock_file)
    try:
        with lock.read_lock():
            read_acquired.set()
            upgrade_attempted.set()
            try:
                with lock.write_lock():
                    pass  # Should not reach here
            except RuntimeError:
                upgrade_result.value = 0  # Expected behavior
            except Exception as e:
                print(f"Unexpected error during upgrade: {e}")
                upgrade_result.value = 2
    except Exception as e:
        print(f"Unexpected error acquiring read lock: {e}")
        upgrade_result.value = 1


def try_downgrade_lock(lock_file, write_acquired, downgrade_attempted, downgrade_result):
    lock = ReadWriteLock(lock_file)
    try:
        with lock.write_lock():
            write_acquired.set()
            downgrade_attempted.set()
            try:
                with
lock.read_lock(): + pass # Should not reach here + except RuntimeError: + downgrade_result.value = 0 # Expected behavior + except Exception as e: + print(f"Unexpected error during downgrade: {e}") + downgrade_result.value = 2 + except Exception as e: + print(f"Unexpected error acquiring write lock: {e}") + downgrade_result.value = 1 + + +def recursive_read_lock(lock_file, success_flag): + lock = ReadWriteLock(lock_file) + try: + with lock.read_lock(): + # First acquisition + assert lock._lock_level == 1 + assert lock._current_mode == "read" + + with lock.read_lock(): + # Second acquisition + assert lock._lock_level == 2 + assert lock._current_mode == "read" + + with lock.read_lock(): + # Third acquisition + assert lock._lock_level == 3 + assert lock._current_mode == "read" + + # After third release + assert lock._lock_level == 2 + assert lock._current_mode == "read" + + # After second release + assert lock._lock_level == 1 + assert lock._current_mode == "read" + + # After first release + assert lock._lock_level == 0 + assert lock._current_mode is None + + success_flag.value = 1 + except Exception as e: + print(f"Recursive read lock error: {e}") + success_flag.value = 0 + + +@pytest.mark.timeout(10) +def test_recursive_read_lock_acquisition(lock_file): + """Test that the same process can acquire the same read lock multiple times.""" + success = mp.Value('i', 0) + p = mp.Process(target=recursive_read_lock, args=(lock_file, success)) + + with cleanup_processes([p]): + p.start() + p.join(timeout=5) + + +@pytest.mark.timeout(10) +def test_lock_upgrade_prohibited(lock_file): + """Test that a process cannot upgrade from a read lock to a write lock.""" + read_acquired = mp.Event() + upgrade_attempted = mp.Event() + upgrade_result = mp.Value('i', -1) + + p = mp.Process(target=try_upgrade_lock, + args=(lock_file, read_acquired, upgrade_attempted, upgrade_result)) + + with cleanup_processes([p]): + p.start() + + # Wait for read lock to be acquired + assert read_acquired.wait(timeout=2), "Read lock not acquired" + + # Wait for upgrade to be attempted + assert upgrade_attempted.wait(timeout=2), "Upgrade not attempted" + + # Wait for process to finish + p.join(timeout=2) + assert not p.is_alive(), "Process did not exit cleanly" + + # Verify result + assert upgrade_result.value == 0, "Read lock was incorrectly upgraded to write lock" + + +@pytest.mark.timeout(10) +def test_lock_downgrade_prohibited(lock_file): + """Test that a process cannot downgrade from a write lock to a read lock.""" + write_acquired = mp.Event() + downgrade_attempted = mp.Event() + downgrade_result = mp.Value('i', -1) + + p = mp.Process(target=try_downgrade_lock, + args=(lock_file, write_acquired, downgrade_attempted, downgrade_result)) + + with cleanup_processes([p]): + p.start() + + # Wait for write lock to be acquired + assert write_acquired.wait(timeout=2), "Write lock not acquired" + + # Wait for downgrade to be attempted + assert downgrade_attempted.wait(timeout=2), "Downgrade not attempted" + + # Wait for process to finish + p.join(timeout=2) + assert not p.is_alive(), "Process did not exit cleanly" + + # Verify result + assert downgrade_result.value == 0, "Write lock was incorrectly downgraded to read lock" + + +@pytest.mark.timeout(10) +def test_timeout_behavior(lock_file): + """Test that timeout parameter works correctly in multi-process environment.""" + # Create shared events + write_acquired = mp.Event() + release_write = mp.Event() + read_acquired = mp.Event() + + # Start process to acquire write lock and hold it + p1 
= mp.Process(target=acquire_write_lock, + args=(lock_file, write_acquired, release_write)) + + # Start process to try to acquire read lock with timeout + p2 = mp.Process(target=acquire_read_lock, + args=(lock_file, read_acquired, None, 0.5, True)) + + with cleanup_processes([p1, p2]): + p1.start() + assert write_acquired.wait(timeout=2), "Write lock not acquired" + + # Start timer to measure timeout + start_time = time.time() + p2.start() + + # Process should not acquire read lock and should exit after timeout + assert not read_acquired.wait(timeout=1), "Read lock should not be acquired" + p2.join(timeout=5) # Allow more time for joining + + # Verify timeout duration was approximately correct + # Make the timing constraints more lenient + elapsed = time.time() - start_time + assert 0.4 <= elapsed <= 10.0, f"Timeout was not respected: {elapsed}s" + + # Clean up + release_write.set() + p1.join(timeout=2) + + +@pytest.mark.timeout(10) +def test_non_blocking_behavior(lock_file): + """Test that non-blocking parameter works correctly. + + This test directly attempts to acquire a read lock in non-blocking mode + when a write lock is already held by another process. + """ + # Create shared events for the write lock + write_acquired = mp.Event() + release_write = mp.Event() + + # Start process to acquire write lock and hold it + p1 = mp.Process(target=acquire_write_lock, + args=(lock_file, write_acquired, release_write)) + + with cleanup_processes([p1]): + p1.start() + assert write_acquired.wait(timeout=2), "Write lock not acquired" + + lock = ReadWriteLock(lock_file) + + # Start timer to measure how quickly non-blocking returns + start_time = time.time() + + # Attempt to acquire a read lock in non-blocking mode + try: + with lock.read_lock(blocking=False): + # Should never reach here + pytest.fail("Non-blocking read lock was unexpectedly acquired") + except Timeout: + # Expected behavior - lock acquisition should fail immediately + pass + + elapsed = time.time() - start_time + + # Non-blocking should return very quickly + assert elapsed < 0.1, f"Non-blocking took too long: {elapsed}s" + + # Clean up + release_write.set() + p1.join(timeout=2) + + +# Move this function to module level (before the test functions) +def recursive_read_lock(lock_file, success_flag): + lock = ReadWriteLock(lock_file) + try: + with lock.read_lock(): + # First acquisition + assert lock._lock_level == 1 + assert lock._current_mode == "read" + + with lock.read_lock(): + # Second acquisition + assert lock._lock_level == 2 + assert lock._current_mode == "read" + + with lock.read_lock(): + # Third acquisition + assert lock._lock_level == 3 + assert lock._current_mode == "read" + + # After third release + assert lock._lock_level == 2 + assert lock._current_mode == "read" + + # After second release + assert lock._lock_level == 1 + assert lock._current_mode == "read" + + # After first release + assert lock._lock_level == 0 + assert lock._current_mode is None + + success_flag.value = 1 + except Exception as e: + print(f"Recursive read lock error: {e}") + success_flag.value = 0 + + +@pytest.mark.timeout(10) +def test_recursive_read_lock_acquisition(lock_file): + """Test that the same process can acquire the same read lock multiple times.""" + success = mp.Value('i', 0) + p = mp.Process(target=recursive_read_lock, args=(lock_file, success)) + + with cleanup_processes([p]): + p.start() + p.join(timeout=5) + + assert success.value == 1, "Recursive read lock acquisition failed" + + +# Move this function to module level (before the test 
functions)
def recursive_write_lock(lock_file, success_flag):
    lock = ReadWriteLock(lock_file)
    try:
        with lock.write_lock():
            # First acquisition
            assert lock._lock_level == 1
            assert lock._current_mode == "write"

            with lock.write_lock():
                # Second acquisition
                assert lock._lock_level == 2
                assert lock._current_mode == "write"

                with lock.write_lock():
                    # Third acquisition
                    assert lock._lock_level == 3
                    assert lock._current_mode == "write"

                # After third release
                assert lock._lock_level == 2
                assert lock._current_mode == "write"

            # After second release
            assert lock._lock_level == 1
            assert lock._current_mode == "write"

        # After first release
        assert lock._lock_level == 0
        assert lock._current_mode is None

        success_flag.value = 1
    except Exception as e:
        print(f"Recursive write lock error: {e}")
        success_flag.value = 0


@pytest.mark.timeout(10)
def test_recursive_write_lock_acquisition(lock_file):
    """Test that the same process can acquire the same write lock multiple times."""
    success = mp.Value('i', 0)
    p = mp.Process(target=recursive_write_lock, args=(lock_file, success))

    with cleanup_processes([p]):
        p.start()
        p.join(timeout=5)

    assert success.value == 1, "Recursive write lock acquisition failed"


def acquire_write_lock_and_crash(lock_file, acquired_event):
    lock = ReadWriteLock(lock_file)
    with lock.write_lock():
        acquired_event.set()
        # Simulate process crash with infinite loop
        while True:
            time.sleep(0.1)


@pytest.mark.timeout(15)
def test_write_lock_release_on_process_termination(lock_file):
    """Test that write locks are properly released if a process terminates."""
    # Create shared events
    lock_acquired = mp.Event()

    # Start a process that will acquire the lock and then "crash"
    p1 = mp.Process(target=acquire_write_lock_and_crash, args=(lock_file, lock_acquired))
    p1.start()

    # Wait for lock to be acquired
    assert lock_acquired.wait(timeout=2), "Lock not acquired by first process"

    # Create second process that will try to acquire the lock
    write_acquired = mp.Event()
    p2 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired))

    with cleanup_processes([p1, p2]):
        # Terminate the first process (simulating a crash)
        time.sleep(0.5)  # Ensure lock is fully acquired
        p1.terminate()
        p1.join(timeout=2)

        # Start second process - should be able to acquire the lock
        p2.start()

        # Check if second process can acquire the lock
        assert write_acquired.wait(timeout=5), "Lock not acquired after process termination"

        p2.join(timeout=2)
        assert not p2.is_alive(), "Second process did not exit cleanly"


def acquire_read_lock_and_crash(lock_file, acquired_event):
    lock = ReadWriteLock(lock_file)
    with lock.read_lock():
        acquired_event.set()
        # Simulate process crash with infinite loop
        while True:
            time.sleep(0.1)


@pytest.mark.timeout(15)
def test_read_lock_release_on_process_termination(lock_file):
    """Test that read locks are properly released if a process terminates."""
    # Create shared events
    lock_acquired = mp.Event()

    # Start a process that will acquire the lock and then "crash"
    p1 = mp.Process(target=acquire_read_lock_and_crash, args=(lock_file, lock_acquired))
    p1.start()

    # Wait for lock to be acquired
    assert lock_acquired.wait(timeout=2), "Lock not acquired by first process"

    # Create second process that will try to acquire the lock
    write_acquired = mp.Event()
    p2 = mp.Process(target=acquire_write_lock, args=(lock_file,
write_acquired)) + + with cleanup_processes([p1, p2]): + # Terminate the first process (simulating a crash) + time.sleep(0.5) # Ensure lock is fully acquired + p1.terminate() + p1.join(timeout=2) + + # Start second process - should be able to acquire the lock + p2.start() + + # Check if second process can acquire the lock + assert write_acquired.wait(timeout=5), "Lock not acquired after process termination" + + p2.join(timeout=2) + assert not p2.is_alive(), "Second process did not exit cleanly" + + +@pytest.mark.timeout(15) +def test_single_read_lock_acquire_release(lock_file): + """Test that a single read lock can be acquired and released.""" + # Create a lock + lock = ReadWriteLock(lock_file) + + # Acquire and release a read lock + with lock.read_lock(): + # Lock is acquired here + assert True, "Read lock acquired" + # Let's verify we can read the same lock again (read locks are reentrant) + with lock.read_lock(): + assert True, "Read lock acquired again" + + # Lock should be released here + # We can test this by acquiring it again + with lock.read_lock(): + assert True, "Read lock can be acquired after release" + + +@pytest.mark.timeout(15) +def test_single_write_lock_acquire_release(lock_file): + """Test that a single write lock can be acquired and released.""" + # Create a lock + lock = ReadWriteLock(lock_file) + + # Acquire and release a write lock + with lock.write_lock(): + # Lock is acquired here + assert True, "Write lock acquired" + # Let's verify we can write lock again (write locks are reentrant) + with lock.write_lock(): + assert True, "Write lock acquired again" + + # Lock should be released here + # We can test this by acquiring it again + with lock.write_lock(): + assert True, "Write lock can be acquired after release" + + +@pytest.mark.timeout(15) +def test_read_then_write_lock(lock_file): + """Test that we can acquire a read lock and then a write lock after releasing it.""" + lock = ReadWriteLock(lock_file) + + # First acquire a read lock + with lock.read_lock(): + assert True, "Read lock acquired" + + # After releasing the read lock, we should be able to acquire a write lock + with lock.write_lock(): + assert True, "Write lock acquired after read lock released" + + +@pytest.mark.timeout(15) +def test_write_then_read_lock(lock_file): + """Test that we can acquire a write lock and then a read lock after releasing it.""" + lock = ReadWriteLock(lock_file) + + # First acquire a write lock + with lock.write_lock(): + assert True, "Write lock acquired" + + # After releasing the write lock, we should be able to acquire a read lock + with lock.read_lock(): + assert True, "Read lock acquired after write lock released" + + +if __name__ == "__main__": + # Set up multiprocessing to spawn instead of fork + mp.set_start_method('spawn') \ No newline at end of file From 0f8302d1311884a9f9ab4782470a63483630d21f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 13:10:38 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/filelock/_read_write.py | 155 +++++++------ tests/test_read_write.py | 424 +++++++++++++++++------------------- 2 files changed, 291 insertions(+), 288 deletions(-) diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py index 8c6ef548..ea997bdb 100644 --- a/src/filelock/_read_write.py +++ b/src/filelock/_read_write.py @@ -1,14 +1,18 @@ +from __future__ import annotations + +import logging import 
os import sqlite3 import threading -import logging import time -from ._error import Timeout -from filelock._api import AcquireReturnProxy, BaseFileLock -from typing import Literal, Any from contextlib import contextmanager +from typing import Any, Literal from weakref import WeakValueDictionary +from filelock._api import AcquireReturnProxy + +from ._error import Timeout + _LOGGER = logging.getLogger("filelock") # PRAGMA busy_timeout=N delegates to https://www.sqlite.org/c3ref/busy_timeout.html, @@ -16,21 +20,22 @@ # systems. Use even a lower value to be safe. This 2 bln milliseconds is about 23 days. _MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1 + def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> int: if blocking is False: return 0 - + if timeout == -1: return _MAX_SQLITE_TIMEOUT_MS - + if timeout < 0: - raise ValueError("timeout must be a non-negative number or -1") - + msg = "timeout must be a non-negative number or -1" + raise ValueError(msg) + if timeout > 0: - timeout = timeout - already_waited - if timeout < 0: - timeout = 0 - + timeout -= already_waited + timeout = max(timeout, 0) + assert timeout >= 0 timeout_ms = int(timeout * 1000) @@ -42,9 +47,16 @@ def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> class _ReadWriteLockMeta(type): """Metaclass that redirects instance creation to get_lock() when is_singleton=True.""" - def __call__(cls, lock_file: str | os.PathLike[str], - timeout: float = -1, blocking: bool = True, - is_singleton: bool = True, *args: Any, **kwargs: Any) -> "ReadWriteLock": + + def __call__( + cls, + lock_file: str | os.PathLike[str], + timeout: float = -1, + blocking: bool = True, + is_singleton: bool = True, + *args: Any, + **kwargs: Any, + ) -> ReadWriteLock: if is_singleton: return cls.get_lock(lock_file, timeout, blocking) return super().__call__(lock_file, timeout, blocking, is_singleton, *args, **kwargs) @@ -56,8 +68,7 @@ class ReadWriteLock(metaclass=_ReadWriteLockMeta): _instances_lock = threading.Lock() @classmethod - def get_lock(cls, lock_file: str | os.PathLike[str], - timeout: float = -1, blocking: bool = True) -> "ReadWriteLock": + def get_lock(cls, lock_file: str | os.PathLike[str], timeout: float = -1, blocking: bool = True) -> ReadWriteLock: """Return the one-and-only ReadWriteLock for a given file.""" normalized = os.path.abspath(lock_file) with cls._instances_lock: @@ -67,9 +78,16 @@ def get_lock(cls, lock_file: str | os.PathLike[str], cls._instances[normalized] = instance else: instance = cls._instances[normalized] - + if instance.timeout != timeout or instance.blocking != blocking: - raise ValueError("Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s", instance.timeout, instance.blocking, timeout, blocking) + msg = "Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s" + raise ValueError( + msg, + instance.timeout, + instance.blocking, + timeout, + blocking, + ) return instance def __init__( @@ -89,24 +107,28 @@ def __init__( self._internal_lock = threading.Lock() self._lock_level = 0 # Reentrance counter. # _current_mode holds the active lock mode ("read" or "write") or None if no lock is held. - self._current_mode: Literal["read", "write", None] = None + self._current_mode: Literal["read", "write"] | None = None # _lock_level is the reentrance counter. 
self._lock_level = 0 self.con = sqlite3.connect(self.lock_file, check_same_thread=False) def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: - """Acquire a read lock. If a lock is already held, it must be a read lock. - Upgrading from read to write is prohibited.""" - + """ + Acquire a read lock. If a lock is already held, it must be a read lock. + Upgrading from read to write is prohibited. + """ # Attempt to re-enter already held lock. with self._internal_lock: if self._lock_level > 0: # Must already be in read mode. if self._current_mode != "read": - raise RuntimeError( - f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " - "already holding a write lock (downgrade not allowed)" - ) + msg = ( + f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " + "already holding a write lock (downgrade not allowed)" + ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) @@ -120,16 +142,19 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet with self._internal_lock: if self._lock_level > 0: if self._current_mode != "read": - raise RuntimeError( + msg = ( f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " "already holding a write lock (downgrade not allowed)" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) - + waited = time.perf_counter() - start_time timeout_ms = timeout_for_sqlite(timeout, blocking, waited) - self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms) + self.con.execute("PRAGMA busy_timeout=%d;" % timeout_ms) # WHY journal_mode=MEMORY? # Using the legacy journal mode rather than more modern WAL mode because, # apparently, in WAL mode it's impossible to enforce that read transactions @@ -147,43 +172,47 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet # WHY SETTING THIS PRAGMA HERE RATHER THAN IN ReadWriteLock.__init__()? # Because setting this pragma may block on the database if it is locked at the moment, # so we must set this pragma *after* `PRAGMA busy_timeout` above. - self.con.execute('PRAGMA journal_mode=MEMORY;') + self.con.execute("PRAGMA journal_mode=MEMORY;") # Recompute the remaining timeout after the potentially blocking pragma # statement above. waited = time.perf_counter() - start_time timeout_ms_2 = timeout_for_sqlite(timeout, blocking, waited) if timeout_ms_2 != timeout_ms: - self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms_2) - self.con.execute('BEGIN TRANSACTION;') + self.con.execute("PRAGMA busy_timeout=%d;" % timeout_ms_2) + self.con.execute("BEGIN TRANSACTION;") # Need to make SELECT to compel SQLite to actually acquire a SHARED db lock. # See https://www.sqlite.org/lockingv3.html#transaction_control - self.con.execute('SELECT name from sqlite_schema LIMIT 1;') + self.con.execute("SELECT name from sqlite_schema LIMIT 1;") with self._internal_lock: self._current_mode = "read" self._lock_level = 1 - + return AcquireReturnProxy(lock=self) except sqlite3.OperationalError as e: - if 'database is locked' not in str(e): + if "database is locked" not in str(e): raise # Re-raise unexpected errors. raise Timeout(self.lock_file) finally: self._transaction_lock.release() def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: - """Acquire a write lock. If a lock is already held, it must be a write lock. - Upgrading from read to write is prohibited.""" - + """ + Acquire a write lock. 
If a lock is already held, it must be a write lock. + Upgrading from read to write is prohibited. + """ # Attempt to re-enter already held lock. with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError( + msg = ( f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " "already holding a read lock (upgrade not allowed)" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) @@ -197,37 +226,40 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError( + msg = ( f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " "already holding a read lock (upgrade not allowed)" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) - + waited = time.perf_counter() - start_time timeout_ms = timeout_for_sqlite(timeout, blocking, waited) - self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms) + self.con.execute("PRAGMA busy_timeout=%d;" % timeout_ms) # For explanations for both why we use journal_mode=MEMORY and why we set # this pragma here rather than in ReadWriteLock.__init__(), see the comments # in acquire_read(). - self.con.execute('PRAGMA journal_mode=MEMORY;') + self.con.execute("PRAGMA journal_mode=MEMORY;") # Recompute the remaining timeout after the potentially blocking pragma # statement above. waited = time.perf_counter() - start_time timeout_ms_2 = timeout_for_sqlite(timeout, blocking, waited) if timeout_ms_2 != timeout_ms: - self.con.execute('PRAGMA busy_timeout=%d;' % timeout_ms_2) - self.con.execute('BEGIN EXCLUSIVE TRANSACTION;') + self.con.execute("PRAGMA busy_timeout=%d;" % timeout_ms_2) + self.con.execute("BEGIN EXCLUSIVE TRANSACTION;") with self._internal_lock: self._current_mode = "write" self._lock_level = 1 - + return AcquireReturnProxy(lock=self) except sqlite3.OperationalError as e: - if 'database is locked' not in str(e): - raise e # Re-raise unexpected errors. + if "database is locked" not in str(e): + raise # Re-raise unexpected errors. raise Timeout(self.lock_file) finally: self._transaction_lock.release() @@ -237,7 +269,8 @@ def release(self, force: bool = False) -> None: if self._lock_level == 0: if force: return - raise RuntimeError(f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held") + msg = f"Cannot release a lock on {self.lock_file} (lock id: {id(self)}) that is not held" + raise RuntimeError(msg) if force: self._lock_level = 0 else: @@ -254,10 +287,11 @@ def release(self, force: bool = False) -> None: # (We provide two context managers as helpers.) @contextmanager - def read_lock(self, timeout: float | None = None, - blocking: bool | None = None): - """Context manager for acquiring a read lock. - Attempts to upgrade to write lock are disallowed.""" + def read_lock(self, timeout: float | None = None, blocking: bool | None = None): + """ + Context manager for acquiring a read lock. + Attempts to upgrade to write lock are disallowed. + """ if timeout is None: timeout = self.timeout if blocking is None: @@ -269,10 +303,11 @@ def read_lock(self, timeout: float | None = None, self.release() @contextmanager - def write_lock(self, timeout: float | None = None, - blocking: bool | None = None): - """Context manager for acquiring a write lock. 
-    Acquiring read locks on the same file while holding a write lock is prohibited."""
+    def write_lock(self, timeout: float | None = None, blocking: bool | None = None):
+        """
+        Context manager for acquiring a write lock.
+
+        Acquiring read locks on the same file while holding a write lock is prohibited.
+        """
         if timeout is None:
             timeout = self.timeout
         if blocking is None:
@@ -282,9 +317,7 @@ def write_lock(self, timeout: float | None = None,
             yield
         finally:
             self.release()
-    
+
     def __del__(self) -> None:
         """Called when the lock object is deleted."""
         self.release(force=True)
-
-
diff --git a/tests/test_read_write.py b/tests/test_read_write.py
index 6133d80e..7d194a14 100644
--- a/tests/test_read_write.py
+++ b/tests/test_read_write.py
@@ -1,8 +1,9 @@
 from __future__ import annotations
 
-import time
 import multiprocessing as mp
+import time
 from contextlib import contextmanager
+
 import pytest
 
 from filelock import Timeout
@@ -10,15 +11,14 @@
 
 
 # Helper function to run in a separate process to acquire a read lock
-def acquire_read_lock(lock_file, acquired_event, release_event=None,
-                      timeout=-1, blocking=True, ready_event=None):
+def acquire_read_lock(lock_file, acquired_event, release_event=None, timeout=-1, blocking=True, ready_event=None) -> bool | None:
     # Get error queue from current process if available
     current_process = mp.current_process()
-    error_queue = getattr(current_process, '_error_queue', None)
-    
+    error_queue = getattr(current_process, "_error_queue", None)
+
     if ready_event:
         ready_event.wait(timeout=10)
-    
+
     try:
         lock = ReadWriteLock(lock_file, timeout=timeout, blocking=blocking)
         with lock.read_lock():
@@ -32,19 +32,18 @@ def acquire_read_lock(lock_file, acquired_event, release_event=None, timeout=-1,
         return True
     except Exception as e:
         import traceback
+
         error_msg = f"Read lock process error: {e}\n{traceback.format_exc()}"
-        print(error_msg)
         if error_queue:
             error_queue.put(error_msg)
         return False
 
 
 # Helper function to run in a separate process to acquire a write lock
-def acquire_write_lock(lock_file, acquired_event, release_event=None,
-                       timeout=-1, blocking=True, ready_event=None):
+def acquire_write_lock(lock_file, acquired_event, release_event=None, timeout=-1, blocking=True, ready_event=None) -> bool | None:
     if ready_event:
         ready_event.wait(timeout=10)
-    
+
     try:
         lock = ReadWriteLock(lock_file, timeout=timeout, blocking=blocking)
         with lock.write_lock():
@@ -58,19 +57,18 @@ def acquire_write_lock(lock_file, acquired_event, release_event=None,
         return True
     except Timeout:
         return False
-    except Exception as e:
-        print(f"Write lock process error: {e}")
+    except Exception:
         return False
 
 
 # Helper function to try upgrading a read lock to a write lock (should fail)
-def try_upgrade_lock(lock_file, read_acquired_event, upgrade_attempted_event, upgrade_result):
+def try_upgrade_lock(lock_file, read_acquired_event, upgrade_attempted_event, upgrade_result) -> None:
     lock = ReadWriteLock(lock_file)
     try:
         with lock.read_lock():
             read_acquired_event.set()
             time.sleep(0.2)  # Ensure the read lock is established
-            
+
             # Now try to acquire a write lock (should fail)
             upgrade_attempted_event.set()
             try:
@@ -82,19 +80,18 @@ def try_upgrade_lock(lock_file, read_acquired_event, upgrade_attempted_event, up
                 upgrade_result.value = 2  # Timeout (unexpected)
             except Exception:
                 upgrade_result.value = 3  # Other error
-    except Exception as e:
-        print(f"Upgrade lock process error: {e}")
+    except Exception:
        upgrade_result.value = 4


 # Helper function to try downgrading a write lock to a read lock (should fail)
-def 
try_downgrade_lock(lock_file, write_acquired_event, downgrade_attempted_event, downgrade_result): +def try_downgrade_lock(lock_file, write_acquired_event, downgrade_attempted_event, downgrade_result) -> None: lock = ReadWriteLock(lock_file) try: with lock.write_lock(): write_acquired_event.set() time.sleep(0.2) # Ensure the write lock is established - + # Now try to acquire a read lock (should fail) downgrade_attempted_event.set() try: @@ -106,8 +103,7 @@ def try_downgrade_lock(lock_file, write_acquired_event, downgrade_attempted_even downgrade_result.value = 2 # Timeout (unexpected) except Exception: downgrade_result.value = 3 # Other error - except Exception as e: - print(f"Downgrade lock process error: {e}") + except Exception: downgrade_result.value = 4 @@ -117,18 +113,17 @@ def cleanup_processes(processes): for p in processes: # Store the queue in process for later retrieval p._error_queue = error_queue - + try: yield error_queue finally: # Check for errors before terminating processes try: while True: - error = error_queue.get(block=False) - print(f"Subprocess error: {error}") + error_queue.get(block=False) except mp.queues.Empty: pass - + # Cleanup processes for p in processes: if p.is_alive(): @@ -142,25 +137,25 @@ def lock_file(tmp_path): @pytest.mark.timeout(20) -def test_read_locks_are_shared(lock_file): +def test_read_locks_are_shared(lock_file) -> None: """Test that multiple processes can acquire read locks simultaneously.""" # Create shared events read1_acquired = mp.Event() read2_acquired = mp.Event() - + # Start two processes that acquire read locks p1 = mp.Process(target=acquire_read_lock, args=(lock_file, read1_acquired)) p2 = mp.Process(target=acquire_read_lock, args=(lock_file, read2_acquired)) - + with cleanup_processes([p1, p2]): p1.start() time.sleep(0.1) # Give p1 a chance to start acquiring p2.start() - + # Both processes should be able to acquire read locks assert read1_acquired.wait(timeout=2), f"First read lock not acquired on {lock_file}" assert read2_acquired.wait(timeout=2), f"Second read lock not acquired on {lock_file}" - + # Wait for processes to finish p1.join(timeout=2) p2.join(timeout=2) @@ -169,41 +164,38 @@ def test_read_locks_are_shared(lock_file): @pytest.mark.timeout(20) -def test_write_lock_excludes_other_write_locks(lock_file): +def test_write_lock_excludes_other_write_locks(lock_file) -> None: """Test that a write lock prevents other processes from acquiring write locks.""" # Create shared events write1_acquired = mp.Event() release_write1 = mp.Event() write2_acquired = mp.Event() - + # Start first process to acquire write lock - p1 = mp.Process(target=acquire_write_lock, - args=(lock_file, write1_acquired, release_write1)) - + p1 = mp.Process(target=acquire_write_lock, args=(lock_file, write1_acquired, release_write1)) + # Second process will try to acquire with a short timeout # We'll restart it after the first process releases the lock - p2 = mp.Process(target=acquire_write_lock, - args=(lock_file, write2_acquired, None, 0.5, True)) - + p2 = mp.Process(target=acquire_write_lock, args=(lock_file, write2_acquired, None, 0.5, True)) + with cleanup_processes([p1]): p1.start() assert write1_acquired.wait(timeout=2), "First write lock not acquired" - + # Second process should not be able to acquire write lock with cleanup_processes([p2]): p2.start() assert not write2_acquired.wait(timeout=1), "Second write lock should not be acquired" - + # Release first write lock release_write1.set() p1.join(timeout=2) assert not p1.is_alive(), "Process 1 did not 
exit cleanly" - + # Create a new process to try acquiring the lock now that it's released write2_acquired.clear() # Reset the event - p3 = mp.Process(target=acquire_write_lock, - args=(lock_file, write2_acquired, None)) - + p3 = mp.Process(target=acquire_write_lock, args=(lock_file, write2_acquired, None)) + with cleanup_processes([p3]): p3.start() # Now the new process should be able to acquire the lock @@ -213,97 +205,92 @@ def test_write_lock_excludes_other_write_locks(lock_file): @pytest.mark.timeout(20) -def test_write_lock_excludes_read_locks(lock_file): +def test_write_lock_excludes_read_locks(lock_file) -> None: """Test that a write lock prevents other processes from acquiring read locks.""" # Create shared events write_acquired = mp.Event() release_write = mp.Event() read_acquired = mp.Event() read_started = mp.Event() # New event to signal when read attempt starts - + # Start process to acquire write lock - p1 = mp.Process(target=acquire_write_lock, - args=(lock_file, write_acquired, release_write)) - + p1 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired, release_write)) + # Start process to try to acquire read lock with no timeout # Use a ready_event to control when the read lock attempt should start - p2 = mp.Process(target=acquire_read_lock, - args=(lock_file, read_acquired, None, -1, True, read_started)) - + p2 = mp.Process(target=acquire_read_lock, args=(lock_file, read_acquired, None, -1, True, read_started)) + with cleanup_processes([p1, p2]): p1.start() assert write_acquired.wait(timeout=2), "Write lock not acquired" - + # Start the read process but don't signal it to begin acquiring yet p2.start() - + # Now signal p2 to attempt acquiring the read lock read_started.set() - + # Wait a short time - read lock should NOT be acquired while write lock is held time.sleep(2) assert not read_acquired.is_set(), "Read lock should not be acquired while write lock held" - + # Release write lock release_write.set() p1.join(timeout=2) - + # Now read process should be able to acquire the lock assert read_acquired.wait(timeout=2), "Read lock not acquired after write released" - + p2.join(timeout=2) assert not p2.is_alive(), "Process 2 did not exit cleanly" @pytest.mark.timeout(20) -def test_read_lock_excludes_write_locks(lock_file): +def test_read_lock_excludes_write_locks(lock_file) -> None: """Test that read locks prevent other processes from acquiring write locks.""" # Create shared events read_acquired = mp.Event() release_read = mp.Event() write_acquired = mp.Event() write_started = mp.Event() # New event to signal when write attempt starts - + # Start process to acquire read lock - p1 = mp.Process(target=acquire_read_lock, - args=(lock_file, read_acquired, release_read)) - + p1 = mp.Process(target=acquire_read_lock, args=(lock_file, read_acquired, release_read)) + # Start process to try to acquire write lock with no timeout # Use a ready_event to control when the write lock attempt should start - p2 = mp.Process(target=acquire_write_lock, - args=(lock_file, write_acquired, None, -1, True, write_started)) - + p2 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired, None, -1, True, write_started)) + with cleanup_processes([p1, p2]): p1.start() assert read_acquired.wait(timeout=2), "Read lock not acquired" - + # Start the write process but don't signal it to begin acquiring yet p2.start() - + # Now signal p2 to attempt acquiring the write lock write_started.set() - + # Wait a short time - write lock should NOT be acquired while read lock is 
held
     time.sleep(2)
     assert not write_acquired.is_set(), "Write lock should not be acquired while read lock held"
-    
+
     # Release read lock
     release_read.set()
     p1.join(timeout=2)
-    
+
     # Now write process should be able to acquire the lock
     assert write_acquired.wait(timeout=2), "Write lock not acquired after read released"
-    
+
     p2.join(timeout=2)
     assert not p2.is_alive(), "Process 2 did not exit cleanly"
 
 
 # Move this function to module level (before the test functions)
-def chain_reader(idx, lock_file, release_count,
-                 forward_wait, backward_wait, forward_set, backward_set):
+def chain_reader(idx, lock_file, release_count, forward_wait, backward_wait, forward_set, backward_set) -> None:
     # Wait for signal to start acquiring
     forward_wait.wait(timeout=10)
-    
+
     try:
         lock = ReadWriteLock(lock_file)
         with lock.read_lock():
@@ -312,16 +299,16 @@ def chain_reader(idx, lock_file, release_count,
             # chain.
             time.sleep(2)
 
-            # Signal next reader to start if not the last one 
+            # Signal next reader to start if not the last one
             if forward_set is not None:
                 forward_set.set()
-            
+
             if idx == 0:
                 # Hold off releasing the write lock process (backward_set is writer_ready at idx=0)
                 time.sleep(1)
-            
+
             backward_set.set()
-            
+
             # Wait for a signal from the next reader to release, so that there is
             # always a read lock held. A non-starving write lock from another
             # process must make this backward_wait time out.
@@ -330,58 +317,55 @@ def chain_reader(idx, lock_file, release_count,
             # Increment the release counter before releasing
             with release_count.get_lock():
                 release_count.value += 1
-    
-    except Exception as e:
-        print(f"Chain reader {idx} error: {e}")
+
+    except Exception:
+        pass
+
 
 @pytest.mark.timeout(40)
-def test_write_non_starvation(lock_file):
+def test_write_non_starvation(lock_file) -> None:
     """Test that write locks can eventually be acquired even with continuous read locks.
-    
+
     Creates a chain of reader processes where the writer starts after the first reader
-    acquires a lock. The writer should be able to acquire its lock before the entire 
+    acquires a lock. The writer should be able to acquire its lock before the entire
     reader chain has finished, demonstrating non-starvation.
""" NUM_READERS = 7 - + # Create events for coordination chain_forward = [mp.Event() for _ in range(NUM_READERS)] # Signal to start acquiring chain_backward = [mp.Event() for _ in range(NUM_READERS)] # Signal to release writer_ready = mp.Event() writer_acquired = mp.Event() - + # Shared counter to track how many readers have released - release_count = mp.Value('i', 0) + release_count = mp.Value("i", 0) # Create reader processes readers = [] for i in range(NUM_READERS): - forward_set = chain_forward[i+1] if i < NUM_READERS - 1 else None - backward_set = chain_backward[i-1] if i > 0 else writer_ready + forward_set = chain_forward[i + 1] if i < NUM_READERS - 1 else None + backward_set = chain_backward[i - 1] if i > 0 else writer_ready reader = mp.Process( target=chain_reader, - args=(i, lock_file, release_count, chain_forward[i], chain_backward[i], - forward_set, backward_set) + args=(i, lock_file, release_count, chain_forward[i], chain_backward[i], forward_set, backward_set), ) readers.append(reader) - + # Create writer process that will try to acquire after first reader is established - writer = mp.Process( - target=acquire_write_lock, - args=(lock_file, writer_acquired, None, 20, True, writer_ready) - ) - - with cleanup_processes(readers + [writer]): + writer = mp.Process(target=acquire_write_lock, args=(lock_file, writer_acquired, None, 20, True, writer_ready)) + + with cleanup_processes([*readers, writer]): # Start all reader processes (they'll wait for their start signal) for reader in readers: reader.start() - + # Signal the first reader to start chain_forward[0].set() - + # Wait a bit for the first reader to acquire and signal the writer assert writer_ready.wait(timeout=10), "First reader did not acquire lock" - + # Start the writer process (it will wait for writer_ready event) writer.start() @@ -390,9 +374,7 @@ def test_write_non_starvation(lock_file): with release_count.get_lock(): read_releases = release_count.value - assert read_releases < 3, ( - f"Writer acquired after {read_releases} readers released - this indicates starvation" - ) + assert read_releases < 3, f"Writer acquired after {read_releases} readers released - this indicates starvation" # Wait for writer to finish writer.join(timeout=2) @@ -407,7 +389,7 @@ def test_write_non_starvation(lock_file): assert not reader.is_alive(), f"Reader {idx} did not exit cleanly" -def try_upgrade_lock(lock_file, read_acquired, upgrade_attempted, upgrade_result): +def try_upgrade_lock(lock_file, read_acquired, upgrade_attempted, upgrade_result) -> None: lock = ReadWriteLock(lock_file) try: with lock.read_lock(): @@ -418,15 +400,13 @@ def try_upgrade_lock(lock_file, read_acquired, upgrade_attempted, upgrade_result pass # Should not reach here except RuntimeError: upgrade_result.value = 0 # Expected behavior - except Exception as e: - print(f"Unexpected error during upgrade: {e}") + except Exception: upgrade_result.value = 2 - except Exception as e: - print(f"Unexpected error acquiring read lock: {e}") + except Exception: upgrade_result.value = 1 -def try_downgrade_lock(lock_file, write_acquired, downgrade_attempted, downgrade_result): +def try_downgrade_lock(lock_file, write_acquired, downgrade_attempted, downgrade_result) -> None: lock = ReadWriteLock(lock_file) try: with lock.write_lock(): @@ -437,177 +417,169 @@ def try_downgrade_lock(lock_file, write_acquired, downgrade_attempted, downgrade pass # Should not reach here except RuntimeError: downgrade_result.value = 0 # Expected behavior - except Exception as e: - print(f"Unexpected 
error during downgrade: {e}") + except Exception: downgrade_result.value = 2 - except Exception as e: - print(f"Unexpected error acquiring write lock: {e}") + except Exception: downgrade_result.value = 1 -def recursive_read_lock(lock_file, success_flag): +def recursive_read_lock(lock_file, success_flag) -> None: lock = ReadWriteLock(lock_file) try: with lock.read_lock(): # First acquisition assert lock._lock_level == 1 assert lock._current_mode == "read" - + with lock.read_lock(): # Second acquisition assert lock._lock_level == 2 assert lock._current_mode == "read" - + with lock.read_lock(): # Third acquisition assert lock._lock_level == 3 assert lock._current_mode == "read" - + # After third release assert lock._lock_level == 2 assert lock._current_mode == "read" - + # After second release assert lock._lock_level == 1 assert lock._current_mode == "read" - + # After first release assert lock._lock_level == 0 assert lock._current_mode is None - + success_flag.value = 1 - except Exception as e: - print(f"Recursive read lock error: {e}") + except Exception: success_flag.value = 0 @pytest.mark.timeout(10) -def test_recursive_read_lock_acquisition(lock_file): +def test_recursive_read_lock_acquisition(lock_file) -> None: """Test that the same process can acquire the same read lock multiple times.""" - success = mp.Value('i', 0) + success = mp.Value("i", 0) p = mp.Process(target=recursive_read_lock, args=(lock_file, success)) - + with cleanup_processes([p]): p.start() p.join(timeout=5) @pytest.mark.timeout(10) -def test_lock_upgrade_prohibited(lock_file): +def test_lock_upgrade_prohibited(lock_file) -> None: """Test that a process cannot upgrade from a read lock to a write lock.""" read_acquired = mp.Event() upgrade_attempted = mp.Event() - upgrade_result = mp.Value('i', -1) - - p = mp.Process(target=try_upgrade_lock, - args=(lock_file, read_acquired, upgrade_attempted, upgrade_result)) - + upgrade_result = mp.Value("i", -1) + + p = mp.Process(target=try_upgrade_lock, args=(lock_file, read_acquired, upgrade_attempted, upgrade_result)) + with cleanup_processes([p]): p.start() - + # Wait for read lock to be acquired assert read_acquired.wait(timeout=2), "Read lock not acquired" - + # Wait for upgrade to be attempted assert upgrade_attempted.wait(timeout=2), "Upgrade not attempted" - + # Wait for process to finish p.join(timeout=2) assert not p.is_alive(), "Process did not exit cleanly" - + # Verify result assert upgrade_result.value == 0, "Read lock was incorrectly upgraded to write lock" @pytest.mark.timeout(10) -def test_lock_downgrade_prohibited(lock_file): +def test_lock_downgrade_prohibited(lock_file) -> None: """Test that a process cannot downgrade from a write lock to a read lock.""" write_acquired = mp.Event() downgrade_attempted = mp.Event() - downgrade_result = mp.Value('i', -1) - - p = mp.Process(target=try_downgrade_lock, - args=(lock_file, write_acquired, downgrade_attempted, downgrade_result)) - + downgrade_result = mp.Value("i", -1) + + p = mp.Process(target=try_downgrade_lock, args=(lock_file, write_acquired, downgrade_attempted, downgrade_result)) + with cleanup_processes([p]): p.start() - + # Wait for write lock to be acquired assert write_acquired.wait(timeout=2), "Write lock not acquired" - + # Wait for downgrade to be attempted assert downgrade_attempted.wait(timeout=2), "Downgrade not attempted" - + # Wait for process to finish p.join(timeout=2) assert not p.is_alive(), "Process did not exit cleanly" - + # Verify result assert downgrade_result.value == 0, "Write lock was 
incorrectly downgraded to read lock" @pytest.mark.timeout(10) -def test_timeout_behavior(lock_file): +def test_timeout_behavior(lock_file) -> None: """Test that timeout parameter works correctly in multi-process environment.""" # Create shared events write_acquired = mp.Event() release_write = mp.Event() read_acquired = mp.Event() - + # Start process to acquire write lock and hold it - p1 = mp.Process(target=acquire_write_lock, - args=(lock_file, write_acquired, release_write)) - + p1 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired, release_write)) + # Start process to try to acquire read lock with timeout - p2 = mp.Process(target=acquire_read_lock, - args=(lock_file, read_acquired, None, 0.5, True)) - + p2 = mp.Process(target=acquire_read_lock, args=(lock_file, read_acquired, None, 0.5, True)) + with cleanup_processes([p1, p2]): p1.start() assert write_acquired.wait(timeout=2), "Write lock not acquired" - + # Start timer to measure timeout start_time = time.time() p2.start() - + # Process should not acquire read lock and should exit after timeout assert not read_acquired.wait(timeout=1), "Read lock should not be acquired" p2.join(timeout=5) # Allow more time for joining - + # Verify timeout duration was approximately correct # Make the timing constraints more lenient elapsed = time.time() - start_time assert 0.4 <= elapsed <= 10.0, f"Timeout was not respected: {elapsed}s" - + # Clean up release_write.set() p1.join(timeout=2) @pytest.mark.timeout(10) -def test_non_blocking_behavior(lock_file): +def test_non_blocking_behavior(lock_file) -> None: """Test that non-blocking parameter works correctly. - + This test directly attempts to acquire a read lock in non-blocking mode when a write lock is already held by another process. """ # Create shared events for the write lock write_acquired = mp.Event() release_write = mp.Event() - + # Start process to acquire write lock and hold it - p1 = mp.Process(target=acquire_write_lock, - args=(lock_file, write_acquired, release_write)) - + p1 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired, release_write)) + with cleanup_processes([p1]): p1.start() assert write_acquired.wait(timeout=2), "Write lock not acquired" - + lock = ReadWriteLock(lock_file) - + # Start timer to measure how quickly non-blocking returns start_time = time.time() - + # Attempt to acquire a read lock in non-blocking mode try: with lock.read_lock(blocking=False): @@ -616,118 +588,116 @@ def test_non_blocking_behavior(lock_file): except Timeout: # Expected behavior - lock acquisition should fail immediately pass - + elapsed = time.time() - start_time - + # Non-blocking should return very quickly assert elapsed < 0.1, f"Non-blocking took too long: {elapsed}s" - + # Clean up release_write.set() p1.join(timeout=2) # Move this function to module level (before the test functions) -def recursive_read_lock(lock_file, success_flag): +def recursive_read_lock(lock_file, success_flag) -> None: lock = ReadWriteLock(lock_file) try: with lock.read_lock(): # First acquisition assert lock._lock_level == 1 assert lock._current_mode == "read" - + with lock.read_lock(): # Second acquisition assert lock._lock_level == 2 assert lock._current_mode == "read" - + with lock.read_lock(): # Third acquisition assert lock._lock_level == 3 assert lock._current_mode == "read" - + # After third release assert lock._lock_level == 2 assert lock._current_mode == "read" - + # After second release assert lock._lock_level == 1 assert lock._current_mode == "read" - + # After 
first release assert lock._lock_level == 0 assert lock._current_mode is None - + success_flag.value = 1 - except Exception as e: - print(f"Recursive read lock error: {e}") + except Exception: success_flag.value = 0 @pytest.mark.timeout(10) -def test_recursive_read_lock_acquisition(lock_file): +def test_recursive_read_lock_acquisition(lock_file) -> None: """Test that the same process can acquire the same read lock multiple times.""" - success = mp.Value('i', 0) + success = mp.Value("i", 0) p = mp.Process(target=recursive_read_lock, args=(lock_file, success)) - + with cleanup_processes([p]): p.start() p.join(timeout=5) - + assert success.value == 1, "Recursive read lock acquisition failed" # Move this function to module level (before the test functions) -def recursive_write_lock(lock_file, success_flag): +def recursive_write_lock(lock_file, success_flag) -> None: lock = ReadWriteLock(lock_file) try: with lock.write_lock(): # First acquisition assert lock._lock_level == 1 assert lock._current_mode == "write" - + with lock.write_lock(): # Second acquisition assert lock._lock_level == 2 assert lock._current_mode == "write" - + with lock.write_lock(): # Third acquisition assert lock._lock_level == 3 assert lock._current_mode == "write" - + # After third release assert lock._lock_level == 2 assert lock._current_mode == "write" - + # After second release assert lock._lock_level == 1 assert lock._current_mode == "write" - + # After first release assert lock._lock_level == 0 assert lock._current_mode is None - + success_flag.value = 1 - except Exception as e: - print(f"Recursive write lock error: {e}") + except Exception: success_flag.value = 0 @pytest.mark.timeout(10) -def test_recursive_write_lock_acquisition(lock_file): +def test_recursive_write_lock_acquisition(lock_file) -> None: """Test that the same process can acquire the same write lock multiple times.""" - success = mp.Value('i', 0) + success = mp.Value("i", 0) p = mp.Process(target=recursive_write_lock, args=(lock_file, success)) - + with cleanup_processes([p]): p.start() p.join(timeout=5) - + assert success.value == 1, "Recursive write lock acquisition failed" -def acquire_write_lock_and_crash(lock_file, acquired_event): +def acquire_write_lock_and_crash(lock_file, acquired_event) -> None: lock = ReadWriteLock(lock_file) with lock.write_lock(): acquired_event.set() @@ -737,39 +707,39 @@ def acquire_write_lock_and_crash(lock_file, acquired_event): @pytest.mark.timeout(15) -def test_write_lock_release_on_process_termination(lock_file): +def test_write_lock_release_on_process_termination(lock_file) -> None: """Test that write locks are properly released if a process terminates.""" # Create shared events lock_acquired = mp.Event() - + # Start a process that will acquire the lock and then "crash" p1 = mp.Process(target=acquire_write_lock_and_crash, args=(lock_file, lock_acquired)) p1.start() - + # Wait for lock to be acquired assert lock_acquired.wait(timeout=2), "Lock not acquired by first process" - + # Create second process that will try to acquire the lock write_acquired = mp.Event() p2 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired)) - + with cleanup_processes([p1, p2]): # Terminate the first process (simulating a crash) time.sleep(0.5) # Ensure lock is fully acquired p1.terminate() p1.join(timeout=2) - + # Start second process - should be able to acquire the lock p2.start() - + # Check if second process can acquire the lock assert write_acquired.wait(timeout=5), "Lock not acquired after process termination" - + 
p2.join(timeout=2)
     assert not p2.is_alive(), "Second process did not exit cleanly"
 
 
-def acquire_read_lock_and_crash(lock_file, acquired_event):
+def acquire_read_lock_and_crash(lock_file, acquired_event) -> None:
     lock = ReadWriteLock(lock_file)
     with lock.read_lock():
         acquired_event.set()
@@ -779,44 +749,44 @@
 
 
 @pytest.mark.timeout(15)
-def test_read_lock_release_on_process_termination(lock_file):
+def test_read_lock_release_on_process_termination(lock_file) -> None:
     """Test that read locks are properly released if a process terminates."""
     # Create shared events
     lock_acquired = mp.Event()
-    
+
     # Start a process that will acquire the lock and then "crash"
     p1 = mp.Process(target=acquire_read_lock_and_crash, args=(lock_file, lock_acquired))
     p1.start()
-    
+
     # Wait for lock to be acquired
     assert lock_acquired.wait(timeout=2), "Lock not acquired by first process"
-    
+
     # Create second process that will try to acquire the lock
     write_acquired = mp.Event()
     p2 = mp.Process(target=acquire_write_lock, args=(lock_file, write_acquired))
-    
+
     with cleanup_processes([p1, p2]):
         # Terminate the first process (simulating a crash)
         time.sleep(0.5)  # Ensure lock is fully acquired
         p1.terminate()
         p1.join(timeout=2)
-    
+
         # Start second process - should be able to acquire the lock
         p2.start()
-    
+
         # Check if second process can acquire the lock
         assert write_acquired.wait(timeout=5), "Lock not acquired after process termination"
-    
+
         p2.join(timeout=2)
         assert not p2.is_alive(), "Second process did not exit cleanly"
 
 
 @pytest.mark.timeout(15)
-def test_single_read_lock_acquire_release(lock_file):
+def test_single_read_lock_acquire_release(lock_file) -> None:
     """Test that a single read lock can be acquired and released."""
     # Create a lock
     lock = ReadWriteLock(lock_file)
-    
+
     # Acquire and release a read lock
     with lock.read_lock():
         # Lock is acquired here
@@ -824,7 +794,7 @@
         # Let's verify we can read the same lock again (read locks are reentrant)
         with lock.read_lock():
             assert True, "Read lock acquired again"
-    
+
     # Lock should be released here
     # We can test this by acquiring it again
     with lock.read_lock():
@@ -832,11 +802,11 @@
 
 @pytest.mark.timeout(15)
-def test_single_write_lock_acquire_release(lock_file):
+def test_single_write_lock_acquire_release(lock_file) -> None:
     """Test that a single write lock can be acquired and released."""
     # Create a lock
     lock = ReadWriteLock(lock_file)
-    
+
     # Acquire and release a write lock
     with lock.write_lock():
         # Lock is acquired here
@@ -844,7 +814,7 @@
         # Let's verify we can write lock again (write locks are reentrant)
         with lock.write_lock():
             assert True, "Write lock acquired again"
-    
+
     # Lock should be released here
     # We can test this by acquiring it again
     with lock.write_lock():
@@ -852,28 +822,28 @@
 
 @pytest.mark.timeout(15)
-def test_read_then_write_lock(lock_file):
+def test_read_then_write_lock(lock_file) -> None:
     """Test that we can acquire a read lock and then a write lock after releasing it."""
     lock = ReadWriteLock(lock_file)
-    
+
     # First acquire a read lock
     with lock.read_lock():
         assert True, "Read lock acquired"
-    
+
     # After releasing the read lock, we should be able to acquire a write lock
     with lock.write_lock():
         assert True, "Write lock acquired after read lock released"
 
 
 @pytest.mark.timeout(15)
-def 
test_write_then_read_lock(lock_file): +def test_write_then_read_lock(lock_file) -> None: """Test that we can acquire a write lock and then a read lock after releasing it.""" lock = ReadWriteLock(lock_file) - + # First acquire a write lock with lock.write_lock(): assert True, "Write lock acquired" - + # After releasing the write lock, we should be able to acquire a read lock with lock.read_lock(): assert True, "Read lock acquired after write lock released" @@ -881,4 +851,4 @@ def test_write_then_read_lock(lock_file): if __name__ == "__main__": # Set up multiprocessing to spawn instead of fork - mp.set_start_method('spawn') \ No newline at end of file + mp.set_start_method("spawn") From 0991a09167cbb676d33387fd31f33b2292a6c074 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Tue, 25 Mar 2025 16:38:25 +0300 Subject: [PATCH 6/7] Prohibit re-entering write lock from concurrent threads --- src/filelock/_read_write.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py index ea997bdb..b9a31fbf 100644 --- a/src/filelock/_read_write.py +++ b/src/filelock/_read_write.py @@ -29,8 +29,7 @@ def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> return _MAX_SQLITE_TIMEOUT_MS if timeout < 0: - msg = "timeout must be a non-negative number or -1" - raise ValueError(msg) + raise ValueError("timeout must be a non-negative number or -1") if timeout > 0: timeout -= already_waited @@ -110,6 +109,7 @@ def __init__( self._current_mode: Literal["read", "write"] | None = None # _lock_level is the reentrance counter. self._lock_level = 0 + self._write_thread_id: int | None = None self.con = sqlite3.connect(self.lock_file, check_same_thread=False) def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireReturnProxy: @@ -122,13 +122,10 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet if self._lock_level > 0: # Must already be in read mode. 
                if self._current_mode != "read":
-                    msg = (
+                    raise RuntimeError(
                         f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): "
                         "already holding a write lock (downgrade not allowed)"
                     )
-                    raise RuntimeError(
-                        msg
-                    )
                 self._lock_level += 1
                 return AcquireReturnProxy(lock=self)
 
@@ -142,13 +139,10 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet
             with self._internal_lock:
                 if self._lock_level > 0:
                     if self._current_mode != "read":
-                        msg = (
+                        raise RuntimeError(
                             f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): "
                             "already holding a write lock (downgrade not allowed)"
                         )
-                        raise RuntimeError(
-                            msg
-                        )
                     self._lock_level += 1
                     return AcquireReturnProxy(lock=self)
 
@@ -206,12 +200,15 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe
         with self._internal_lock:
             if self._lock_level > 0:
                 if self._current_mode != "write":
-                    msg = (
+                    raise RuntimeError(
                         f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): "
                         "already holding a read lock (upgrade not allowed)"
                     )
+                cur_thread_id = threading.get_ident()
+                if self._write_thread_id != cur_thread_id:
                     raise RuntimeError(
-                        msg
+                        f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) "
+                        f"from thread {cur_thread_id} while it is held by thread {self._write_thread_id}"
                     )
                 self._lock_level += 1
                 return AcquireReturnProxy(lock=self)
@@ -226,13 +223,10 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe
             with self._internal_lock:
                 if self._lock_level > 0:
                     if self._current_mode != "write":
-                        msg = (
+                        raise RuntimeError(
                             f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): "
                             "already holding a read lock (upgrade not allowed)"
                         )
-                        raise RuntimeError(
-                            msg
-                        )
                     self._lock_level += 1
                     return AcquireReturnProxy(lock=self)
 
@@ -254,7 +248,8 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe
             with self._internal_lock:
                 self._current_mode = "write"
                 self._lock_level = 1
-            
+                self._write_thread_id = threading.get_ident()
+
             return AcquireReturnProxy(lock=self)
 
         except sqlite3.OperationalError as e:
@@ -278,6 +273,7 @@ def release(self, force: bool = False) -> None:
             if self._lock_level == 0:
                 # Clear current mode and rollback the SQLite transaction.
                 self._current_mode = None
+                self._write_thread_id = None
                 # Unless there are bugs in this code, sqlite3.ProgrammingError
                 # must not be raised here, that is, the transaction should have been
                 # started in acquire_read() or acquire_write(). 
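
A minimal sketch of the behavior this patch enforces, for illustration only: it assumes
src/filelock/_read_write.py above is importable as filelock._read_write, and the lock-file
path here is hypothetical. Re-entering a held write lock from a second thread now raises
RuntimeError instead of silently incrementing the reentrance counter:

    import threading

    from filelock._read_write import ReadWriteLock  # import path assumed from this series

    lock = ReadWriteLock("/tmp/demo.lock")  # hypothetical path; one singleton per file

    def reenter() -> None:
        try:
            with lock.write_lock():  # same singleton instance, different thread
                pass
        except RuntimeError as e:
            print(f"rejected as expected: {e}")

    with lock.write_lock():  # owning thread acquires first
        t = threading.Thread(target=reenter)
        t.start()
        t.join()
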
From 0b5e36f52a753ab176c03645d9a63d7bd3677897 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 25 Mar 2025 13:38:48 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/filelock/_read_write.py | 34 ++++++++++++++++++++++++++-------- tests/test_read_write.py | 8 ++++++-- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/filelock/_read_write.py b/src/filelock/_read_write.py index b9a31fbf..a3ce0ca0 100644 --- a/src/filelock/_read_write.py +++ b/src/filelock/_read_write.py @@ -29,7 +29,8 @@ def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> return _MAX_SQLITE_TIMEOUT_MS if timeout < 0: - raise ValueError("timeout must be a non-negative number or -1") + msg = "timeout must be a non-negative number or -1" + raise ValueError(msg) if timeout > 0: timeout -= already_waited @@ -79,7 +80,9 @@ def get_lock(cls, lock_file: str | os.PathLike[str], timeout: float = -1, blocki instance = cls._instances[normalized] if instance.timeout != timeout or instance.blocking != blocking: - msg = "Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s" + msg = ( + "Singleton lock created with timeout=%s, blocking=%s, cannot be changed to timeout=%s, blocking=%s" + ) raise ValueError( msg, instance.timeout, @@ -122,10 +125,13 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet if self._lock_level > 0: # Must already be in read mode. if self._current_mode != "read": - raise RuntimeError( + msg = ( f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " "already holding a write lock (downgrade not allowed)" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) @@ -139,10 +145,13 @@ def acquire_read(self, timeout: float = -1, blocking: bool = True) -> AcquireRet with self._internal_lock: if self._lock_level > 0: if self._current_mode != "read": - raise RuntimeError( + msg = ( f"Cannot acquire read lock on {self.lock_file} (lock id: {id(self)}): " "already holding a write lock (downgrade not allowed)" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) @@ -200,16 +209,22 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError( + msg = ( f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " "already holding a read lock (upgrade not allowed)" ) + raise RuntimeError( + msg + ) cur_thread_id = threading.get_ident() if self._write_thread_id != cur_thread_id: - raise RuntimeError( + msg = ( f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}) " f"from thread {cur_thread_id} while it is held by thread {self._write_thread_id}" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) @@ -223,10 +238,13 @@ def acquire_write(self, timeout: float = -1, blocking: bool = True) -> AcquireRe with self._internal_lock: if self._lock_level > 0: if self._current_mode != "write": - raise RuntimeError( + msg = ( f"Cannot acquire write lock on {self.lock_file} (lock id: {id(self)}): " "already holding a read lock (upgrade not allowed)" ) + raise RuntimeError( + msg + ) self._lock_level += 1 return AcquireReturnProxy(lock=self) @@ -249,7 +267,7 @@ def acquire_write(self, 
timeout: float = -1, blocking: bool = True) -> AcquireRe self._current_mode = "write" self._lock_level = 1 self._write_thread_id = threading.get_ident() - + return AcquireReturnProxy(lock=self) except sqlite3.OperationalError as e: diff --git a/tests/test_read_write.py b/tests/test_read_write.py index 7d194a14..b45edd83 100644 --- a/tests/test_read_write.py +++ b/tests/test_read_write.py @@ -11,7 +11,9 @@ # Helper function to run in a separate process to acquire a read lock -def acquire_read_lock(lock_file, acquired_event, release_event=None, timeout=-1, blocking=True, ready_event=None) -> bool | None: +def acquire_read_lock( + lock_file, acquired_event, release_event=None, timeout=-1, blocking=True, ready_event=None +) -> bool | None: # Get error queue from current process if available current_process = mp.current_process() error_queue = getattr(current_process, "_error_queue", None) @@ -40,7 +42,9 @@ def acquire_read_lock(lock_file, acquired_event, release_event=None, timeout=-1, # Helper function to run in a separate process to acquire a write lock -def acquire_write_lock(lock_file, acquired_event, release_event=None, timeout=-1, blocking=True, ready_event=None) -> bool | None: +def acquire_write_lock( + lock_file, acquired_event, release_event=None, timeout=-1, blocking=True, ready_event=None +) -> bool | None: if ready_event: ready_event.wait(timeout=10)
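
As a closing note on the timeout plumbing these test helpers exercise: the sketch below
restates the timeout_for_sqlite() conversion from src/filelock/_read_write.py as it stands
after this series, for illustration only (the warning log on clamping is omitted; the
asserted values are worked examples, not part of the test suite):

    _MAX_SQLITE_TIMEOUT_MS = 2_000_000_000 - 1

    def timeout_for_sqlite(timeout: float, blocking: bool, already_waited: float) -> int:
        if blocking is False:
            return 0  # non-blocking: fail immediately if the db is locked
        if timeout == -1:
            return _MAX_SQLITE_TIMEOUT_MS  # "wait forever", clamped for SQLite
        if timeout < 0:
            raise ValueError("timeout must be a non-negative number or -1")
        if timeout > 0:
            timeout = max(timeout - already_waited, 0)  # spend only the remaining budget
        return min(int(timeout * 1000), _MAX_SQLITE_TIMEOUT_MS)

    assert timeout_for_sqlite(5.0, True, 1.5) == 3500  # 3.5 s of budget left
    assert timeout_for_sqlite(5.0, True, 9.0) == 0     # budget already exhausted
    assert timeout_for_sqlite(-1, True, 0.0) == 1_999_999_999
    assert timeout_for_sqlite(5.0, False, 0.0) == 0    # blocking=False wins over timeout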