From aa85b5d8b0e6c36785c74d9d31635fd62588c6c0 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 17 Feb 2024 05:23:36 -0800 Subject: [PATCH] Starting the scaffolding of the redis-based locking mechanism --- Cargo.toml | 18 +++++--- docker-compose.yml | 22 ++++++++++ src/dynamodb.rs | 5 +-- src/lib.rs | 101 ++++++++++++++++++++++++++++++++++++--------- src/redis.rs | 87 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 204 insertions(+), 29 deletions(-) create mode 100644 docker-compose.yml create mode 100644 src/redis.rs diff --git a/Cargo.toml b/Cargo.toml index 597b70f..2de98a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,26 +6,32 @@ edition = "2021" [dependencies] async-trait = "^0.1.68" bytes = "*" -object_store = "^0.5.0" +object_store = "0.9.0" futures = "*" thiserror = "1" tokio = { version = "^1.25.0", default-features = false} +tracing = { version = "0.1.40", features = ["log"] } serde = { version = "1", features = [ "derive" ]} +url = { version = "2.5.0", features = ["serde"] } # dynamodb feature -#dynamodb_lock = { version = "^0.5.0", optional = true } -dynamodb_lock = { git = "https://github.com/delta-incubator/dynamodb-lock-rs", branch = "web-identity", optional = true } +dynamodb_lock = { version = "^0.6.1", optional = true } # postres feature sqlx = {version = "^0.6.3", default-features = false, optional = true } -[dev-dependencies] -object_store = { version = "^0.5.0", features = ["aws"]} +# redis features +rslock = { version = "0.3.0", default-features = false, features = ["tokio-comp"], optional = true} [features] -default = [] +default = ["redis", "integration-test"] +# Integration test is a feature that just requires docker-compose to be running +integration-test = [] dynamodb = ["dynamodb_lock"] postgres = ["sqlx/postgres"] +redis = [ + "dep:rslock", +] tokio-native-tls = ["sqlx/runtime-tokio-native-tls"] tokio-rustls = ["sqlx/runtime-tokio-rustls"] actix-native-tls = ["sqlx/runtime-actix-native-tls"] diff --git 
a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a8d8abf --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,22 @@ +version: "3.9" +services: + redis: + image: redis + ports: + - 6379:6379 + localstack: + image: localstack/localstack:0.14 + ports: + - 4566:4566 + - 8080:8080 + environment: + - SERVICES=s3,dynamodb + - DEBUG=1 + - DATA_DIR=/tmp/localstack/data + - PORT_WEB_UI=8080 + - DOCKER_HOST=unix:///var/run/docker.sock + - HOST_TMP_FOLDER=${TMPDIR} + - AWS_ACCESS_KEY_ID=deltalake + - AWS_SECRET_ACCESS_KEY=weloverust + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:4566/health" ] diff --git a/src/dynamodb.rs b/src/dynamodb.rs index 3180797..51f9f40 100644 --- a/src/dynamodb.rs +++ b/src/dynamodb.rs @@ -1,6 +1,5 @@ -/* - * This module defines the DynamoDB backed object_store - */ +//! +//! This module defines the DynamoDB backed object_store use dynamodb_lock::DynamoDbLockClient; pub use dynamodb_lock::DynamoDbOptions; diff --git a/src/lib.rs b/src/lib.rs index 6056ac4..d4bdb2d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,20 +5,44 @@ mod errors; pub mod dynamodb; #[cfg(feature = "postgres")] pub mod postgres; +#[cfg(feature = "redis")] +pub mod redis; use crate::errors::LockedObjectStoreError; -use async_trait::async_trait; +use object_store::{GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result}; use serde::{Deserialize, Serialize}; +use std::env; use std::fmt::Debug; +use std::sync::Arc; + +/// +/// The `LockedObjectStore` provides the wrapper needed to lock on specific [ObjectStore] +/// operations +/// +#[derive(Clone, Debug)] +pub struct LockedObjectStore { + inner: Arc, + lock: Arc>, +} + +/// +/// Definitions of operations on [LockedObjectStore] which can be locked +/// +#[derive(Debug)] +pub enum LockOperations { + Copy, + CopyIfNotExists, + Delete, + Put, + Rename, + RenameIfNotExists, +} /// A lock that has been successfully acquired #[derive(Clone, Debug, Deserialize, Serialize)] pub struct 
LockItem { /// The name of the owner that owns this lock. pub owner_name: String, - /// Current version number of the lock in DynamoDB. This is what tells the lock client - /// when the lock is stale. - pub record_version_number: String, /// The amount of time (in seconds) that the owner has this lock for. /// If lease_duration is None then the lock is non-expirable. pub lease_duration: Option, @@ -34,40 +58,77 @@ pub struct LockItem { pub is_non_acquirable: bool, } +impl Default for LockItem { + fn default() -> Self { + let owner_name = env::var("LOCK_OWNER_NAME").unwrap_or("locking-object-store".into()); + let lease_duration = env::var("LOCK_LEASE_SECONDS").unwrap_or("20".into()); + + Self { + owner_name, + acquired_expired_lock: false, + is_non_acquirable: false, + is_released: false, + // TODO bring lease_duration through + lease_duration: None, + data: None, + lookup_time: 0, + } + } +} + +impl LockItem { + /// + /// Create a standard [LockItem] with default parameters set by environment variables + fn from(data: T) -> Self { + let mut item = LockItem::default(); + item.data = Some(data); + item + } +} + /// Abstraction over a distributive lock provider -#[async_trait] -pub trait LockClient: Send + Sync + Debug { +#[async_trait::async_trait] +pub trait LockClient: Send + Sync + Debug { /// Attempts to acquire lock for data. If successful, returns the lock. /// Otherwise returns [`Option::None`] which is retryable action. /// Visit implementation docs for more details. - async fn try_acquire_lock( + async fn try_acquire_lock( &self, data: T, - ) -> Result>, LockedObjectStoreError>; + ) -> Result>, LockedObjectStoreError> + where + T: 'async_trait; /// Returns current lock for data (if any). 
// the original implementation of this was returning the top lock in the system, since our lock // is unique based on data, we require that same piece of data in order to return the current lock // again though, this is just lock information - it does not mean you're safe to act on the lock - async fn get_lock( - &self, - data: T, - ) -> Result>, LockedObjectStoreError>; + async fn get_lock(&self, data: T) -> Result>, LockedObjectStoreError> + where + T: 'async_trait; /// Update data in the upstream lock of the current user still has it. /// The returned lock will have a new `rvn` so it'll increase the lease duration /// as this method is usually called when the work with a lock is extended. - async fn update_data( - &self, - lock: &LockItem, - ) -> Result, LockedObjectStoreError>; + async fn update_data(&self, lock: &LockItem) -> Result, LockedObjectStoreError> + where + T: 'async_trait; /// Releases the given lock if the current user still has it, returning true if the lock was /// successfully released, and false if someone else already stole the lock - async fn release_lock( - &self, - lock: &LockItem, - ) -> Result; + async fn release_lock(&self, lock: &LockItem) -> Result + where + T: 'async_trait; } pub const DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS: u32 = 100; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn lock_item_detault() { + let _item = LockItem::<&str>::default(); + } +} diff --git a/src/redis.rs b/src/redis.rs new file mode 100644 index 0000000..ac1a8b5 --- /dev/null +++ b/src/redis.rs @@ -0,0 +1,87 @@ +//! +//! 
The redis module uses redis hash functions to implement a locking mechanism + +use crate::errors::LockedObjectStoreError; +use crate::{LockClient, LockItem}; +use serde::Serialize; + +/// +/// A Redis-based lock, which relies on the key functionality in redis +/// +/// ```rust +/// # use locking_object_store::redis::*; +/// let manager = rslock::LockManager::new(vec!["redis://127.0.0.1"]); +/// let lock = RedisLock::with(manager); +/// ``` +#[derive(Clone, Debug)] +pub struct RedisLock { + manager: rslock::LockManager, +} + +impl RedisLock { + pub fn with(manager: rslock::LockManager) -> Self { + Self { manager } + } +} + +#[async_trait::async_trait] +impl LockClient for RedisLock { + async fn try_acquire_lock(&self, data: T) -> Result>, LockedObjectStoreError> + where + T: 'async_trait, + { + let item = LockItem::from(data); + Ok(Some(item)) + } + + /// Returns current lock for data (if any). + // the original implementation of this was returning the top lock in the system, since our lock + // is unique based on data, we require that same piece of data in order to return the current lock + // again though, this is just lock information - it does not mean you're safe to act on the lock + async fn get_lock(&self, data: T) -> Result>, LockedObjectStoreError> + where + T: 'async_trait, + { + todo!() + } + + /// Update data in the upstream lock if the current user still has it. + /// The returned lock will have a new `rvn` so it'll increase the lease duration + /// as this method is usually called when the work with a lock is extended. 
+ async fn update_data(&self, lock: &LockItem) -> Result, LockedObjectStoreError> + where + T: 'async_trait, + { + todo!() + } + + /// Releases the given lock if the current user still has it, returning true if the lock was + /// successfully released, and false if someone else already stole the lock + async fn release_lock(&self, lock: &LockItem) -> Result + where + T: 'async_trait, + { + todo!() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "integration-test")] + mod integration_tests { + use super::*; + + #[tokio::test] + async fn test_construct() -> Result<(), crate::errors::LockedObjectStoreError> { + let manager = rslock::LockManager::new(vec!["redis://127.0.0.1/"]); + let lock = RedisLock::with(manager); + let data = "hello rust"; + let result = lock.try_acquire_lock(data).await?; + assert!(result.is_some(), "Should receive a valid lock item"); + + Ok(()) + } + } +}