|
| 1 | +use chrono::{DateTime, Utc}; |
| 2 | +use sqlx::Postgres; |
| 3 | +use tracing::{info, warn}; |
| 4 | +use uuid::Uuid; |
| 5 | + |
| 6 | +/// Implements a non-blocking, distributed locking mechanism |
| 7 | +/// that coordinates dependence-chain processing across multiple workers |
| 8 | +pub struct LockMngr { |
| 9 | + pool: sqlx::Pool<Postgres>, |
| 10 | + worker_id: Uuid, |
| 11 | + lock: Option<DatabaseChainLock>, |
| 12 | +} |
| 13 | + |
| 14 | +/// Dependence chain lock data |
| 15 | +#[derive(Debug, sqlx::FromRow, Clone)] |
| 16 | +pub struct DatabaseChainLock { |
| 17 | + pub dependence_chain_id: Vec<u8>, |
| 18 | + pub worker_id: Option<Uuid>, |
| 19 | + pub lock_acquired_at: Option<DateTime<Utc>>, |
| 20 | + pub lock_expires_at: Option<DateTime<Utc>>, |
| 21 | + pub last_updated_at: DateTime<Utc>, |
| 22 | +} |
| 23 | + |
| 24 | +impl LockMngr { |
| 25 | + pub fn new(worker_id: Uuid, pool: sqlx::Pool<Postgres>) -> Self { |
| 26 | + Self { |
| 27 | + worker_id, |
| 28 | + pool, |
| 29 | + lock: None, |
| 30 | + } |
| 31 | + } |
| 32 | + |
| 33 | + /// Acquire the next available dependence-chain entry for processing |
| 34 | + /// sorted by last_updated_at (FIFO). |
| 35 | + /// Returns the dependence_chain_id if a lock was acquired |
| 36 | + pub async fn acquire_next_lock(&mut self) -> Result<Option<Vec<u8>>, sqlx::Error> { |
| 37 | + let row = sqlx::query_as::<_, DatabaseChainLock>( |
| 38 | + r#" |
| 39 | + WITH c AS ( |
| 40 | + SELECT dependence_chain_id |
| 41 | + FROM dependence_chain |
| 42 | + WHERE |
| 43 | + status = 'updated' -- Marked as updated by host-listener |
| 44 | + AND |
| 45 | + (worker_id IS NULL -- Ensure no other workers own it |
| 46 | + OR lock_expires_at < NOW()) -- Work-stealing of expired locks |
| 47 | + ORDER BY last_updated_at ASC -- FIFO |
| 48 | + FOR UPDATE SKIP LOCKED -- Ensure no other worker is currently trying to lock it |
| 49 | + LIMIT 1 |
| 50 | + ) |
| 51 | + UPDATE dependence_chain AS dc |
| 52 | + SET |
| 53 | + worker_id = $1, |
| 54 | + status = 'processing', |
| 55 | + lock_acquired_at = NOW(), |
| 56 | + lock_expires_at = NOW() + INTERVAL '30 seconds' |
| 57 | + FROM c |
| 58 | + WHERE dc.dependence_chain_id = c.dependence_chain_id |
| 59 | + RETURNING dc.*; |
| 60 | + "#, |
| 61 | + ) |
| 62 | + .bind(self.worker_id.to_string()) |
| 63 | + .fetch_optional(&self.pool) |
| 64 | + .await?; |
| 65 | + |
| 66 | + let row = if let Some(row) = row { |
| 67 | + row |
| 68 | + } else { |
| 69 | + return Ok(None); |
| 70 | + }; |
| 71 | + |
| 72 | + self.lock.replace(row.clone()); |
| 73 | + |
| 74 | + info!(target: "deps_chain", ?row, "Acquired lock"); |
| 75 | + |
| 76 | + Ok(Some(row.dependence_chain_id)) |
| 77 | + } |
| 78 | + |
| 79 | + /// Release all locks held by this worker |
| 80 | + /// |
| 81 | + /// If host-listener has marked the dependence chain as 'updated' in the meantime, |
| 82 | + /// we don't overwrite its status |
| 83 | + pub async fn release_all_owned_locks(&self) -> Result<u64, sqlx::Error> { |
| 84 | + // Since UPDATE always aquire a row-level lock internally, |
| 85 | + // this acts as atomic_exchange |
| 86 | + let rows = sqlx::query!( |
| 87 | + r#" |
| 88 | + UPDATE dependence_chain |
| 89 | + SET |
| 90 | + worker_id = NULL, |
| 91 | + lock_acquired_at = NULL, |
| 92 | + lock_expires_at = NULL, |
| 93 | + status = CASE |
| 94 | + WHEN status = 'processing' THEN 'processed' |
| 95 | + ELSE status |
| 96 | + END |
| 97 | + WHERE worker_id = $1 |
| 98 | + "#, |
| 99 | + self.worker_id |
| 100 | + ) |
| 101 | + .execute(&self.pool) |
| 102 | + .await?; |
| 103 | + |
| 104 | + info!(target: "deps_chain", worker_id = %self.worker_id, |
| 105 | + count = rows.rows_affected(), "Released all locks"); |
| 106 | + |
| 107 | + Ok(rows.rows_affected()) |
| 108 | + } |
| 109 | + |
| 110 | + /// Release the lock held by this worker on the current dependence chain |
| 111 | + /// If host-listener has marked the dependence chain as 'updated' in the meantime, |
| 112 | + /// we don't overwrite its status |
| 113 | + pub async fn release_current_lock(&self) -> Result<u64, sqlx::Error> { |
| 114 | + let dep_chain_id = match &self.lock { |
| 115 | + Some(lock) => lock.dependence_chain_id.clone(), |
| 116 | + None => { |
| 117 | + warn!(target: "deps_chain", "No lock to release"); |
| 118 | + return Ok(0); |
| 119 | + } |
| 120 | + }; |
| 121 | + |
| 122 | + let rows = sqlx::query!( |
| 123 | + r#" |
| 124 | + UPDATE dependence_chain |
| 125 | + SET |
| 126 | + worker_id = NULL, |
| 127 | + lock_acquired_at = NULL, |
| 128 | + lock_expires_at = NULL, |
| 129 | + status = CASE |
| 130 | + WHEN status = 'processing' THEN 'processed' |
| 131 | + ELSE status |
| 132 | + END |
| 133 | + WHERE worker_id = $1 AND dependence_chain_id = $2 |
| 134 | + "#, |
| 135 | + self.worker_id, |
| 136 | + dep_chain_id, |
| 137 | + ) |
| 138 | + .execute(&self.pool) |
| 139 | + .await?; |
| 140 | + |
| 141 | + info!(target: "deps_chain", ?dep_chain_id, "Released lock"); |
| 142 | + |
| 143 | + Ok(rows.rows_affected()) |
| 144 | + } |
| 145 | + |
| 146 | + /// Set error on the current dependence chain |
| 147 | + /// If host-listener has marked the dependence chain as 'updated' in the meantime, |
| 148 | + /// we don't overwrite its error |
| 149 | + /// |
| 150 | + /// The error is only informational and does not affect the processing status |
| 151 | + pub async fn set_processing_error(&self, err: Option<String>) -> Result<u64, sqlx::Error> { |
| 152 | + let dep_chain_id: Vec<u8> = match &self.lock { |
| 153 | + Some(lock) => lock.dependence_chain_id.clone(), |
| 154 | + None => { |
| 155 | + warn!(target: "deps_chain", "No lock to set error on"); |
| 156 | + return Ok(0); |
| 157 | + } |
| 158 | + }; |
| 159 | + |
| 160 | + let rows = sqlx::query!( |
| 161 | + r#" |
| 162 | + UPDATE dependence_chain |
| 163 | + SET |
| 164 | + error_message = CASE |
| 165 | + WHEN status = 'processing' THEN $3 |
| 166 | + ELSE error_message |
| 167 | + END |
| 168 | + WHERE worker_id = $1 AND dependence_chain_id = $2 |
| 169 | + "#, |
| 170 | + self.worker_id, |
| 171 | + dep_chain_id, |
| 172 | + err |
| 173 | + ) |
| 174 | + .execute(&self.pool) |
| 175 | + .await?; |
| 176 | + |
| 177 | + info!(target: "deps_chain", ?dep_chain_id, error = ?err, "Set error on lock"); |
| 178 | + Ok(rows.rows_affected()) |
| 179 | + } |
| 180 | + |
| 181 | + /// Extend the lock expiration time on the current dependence chain |
| 182 | + pub async fn extend_current_lock(&self) -> Result<(), sqlx::Error> { |
| 183 | + let dependence_chain_id = match &self.lock { |
| 184 | + Some(lock) => lock.dependence_chain_id.clone(), |
| 185 | + None => { |
| 186 | + info!(target: "deps_chain", "No lock to extend"); |
| 187 | + return Ok(()); |
| 188 | + } |
| 189 | + }; |
| 190 | + |
| 191 | + sqlx::query!( |
| 192 | + r#" |
| 193 | + UPDATE dependence_chain |
| 194 | + SET |
| 195 | + lock_expires_at = NOW() + INTERVAL '30 seconds' |
| 196 | + WHERE dependence_chain_id = $1 AND worker_id = $2 |
| 197 | + "#, |
| 198 | + dependence_chain_id, |
| 199 | + self.worker_id |
| 200 | + ) |
| 201 | + .execute(&self.pool) |
| 202 | + .await?; |
| 203 | + |
| 204 | + info!(target: "deps_chain", ?dependence_chain_id, "Extended lock"); |
| 205 | + |
| 206 | + Ok(()) |
| 207 | + } |
| 208 | +} |
0 commit comments