diff --git a/Cargo.lock b/Cargo.lock index 5fabd76e4d..1b11766936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3328,6 +3328,7 @@ dependencies = [ name = "libsql-wal" version = "0.1.0" dependencies = [ + "aes", "anyhow", "arc-swap", "async-lock 3.4.0", @@ -3339,6 +3340,7 @@ dependencies = [ "aws-smithy-types-convert", "bitflags 2.6.0", "bytes", + "cbc", "chrono", "clap 4.5.9", "crc32fast", @@ -3349,6 +3351,7 @@ dependencies = [ "fst", "hashbrown 0.14.5", "hex", + "hmac", "http-body 1.0.0", "hyper 0.14.30", "inquire", @@ -3366,6 +3369,7 @@ dependencies = [ "s3s 0.10.1-dev", "s3s-aws", "s3s-fs 0.10.1-dev", + "sha2", "tempfile", "thiserror", "tokio", diff --git a/libsql-server/src/namespace/configurator/libsql_primary.rs b/libsql-server/src/namespace/configurator/libsql_primary.rs index 3d5a054698..c6373590e2 100644 --- a/libsql-server/src/namespace/configurator/libsql_primary.rs +++ b/libsql-server/src/namespace/configurator/libsql_primary.rs @@ -94,7 +94,7 @@ pub(super) async fn libsql_primary_common( encryption_config: base_config.encryption_config.clone(), block_writes: block_writes.clone(), resolve_attach_path, - wal_manager: LibsqlWalManager::new(registry.clone(), namespace_resolver.clone()), + wal_manager: LibsqlWalManager::new(registry.clone(), namespace_resolver.clone(), Some("megasecret".to_string().into())), }), } .throttled( diff --git a/libsql-server/src/namespace/configurator/libsql_replica.rs b/libsql-server/src/namespace/configurator/libsql_replica.rs index 8456482b4b..c01a0f934e 100644 --- a/libsql-server/src/namespace/configurator/libsql_replica.rs +++ b/libsql-server/src/namespace/configurator/libsql_replica.rs @@ -114,6 +114,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator { wal_manager: LibsqlWalManager::new( self.registry.clone(), self.namespace_resolver.clone(), + Some("megasecret".to_string().into()) ), }), }; diff --git a/libsql-wal/Cargo.toml b/libsql-wal/Cargo.toml index ca32f34f32..36ee81e49a 100644 --- a/libsql-wal/Cargo.toml +++ b/libsql-wal/Cargo.toml @@ -48,6 +48,12 @@ aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] } petgraph = "0.6.5" anyhow = { version = "1.0.86", optional = true } +# feat: encryption +aes = { version = "0.8.4" } +cbc = { version = "0.1.2" } +hmac = { version = "0.12.1" } +sha2 = { version = "0.10.8" } + [dev-dependencies] criterion = "0.5.1" hex = "0.4.3" diff --git a/libsql-wal/src/encryption.rs b/libsql-wal/src/encryption.rs new file mode 100644 index 0000000000..56e6bdbea1 --- /dev/null +++ b/libsql-wal/src/encryption.rs @@ -0,0 +1,10 @@ +use aes::Aes256; + +use crate::LIBSQL_PAGE_SIZE; + +#[derive(Debug)] +pub struct EncryptionConfig { + pub decryptor: cbc::Decryptor, + pub encryptor: cbc::Encryptor, + pub scratch: Box<[u8; LIBSQL_PAGE_SIZE as usize]> +} diff --git a/libsql-wal/src/lib.rs b/libsql-wal/src/lib.rs index f6e98f183e..b571f7fa23 100644 --- a/libsql-wal/src/lib.rs +++ b/libsql-wal/src/lib.rs @@ -11,6 +11,7 @@ pub mod shared_wal; pub mod storage; pub mod transaction; pub mod wal; +mod encryption; const LIBSQL_MAGIC: u64 = u64::from_be_bytes(*b"LIBSQL\0\0"); const LIBSQL_PAGE_SIZE: u16 = 4096; @@ -124,7 +125,7 @@ pub mod test { tokio::spawn(checkpointer.run()); } - let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver)); + let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver), None); Self { tmp, registry, wal } } diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index b6a94b7232..b63a2b131c 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -9,6 +9,8 @@ use std::sync::{ Arc, }; +use aes::cipher::block_padding::NoPadding; +use aes::cipher::BlockEncryptMut; use chrono::{DateTime, Utc}; use crossbeam_skiplist::SkipMap; use fst::MapBuilder; @@ -24,6 +26,7 @@ use crate::io::file::FileExt; use crate::io::Inspect; use crate::segment::{checked_frame_offset, SegmentFlags}; use crate::segment::{frame_offset, page_offset, sealed::SealedSegment}; +use crate::shared_wal::Context; use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared}; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; @@ -228,12 +231,13 @@ impl CurrentSegment { } } - #[tracing::instrument(skip(self, pages, tx))] + #[tracing::instrument(skip_all)] pub fn insert_pages<'a>( &self, pages: impl Iterator, size_after: Option, tx: &mut TxGuardShared, + ctx: &mut Context, ) -> Result> where F: FileExt, @@ -245,6 +249,14 @@ impl CurrentSegment { // let mut commit_frame_written = false; let current_savepoint = tx.savepoints.last_mut().expect("no savepoints initialized"); while let Some((page_no, page)) = pages.next() { + let page = match ctx.encryption { + Some(ref mut crypto) if page_no != 1 => { + crypto.encryptor.clone().encrypt_padded_b2b_mut::(page, crypto.scratch.as_mut_slice()).unwrap(); + crypto.scratch.as_slice() + }, + _ => page, + }; + // optim: if the page is already present, overwrite its content if let Some(offset) = current_savepoint.index.get(&page_no) { tracing::trace!(page_no, "recycling frame"); diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index b50622d3ef..e5a41a44d2 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -407,7 +407,7 @@ mod test { use tempfile::{tempfile, NamedTempFile}; use tokio_stream::StreamExt as _; - use crate::test::{seal_current_segment, TestEnv}; + use crate::{shared_wal::Context, test::{seal_current_segment, TestEnv}}; use super::*; @@ -442,11 +442,12 @@ mod test { let mut file = NamedTempFile::new().unwrap(); let mut tx = shared.begin_read(999999).into(); + let ctx = Context::default(); while let Some(frame) = stream.next().await { let frame = frame.unwrap(); let mut buffer = [0; 4096]; shared - .read_page(&mut tx, frame.header.page_no(), &mut buffer) + .read_page(&mut tx, frame.header.page_no(), &mut buffer, &ctx) .unwrap(); assert_eq!(buffer, frame.data()); file.write_all(frame.data()).unwrap(); diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index c1bc1749f4..568f808e0a 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -3,6 +3,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; +use aes::cipher::block_padding::NoPadding; +use aes::cipher::BlockDecryptMut; use arc_swap::ArcSwap; use crossbeam::deque::Injector; use crossbeam::sync::Unparker; @@ -11,6 +13,7 @@ use tokio::sync::{mpsc, watch}; use uuid::Uuid; use crate::checkpointer::CheckpointMessage; +use crate::encryption::EncryptionConfig; use crate::error::{Error, Result}; use crate::io::file::FileExt; use crate::io::Io; @@ -20,6 +23,12 @@ use crate::segment_swap_strategy::SegmentSwapStrategy; use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction}; use libsql_sys::name::NamespaceName; +/// addtional context passed during wal operation +#[derive(Debug, Default)] +pub struct Context { + pub encryption: Option, +} + #[derive(Default)] pub struct WalLock { pub(crate) tx_id: Arc>>, @@ -220,12 +229,13 @@ impl SharedWal { }) } - #[tracing::instrument(skip(self, tx, buffer))] + #[tracing::instrument(skip_all)] pub fn read_page( &self, tx: &mut Transaction, page_no: u32, buffer: &mut [u8], + ctx: &Context, ) -> Result<()> { match tx.current.find_frame(page_no, tx) { Some(offset) => { @@ -262,6 +272,12 @@ impl SharedWal { } } + if let Some(ref enc) = ctx.encryption { + if page_no != 1 { + enc.decryptor.clone().decrypt_padded_mut::(buffer).unwrap(); + } + } + tx.pages_read += 1; Ok(()) @@ -273,10 +289,11 @@ impl SharedWal { tx: &mut WriteTransaction, pages: impl Iterator, size_after: Option, + ctx: &mut Context, ) -> Result<()> { let current = self.current.load(); let mut tx = tx.lock(); - if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx)? { + if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx, ctx)? { self.new_frame_notifier.send_replace(last_committed); } diff --git a/libsql-wal/src/wal.rs b/libsql-wal/src/wal.rs index 12dd75c7b0..b92e001dfd 100644 --- a/libsql-wal/src/wal.rs +++ b/libsql-wal/src/wal.rs @@ -3,20 +3,27 @@ use std::os::unix::prelude::OsStrExt; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use aes::cipher::KeyIvInit as _; +use aes::Aes256; +use hmac::{Hmac, Mac as _}; use libsql_sys::name::NamespaceResolver; use libsql_sys::wal::{Wal, WalManager}; +use sha2::Sha256; +use crate::encryption::EncryptionConfig; use crate::io::Io; use crate::registry::WalRegistry; use crate::segment::sealed::SealedSegment; -use crate::shared_wal::SharedWal; +use crate::shared_wal::{Context, SharedWal}; use crate::storage::Storage; use crate::transaction::Transaction; +use crate::LIBSQL_PAGE_SIZE; pub struct LibsqlWalManager { registry: Arc>, next_conn_id: Arc, namespace_resolver: Arc, + secret: Option>, } impl Clone for LibsqlWalManager { @@ -25,6 +32,7 @@ impl Clone for LibsqlWalManager { registry: self.registry.clone(), next_conn_id: self.next_conn_id.clone(), namespace_resolver: self.namespace_resolver.clone(), + secret: self.secret.clone(), } } } @@ -33,11 +41,13 @@ impl LibsqlWalManager { pub fn new( registry: Arc>, namespace_resolver: Arc, + secret: Option>, ) -> Self { Self { registry, next_conn_id: Default::default(), namespace_resolver, + secret, } } } @@ -47,6 +57,7 @@ pub struct LibsqlWal { tx: Option>, shared: Arc>, conn_id: u64, + context: Context, } impl>> WalManager for LibsqlWalManager { @@ -74,11 +85,32 @@ impl>> WalManager for Libsq let conn_id = self .next_conn_id .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let context = match self.secret { + Some(ref secret) => { + let mut mac = Hmac::::new_from_slice(b"secret").unwrap(); + mac.update(secret.as_bytes()); + let key_h = mac.finalize().into_bytes(); + let iv = [42u8; 16]; + let encryptor = cbc::Encryptor::::new(&key_h.into(), &iv.into()); + let decryptor = cbc::Decryptor::::new(&key_h.into(), &iv.into()); + Context { + encryption: Some(EncryptionConfig { + decryptor, + encryptor, + scratch: Box::new([0; LIBSQL_PAGE_SIZE as usize]), + }) + } + } + _ => Default::default(), + }; + Ok(LibsqlWal { last_read_frame_no: None, tx: None, shared, conn_id, + context, }) } @@ -161,7 +193,7 @@ impl Wal for LibsqlWal { tracing::trace!(page_no, "reading frame"); let tx = self.tx.as_mut().unwrap(); self.shared - .read_page(tx, page_no.get(), buffer) + .read_page(tx, page_no.get(), buffer, &mut self.context) .map_err(Into::into)?; Ok(()) } @@ -279,6 +311,7 @@ impl Wal for LibsqlWal { tx, page_headers.iter(), (size_after != 0).then_some(size_after), + &mut self.context, ) .map_err(Into::into)?; }