diff --git a/Cargo.toml b/Cargo.toml index 4c31396..4137f62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,13 +22,16 @@ rand = "0.8.5" snap = "1.0" errno = { optional = true, version = "0.2" } -fs2 = {optional = true, version = "0.4.3"} +fs2 = { optional = true, version = "0.4.3" } -tokio = { optional = true, features = ["rt", "sync"], version = ">= 1.21" } +tokio = { optional = true, features = ["rt", "sync"], version = "1.39.3" } +async-std = { optional = true, version = "1.12.0" } [features] default = ["fs"] -async = ["tokio"] +async = ["asyncdb-tokio"] +asyncdb-tokio = ["tokio"] +asyncdb-async-std = ["async-std"] fs = ["errno", "fs2"] [dev-dependencies] @@ -46,7 +49,8 @@ members = [ "examples/leveldb-tool", "examples/word-analyze", "examples/stresstest", - "examples/asyncdb", + "examples/asyncdb-tokio", + "examples/asyncdb-async-std", "examples/mcpe", "examples/kvserver", ] diff --git a/examples/asyncdb-async-std/Cargo.toml b/examples/asyncdb-async-std/Cargo.toml new file mode 100644 index 0000000..a48e0fe --- /dev/null +++ b/examples/asyncdb-async-std/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "asyncdb-async-std" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } +rusty-leveldb = { path = "../../", features = ["asyncdb-async-std"] } diff --git a/examples/asyncdb-async-std/src/main.rs b/examples/asyncdb-async-std/src/main.rs new file mode 100644 index 0000000..8cf3b2d --- /dev/null +++ b/examples/asyncdb-async-std/src/main.rs @@ -0,0 +1,38 @@ +use rusty_leveldb::{AsyncDB, Options, Status, StatusCode}; + +#[async_std::main] +async fn main() { + let adb = AsyncDB::new("testdb", Options::default()).unwrap(); + + adb.put("Hello".as_bytes().to_owned(), "World".as_bytes().to_owned()) + .await + .expect("put()"); + + let r = adb.get("Hello".as_bytes().to_owned()).await; + assert_eq!(r, Ok(Some("World".as_bytes().to_owned()))); + + let snapshot = adb.get_snapshot().await.expect("get_snapshot()"); + + adb.delete("Hello".as_bytes().to_owned()) + .await + .expect("delete()"); + + // A snapshot allows us to travel back in time before the deletion. + let r2 = adb.get_at(snapshot, "Hello".as_bytes().to_owned()).await; + assert_eq!(r2, Ok(Some("World".as_bytes().to_owned()))); + + // Once dropped, a snapshot cannot be used anymore. + adb.drop_snapshot(snapshot).await.expect("drop_snapshot()"); + + let r3 = adb.get_at(snapshot, "Hello".as_bytes().to_owned()).await; + assert_eq!( + r3, + Err(Status { + code: StatusCode::AsyncError, + err: "Unknown snapshot reference: this is a bug".to_string() + }) + ); + + adb.flush().await.expect("flush()"); + adb.close().await.expect("close()"); +} diff --git a/examples/asyncdb/Cargo.toml b/examples/asyncdb-tokio/Cargo.toml similarity index 51% rename from examples/asyncdb/Cargo.toml rename to examples/asyncdb-tokio/Cargo.toml index 89bee62..b04eb67 100644 --- a/examples/asyncdb/Cargo.toml +++ b/examples/asyncdb-tokio/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "asyncdb" +name = "asyncdb-tokio" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.21", features = ["rt", "macros" ] } -rusty-leveldb = { path = "../../", features = ["async"] } +tokio = { version = "1.21", features = ["rt", "macros"] } +rusty-leveldb = { path = "../../", features = ["asyncdb-tokio"] } diff --git a/examples/asyncdb/src/main.rs b/examples/asyncdb-tokio/src/main.rs similarity index 100% rename from examples/asyncdb/src/main.rs rename to examples/asyncdb-tokio/src/main.rs diff --git a/src/asyncdb.rs b/src/asyncdb.rs index ad02a85..1f3a32c 100644 --- a/src/asyncdb.rs +++ b/src/asyncdb.rs @@ -1,20 +1,17 @@ use std::collections::hash_map::HashMap; -use std::path::Path; -use std::sync::Arc; -use crate::{Options, Result, Status, StatusCode, WriteBatch, DB}; +use crate::{ + send_response, send_response_result, AsyncDB, Message, Result, Status, StatusCode, WriteBatch, + DB, +}; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio::task::{spawn_blocking, JoinHandle}; - -const CHANNEL_BUFFER_SIZE: usize = 32; +pub(crate) const CHANNEL_BUFFER_SIZE: usize = 32; #[derive(Clone, Copy)] pub struct SnapshotRef(usize); /// A request sent to the database thread. -enum Request { +pub(crate) enum Request { Close, Put { key: Vec, val: Vec }, Delete { key: Vec }, @@ -28,42 +25,14 @@ enum Request { } /// A response received from the database thread. -enum Response { +pub(crate) enum Response { OK, Error(Status), Value(Option>), Snapshot(SnapshotRef), } -/// Contains both a request and a back-channel for the reply. -struct Message { - req: Request, - resp_channel: oneshot::Sender, -} - -/// `AsyncDB` makes it easy to use LevelDB in a tokio runtime. -/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. -/// -/// TODO: Make it work in other runtimes as well. This is a matter of adapting the blocking thread -/// mechanism as well as the channel types. -#[derive(Clone)] -pub struct AsyncDB { - jh: Arc>, - send: mpsc::Sender, -} - impl AsyncDB { - /// Create a new or open an existing database. - pub fn new>(name: P, opts: Options) -> Result { - let db = DB::open(name, opts)?; - let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE); - let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); - Ok(AsyncDB { - jh: Arc::new(jh), - send, - }) - } - pub async fn close(&self) -> Result<()> { let r = self.process_request(Request::Close).await?; match r { @@ -182,54 +151,32 @@ impl AsyncDB { } } - async fn process_request(&self, req: Request) -> Result { - let (tx, rx) = oneshot::channel(); - let m = Message { - req, - resp_channel: tx, - }; - if let Err(e) = self.send.send(m).await { - return Err(Status { - code: StatusCode::AsyncError, - err: e.to_string(), - }); - } - let resp = rx.await; - match resp { - Err(e) => Err(Status { - code: StatusCode::AsyncError, - err: e.to_string(), - }), - Ok(r) => Ok(r), - } - } - - fn run_server(mut db: DB, mut recv: mpsc::Receiver) { + pub(crate) fn run_server(mut db: DB, mut recv: impl ReceiverExt) { let mut snapshots = HashMap::new(); let mut snapshot_counter: usize = 0; while let Some(message) = recv.blocking_recv() { match message.req { Request::Close => { - message.resp_channel.send(Response::OK).ok(); + send_response(message.resp_channel, Response::OK); recv.close(); return; } Request::Put { key, val } => { let ok = db.put(&key, &val); - send_response(message.resp_channel, ok); + send_response_result(message.resp_channel, ok); } Request::Delete { key } => { let ok = db.delete(&key); - send_response(message.resp_channel, ok); + send_response_result(message.resp_channel, ok); } Request::Write { batch, sync } => { let ok = db.write(batch, sync); - send_response(message.resp_channel, ok); + send_response_result(message.resp_channel, ok); } Request::Flush => { let ok = db.flush(); - send_response(message.resp_channel, ok); + send_response_result(message.resp_channel, ok); } Request::GetAt { snapshot, key } => { let snapshot_id = snapshot.0; @@ -237,49 +184,46 @@ impl AsyncDB { let ok = db.get_at(snapshot, &key); match ok { Err(e) => { - message.resp_channel.send(Response::Error(e)).ok(); + send_response(message.resp_channel, Response::Error(e)); } Ok(v) => { - message.resp_channel.send(Response::Value(v)).ok(); + send_response(message.resp_channel, Response::Value(v)); } }; } else { - message - .resp_channel - .send(Response::Error(Status { + send_response( + message.resp_channel, + Response::Error(Status { code: StatusCode::AsyncError, err: "Unknown snapshot reference: this is a bug".to_string(), - })) - .ok(); + }), + ); } } Request::Get { key } => { let r = db.get(&key); - message.resp_channel.send(Response::Value(r)).ok(); + send_response(message.resp_channel, Response::Value(r)); } Request::GetSnapshot => { snapshots.insert(snapshot_counter, db.get_snapshot()); let sref = SnapshotRef(snapshot_counter); snapshot_counter += 1; - message.resp_channel.send(Response::Snapshot(sref)).ok(); + send_response(message.resp_channel, Response::Snapshot(sref)); } Request::DropSnapshot { snapshot } => { snapshots.remove(&snapshot.0); - send_response(message.resp_channel, Ok(())); + send_response_result(message.resp_channel, Ok(())); } Request::CompactRange { from, to } => { let ok = db.compact_range(&from, &to); - send_response(message.resp_channel, ok); + send_response_result(message.resp_channel, ok); } } } } } -fn send_response(ch: oneshot::Sender, result: Result<()>) { - if let Err(e) = result { - ch.send(Response::Error(e)).ok(); - } else { - ch.send(Response::OK).ok(); - } +pub(crate) trait ReceiverExt { + fn blocking_recv(&mut self) -> Option; + fn close(&mut self); } diff --git a/src/asyncdb_async_std.rs b/src/asyncdb_async_std.rs new file mode 100644 index 0000000..09e23f9 --- /dev/null +++ b/src/asyncdb_async_std.rs @@ -0,0 +1,79 @@ +use std::path::Path; +use std::sync::Arc; + +use async_std::channel; +use async_std::task::{spawn_blocking, JoinHandle}; + +use crate::asyncdb::{ReceiverExt, Request, Response, CHANNEL_BUFFER_SIZE}; +use crate::{Options, Result, Status, StatusCode, DB}; + +pub(crate) struct Message { + pub(crate) req: Request, + pub(crate) resp_channel: channel::Sender, +} +/// `AsyncDB` makes it easy to use LevelDB in a async-std runtime. +/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. +#[derive(Clone)] +pub struct AsyncDB { + jh: Arc>, + send: channel::Sender, +} + +impl AsyncDB { + /// Create a new or open an existing database. + pub fn new>(name: P, opts: Options) -> Result { + let db = DB::open(name, opts)?; + + let (send, recv) = channel::bounded(CHANNEL_BUFFER_SIZE); + let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); + Ok(AsyncDB { + jh: Arc::new(jh), + send, + }) + } + + pub(crate) async fn process_request(&self, req: Request) -> Result { + let (tx, rx) = channel::bounded(1); + + let m = Message { + req, + resp_channel: tx, + }; + if let Err(e) = self.send.send(m).await { + return Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }); + } + let resp = rx.recv().await; + match resp { + Err(e) => Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }), + Ok(r) => Ok(r), + } + } +} + +pub(crate) fn send_response_result(ch: channel::Sender, result: Result<()>) { + if let Err(e) = result { + ch.try_send(Response::Error(e)).ok(); + } else { + ch.try_send(Response::OK).ok(); + } +} + +pub(crate) fn send_response(ch: channel::Sender, res: Response) { + ch.send_blocking(res).ok(); +} + +impl ReceiverExt for channel::Receiver { + fn blocking_recv(&mut self) -> Option { + self.recv_blocking().ok() + } + + fn close(&mut self) { + channel::Receiver::close(self); + } +} diff --git a/src/asyncdb_tokio.rs b/src/asyncdb_tokio.rs new file mode 100644 index 0000000..2f2441d --- /dev/null +++ b/src/asyncdb_tokio.rs @@ -0,0 +1,83 @@ +use std::path::Path; +use std::sync::Arc; + +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::task::{spawn_blocking, JoinHandle}; + +use crate::asyncdb::ReceiverExt; +use crate::asyncdb::CHANNEL_BUFFER_SIZE; +use crate::asyncdb::{Request, Response}; +use crate::{Options, Result, Status, StatusCode, DB}; + +pub(crate) struct Message { + pub(crate) req: Request, + pub(crate) resp_channel: oneshot::Sender, +} + +/// `AsyncDB` makes it easy to use LevelDB in a tokio runtime. +/// The methods follow very closely the main API (see `DB` type). Iteration is not yet implemented. +#[derive(Clone)] +pub struct AsyncDB { + jh: Arc>, + send: mpsc::Sender, +} + +impl AsyncDB { + /// Create a new or open an existing database. + pub fn new>(name: P, opts: Options) -> Result { + let db = DB::open(name, opts)?; + let (send, recv) = mpsc::channel(CHANNEL_BUFFER_SIZE); + + let jh = spawn_blocking(move || AsyncDB::run_server(db, recv)); + Ok(AsyncDB { + jh: Arc::new(jh), + send, + }) + } + pub(crate) async fn process_request(&self, req: Request) -> Result { + let (tx, rx) = oneshot::channel(); + + let m = Message { + req, + resp_channel: tx, + }; + if let Err(e) = self.send.send(m).await { + return Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }); + } + let resp = rx.await; + + match resp { + Err(e) => Err(Status { + code: StatusCode::AsyncError, + err: e.to_string(), + }), + Ok(r) => Ok(r), + } + } +} + +pub(crate) fn send_response_result(ch: oneshot::Sender, result: Result<()>) { + if let Err(e) = result { + ch.send(Response::Error(e)).ok(); + } else { + ch.send(Response::OK).ok(); + } +} + +pub(crate) fn send_response(ch: oneshot::Sender, res: Response) { + ch.send(res).ok(); +} + +impl ReceiverExt for mpsc::Receiver { + fn blocking_recv(&mut self) -> Option { + self.blocking_recv() + } + + fn close(&mut self) { + mpsc::Receiver::close(self); + } +} diff --git a/src/lib.rs b/src/lib.rs index 77a8de1..33fd29e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,9 +40,19 @@ extern crate time_test; #[macro_use] mod infolog; -#[cfg(feature = "async")] +#[cfg(any(feature = "asyncdb-tokio", feature = "asyncdb-async-std"))] mod asyncdb; +#[cfg(feature = "asyncdb-tokio")] +mod asyncdb_tokio; +#[cfg(feature = "asyncdb-tokio")] +use asyncdb_tokio::{send_response, send_response_result, Message}; + +#[cfg(feature = "asyncdb-async-std")] +mod asyncdb_async_std; +#[cfg(feature = "asyncdb-async-std")] +use asyncdb_async_std::{send_response, send_response_result, Message}; + mod block; mod block_builder; mod blockhandle; @@ -82,9 +92,10 @@ mod db_iter; pub mod compressor; pub mod env; -#[cfg(feature = "async")] -pub use asyncdb::AsyncDB; - +#[cfg(feature = "asyncdb-async-std")] +pub use asyncdb_async_std::AsyncDB; +#[cfg(feature = "asyncdb-tokio")] +pub use asyncdb_tokio::AsyncDB; pub use cmp::{Cmp, DefaultCmp}; pub use compressor::{Compressor, CompressorId}; pub use db_impl::DB;