|
| 1 | +<?php |
| 2 | + |
| 3 | +declare(strict_types=1); |
| 4 | + |
| 5 | +namespace Bavix\Wallet\Internal\Service; |
| 6 | + |
| 7 | +use Bavix\Wallet\Internal\Exceptions\ExceptionInterface; |
| 8 | +use Bavix\Wallet\Internal\Exceptions\ModelNotFoundException; |
| 9 | +use Bavix\Wallet\Internal\Exceptions\TransactionFailedException; |
| 10 | +use Bavix\Wallet\Models\Wallet; |
| 11 | +use Illuminate\Contracts\Cache\Factory as CacheFactory; |
| 12 | +use Illuminate\Contracts\Cache\Repository as CacheRepository; |
| 13 | +use Illuminate\Database\QueryException; |
| 14 | + |
| 15 | +final class PostgresLockService implements LockServiceInterface |
| 16 | +{ |
| 17 | + private const string LOCK_KEY = 'wallet_lock::'; |
| 18 | + |
| 19 | + private const string INNER_KEYS = 'inner_keys::'; |
| 20 | + |
| 21 | + private readonly CacheRepository $lockedKeys; |
| 22 | + |
| 23 | + public function __construct( |
| 24 | + private readonly ConnectionServiceInterface $connectionService, |
| 25 | + private readonly StorageServiceInterface $storageService, |
| 26 | + CacheFactory $cacheFactory, |
| 27 | + private readonly int $seconds |
| 28 | + ) { |
| 29 | + $this->lockedKeys = $cacheFactory->store('array'); |
| 30 | + } |
| 31 | + |
| 32 | + public function block(string $key, callable $callback): mixed |
| 33 | + { |
| 34 | + // Delegate to blocks() with single element array |
| 35 | + return $this->blocks([$key], $callback); |
| 36 | + } |
| 37 | + |
| 38 | + public function blocks(array $keys, callable $callback): mixed |
| 39 | + { |
| 40 | + // Filter out already blocked keys |
| 41 | + $keysToLock = []; |
| 42 | + foreach ($keys as $key) { |
| 43 | + if (! $this->isBlocked($key)) { |
| 44 | + $keysToLock[] = $key; |
| 45 | + } |
| 46 | + } |
| 47 | + |
| 48 | + // If all keys are already blocked, just execute callback |
| 49 | + if ($keysToLock === []) { |
| 50 | + return $callback(); |
| 51 | + } |
| 52 | + |
| 53 | + // Sort keys to prevent deadlock |
| 54 | + $sortedKeys = $this->sortKeys($keysToLock); |
| 55 | + |
| 56 | + // Normalize keys to UUIDs immediately |
| 57 | + // Keys can be in two formats: |
| 58 | + // 1. "wallet_lock::uuid" - full format (from AtomicService, tests) |
| 59 | + // 2. "uuid" - just UUID (from BookkeeperService::multiAmount) |
| 60 | + // 3. Non-UUID keys (e.g., from LockServiceTest using __METHOD__) |
| 61 | + $uuids = []; |
| 62 | + $nonUuidKeys = []; |
| 63 | + |
| 64 | + foreach ($sortedKeys as $key) { |
| 65 | + // Extract UUID: remove prefix if present, otherwise key is UUID |
| 66 | + $uuid = str_starts_with($key, self::LOCK_KEY) |
| 67 | + ? str_replace(self::LOCK_KEY, '', $key) |
| 68 | + : $key; |
| 69 | + |
| 70 | + if ($uuid === '') { |
| 71 | + continue; |
| 72 | + } |
| 73 | + |
| 74 | + // Simple check: UUID format is 36 chars with dashes (8-4-4-4-12) |
| 75 | + // This is a lightweight check without full validation |
| 76 | + if (strlen($uuid) === 36 && substr_count($uuid, '-') === 4) { |
| 77 | + $uuids[] = $uuid; |
| 78 | + } else { |
| 79 | + $nonUuidKeys[] = $key; |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + // Handle non-UUID keys: mark as blocked and execute callback without DB query |
| 84 | + foreach ($nonUuidKeys as $key) { |
| 85 | + $this->lockedKeys->put(self::INNER_KEYS.$key, true, $this->seconds); |
| 86 | + } |
| 87 | + |
| 88 | + $connection = $this->connectionService->get(); |
| 89 | + $inTransaction = $connection->transactionLevel() > 0; |
| 90 | + |
| 91 | + // If no UUIDs found, just execute callback |
| 92 | + // For non-UUID keys inside transaction: keep locked until releases() (like UUID keys) |
| 93 | + // For non-UUID keys outside transaction: clear in finally block |
| 94 | + if ($uuids === []) { |
| 95 | + if ($inTransaction) { |
| 96 | + // Inside transaction: keep locked until releases() is called |
| 97 | + return $callback(); |
| 98 | + } |
| 99 | + |
| 100 | + // Outside transaction: clear after callback |
| 101 | + try { |
| 102 | + return $callback(); |
| 103 | + } finally { |
| 104 | + // Clear non-UUID keys after callback (similar to UUID keys in finally block) |
| 105 | + foreach ($nonUuidKeys as $key) { |
| 106 | + $this->lockedKeys->delete(self::INNER_KEYS.$key); |
| 107 | + } |
| 108 | + } |
| 109 | + } |
| 110 | + |
| 111 | + if ($inTransaction) { |
| 112 | + // ⚠️ CRITICAL: We are already inside a transaction! |
| 113 | + // |
| 114 | + // This happens in the following scenarios: |
| 115 | + // 1. User created transaction manually (DB::beginTransaction()) |
| 116 | + // 2. AtomicService::blocks() created transaction via databaseService->transaction() |
| 117 | + // 3. BookkeeperService::multiAmount() called inside transaction and automatically locks wallet |
| 118 | + // when record is not found in cache (RecordNotFoundException) |
| 119 | + // |
| 120 | + // AUTOMATIC LOCKING: |
| 121 | + // - When user accesses $wallet->balanceInt inside transaction, |
| 122 | + // this calls RegulatorService::amount() -> BookkeeperService::amount() -> multiAmount() |
| 123 | + // - If record is not found in cache, BookkeeperService automatically calls |
| 124 | + // lockService->blocks() to lock the wallet |
| 125 | + // - This means lock can be called INSIDE an existing transaction |
| 126 | + // |
| 127 | + // In this case: |
| 128 | + // - DO NOT create new transaction (we are already inside existing one) |
| 129 | + // - Just set FOR UPDATE lock on existing transaction |
| 130 | + // - Lock will be released automatically by PostgreSQL on commit/rollback |
| 131 | + // - lockedKeys will be cleared via releases() after TransactionCommitted/RolledBack event |
| 132 | + // - If wallets are already locked in this transaction, PostgreSQL will return them anyway |
| 133 | + // (FOR UPDATE on already locked row in same transaction is safe and returns the row) |
| 134 | + $this->lockWallets($uuids); |
| 135 | + |
| 136 | + return $callback(); |
| 137 | + } |
| 138 | + |
| 139 | + // PostgresLockService creates transaction |
| 140 | + // Clear lockedKeys after transaction completes to prevent accumulation in Octane |
| 141 | + try { |
| 142 | + return $connection->transaction(function () use ($uuids, $callback) { |
| 143 | + $this->lockWallets($uuids); |
| 144 | + |
| 145 | + return $callback(); |
| 146 | + }); |
| 147 | + } finally { |
| 148 | + // CRITICAL for Octane: clear lockedKeys after transaction completes |
| 149 | + // This prevents accumulation in long-lived processes |
| 150 | + foreach ($uuids as $uuid) { |
| 151 | + $this->lockedKeys->delete(self::INNER_KEYS.$uuid); |
| 152 | + } |
| 153 | + } |
| 154 | + } |
| 155 | + |
| 156 | + public function releases(array $keys): void |
| 157 | + { |
| 158 | + // Called from RegulatorService::purge() after TransactionCommitted/RolledBack |
| 159 | + foreach ($keys as $key) { |
| 160 | + // Normalize key to UUID (we store only UUIDs, not original key format) |
| 161 | + $uuid = str_starts_with($key, self::LOCK_KEY) |
| 162 | + ? str_replace(self::LOCK_KEY, '', $key) |
| 163 | + : $key; |
| 164 | + |
| 165 | + if ($uuid !== '' && $this->lockedKeys->get(self::INNER_KEYS.$uuid) === true) { |
| 166 | + // Clear lockedKeys - DB locks already released by PostgreSQL |
| 167 | + $this->lockedKeys->delete(self::INNER_KEYS.$uuid); |
| 168 | + } |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | + public function isBlocked(string $key): bool |
| 173 | + { |
| 174 | + // Normalize key to UUID (we store only UUIDs, not original key format) |
| 175 | + $uuid = str_starts_with($key, self::LOCK_KEY) |
| 176 | + ? str_replace(self::LOCK_KEY, '', $key) |
| 177 | + : $key; |
| 178 | + |
| 179 | + if ($uuid === '') { |
| 180 | + return false; |
| 181 | + } |
| 182 | + |
| 183 | + return $this->lockedKeys->get(self::INNER_KEYS.$uuid) === true; |
| 184 | + } |
| 185 | + |
| 186 | + /** |
| 187 | + * Lock multiple wallets with FOR UPDATE and sync their balances to cache. |
| 188 | + * |
| 189 | + * CRITICAL: This method MUST read balance from DB before locking and sync it to state transaction. |
| 190 | + * The balance is read with FOR UPDATE lock, then synced to StorageService (which uses array cache |
| 191 | + * when PostgresLockService is active). This ensures balance is always fresh from DB within transaction. |
| 192 | + * |
| 193 | + * Optimized: single query for all wallets, single multiSync, single multiGet for verification. |
| 194 | + * |
| 195 | + * @param string[] $uuids Array of normalized UUIDs (already normalized, no prefix) |
| 196 | + */ |
| 197 | + private function lockWallets(array $uuids): void |
| 198 | + { |
| 199 | + if ($uuids === []) { |
| 200 | + return; |
| 201 | + } |
| 202 | + |
| 203 | + // CRITICAL: Read balance from DB with FOR UPDATE lock BEFORE syncing to state transaction |
| 204 | + // This ensures we always have the latest balance from database, not from external cache |
| 205 | + // OPTIMIZATION: Single query to lock all wallets at once |
| 206 | + // SELECT * FROM wallets WHERE uuid IN (?, ?, ...) FOR UPDATE |
| 207 | + try { |
| 208 | + $wallets = Wallet::query() |
| 209 | + ->whereIn('uuid', $uuids) |
| 210 | + ->lockForUpdate() |
| 211 | + ->get() |
| 212 | + ->keyBy('uuid'); |
| 213 | + } catch (QueryException $e) { |
| 214 | + // PostgreSQL throws QueryException for invalid UUID format or other database errors |
| 215 | + // Convert to ModelNotFoundException for consistency |
| 216 | + throw new ModelNotFoundException( |
| 217 | + 'Invalid wallet UUID or wallet not found: '.implode(', ', $uuids), |
| 218 | + ExceptionInterface::MODEL_NOT_FOUND, |
| 219 | + $e |
| 220 | + ); |
| 221 | + } |
| 222 | + |
| 223 | + // Extract balances from locked wallets (fresh from DB, not from cache) |
| 224 | + // For wallets not found in DB (lazy creation), use balance 0 |
| 225 | + $balances = []; |
| 226 | + foreach ($uuids as $uuid) { |
| 227 | + $wallet = $wallets->get($uuid); |
| 228 | + if ($wallet !== null) { |
| 229 | + // Wallet exists in DB - use balance from DB |
| 230 | + $balances[$uuid] = $wallet->getOriginalBalanceAttribute(); |
| 231 | + } else { |
| 232 | + // Wallet doesn't exist in DB yet (lazy creation) - use balance 0 |
| 233 | + // This is normal for new wallets that haven't been saved yet |
| 234 | + $balances[$uuid] = '0'; |
| 235 | + } |
| 236 | + } |
| 237 | + |
| 238 | + // Mark all UUIDs as locked (store only UUID, already normalized) |
| 239 | + foreach ($uuids as $uuid) { |
| 240 | + $this->lockedKeys->put(self::INNER_KEYS.$uuid, true, $this->seconds); |
| 241 | + } |
| 242 | + |
| 243 | + // CRITICAL: Sync balances to StorageService (state transaction) |
| 244 | + // StorageService uses array cache when PostgresLockService is active, |
| 245 | + // ensuring balance is stored in-memory for the transaction |
| 246 | + // OPTIMIZATION: Single multiSync for all balances |
| 247 | + $this->storageService->multiSync($balances); |
| 248 | + |
| 249 | + // OPTIMIZATION: Single multiGet to verify all balances at once |
| 250 | + $cachedBalances = $this->storageService->multiGet($uuids); |
| 251 | + |
| 252 | + // CRITICAL CHECK: Verify cache sync for all wallets |
| 253 | + foreach ($uuids as $uuid) { |
| 254 | + $expectedBalance = $balances[$uuid]; |
| 255 | + $cachedBalance = $cachedBalances[$uuid] ?? null; |
| 256 | + |
| 257 | + if ($cachedBalance !== $expectedBalance) { |
| 258 | + throw new TransactionFailedException( |
| 259 | + "CRITICAL: Cache sync failed for wallet {$uuid}. ". |
| 260 | + "Expected: {$expectedBalance}, Got: {$cachedBalance}. ". |
| 261 | + 'This may cause financial inconsistencies!', |
| 262 | + ExceptionInterface::TRANSACTION_FAILED |
| 263 | + ); |
| 264 | + } |
| 265 | + } |
| 266 | + } |
| 267 | + |
| 268 | + private function sortKeys(array $keys): array |
| 269 | + { |
| 270 | + // Sort to prevent deadlock |
| 271 | + $sorted = $keys; |
| 272 | + sort($sorted); |
| 273 | + |
| 274 | + return $sorted; |
| 275 | + } |
| 276 | +} |
0 commit comments