Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ jobs:
- 'raft-kv-memstore-network-v2'
- 'raft-kv-memstore-opendal-snapshot-data'
- 'raft-kv-memstore-single-threaded'
- 'raft-kv-fjall'
- 'raft-kv-rocksdb'
- 'multi-raft-kv'

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ exclude = [
"examples/raft-kv-memstore-opendal-snapshot-data",
"examples/raft-kv-rocksdb",
"examples/multi-raft-kv",
"examples/raft-kv-fjall",

"rt-monoio",
"rt-compio",
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ test-examples:
cargo test --manifest-path examples/raft-kv-memstore-network-v2/Cargo.toml
cargo test --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
cargo test --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml
cargo test --manifest-path examples/raft-kv-fjall/Cargo.toml
cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml
cargo test --manifest-path examples/rocksstore/Cargo.toml
cargo test --manifest-path examples/multi-raft-kv/Cargo.toml
Expand Down Expand Up @@ -103,6 +104,7 @@ lint:
cargo fmt --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-memstore/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-fjall/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-rocksdb/Cargo.toml
cargo fmt --manifest-path examples/multi-raft-kv/Cargo.toml
cargo clippy --no-deps --all-targets -- -D warnings
Expand All @@ -118,6 +120,7 @@ lint:
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-fjall/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-rocksdb/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/multi-raft-kv/Cargo.toml --all-targets -- -D warnings
# Bug: clippy --all-targets reports false warning about unused dep in
Expand Down Expand Up @@ -166,6 +169,7 @@ check:
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-memstore/Cargo.toml
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-fjall/Cargo.toml
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-rocksdb/Cargo.toml
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/rocksstore/Cargo.toml
RUSTFLAGS="-D warnings" cargo check --manifest-path examples/multi-raft-kv/Cargo.toml
Expand All @@ -188,6 +192,7 @@ clean:
cargo clean --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
cargo clean --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml
cargo clean --manifest-path examples/raft-kv-memstore/Cargo.toml
cargo clean --manifest-path examples/raft-kv-fjall/Cargo.toml
cargo clean --manifest-path examples/raft-kv-rocksdb/Cargo.toml
cargo clean --manifest-path examples/rocksstore/Cargo.toml
cargo clean --manifest-path examples/multi-raft-kv/Cargo.toml
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This directory contains example applications demonstrating different implementat
| Example | Log | State Machine | RaftNetwork Impl | RaftNetwork | Client | Server | Special Features |
|---------|-----|---------------|------------------|-------------|--------|--------|------------------|
| [raft-kv-memstore] | [log-mem] | [sm-mem] | HTTP/reqwest | RaftNetwork | reqwest | actix-web | Basic example |
| [raft-kv-fjall] | [fjall] | [fjall] | HTTP/reqwest([network-v1]) | RaftNetwork | reqwest | actix-web | Persistent storage |
| [raft-kv-rocksdb] | [rocksstore] | [rocksstore] | HTTP/reqwest([network-v1]) | RaftNetwork | reqwest | actix-web | Persistent storage |
| [raft-kv-memstore-network-v2] | [log-mem] | [sm-mem] | HTTP/reqwest | RaftNetworkV2 | reqwest | actix-web | Network V2 interface |
| [multi-raft-kv] | [log-mem] | [sm-mem] | HTTP/channel | GroupRouter | channel | in-memory | Multi-Raft groups |
Expand Down Expand Up @@ -48,6 +49,7 @@ The following symbolic links are provided for backward compatibility:

<!-- Reference Links -->
[raft-kv-memstore]: raft-kv-memstore/
[raft-kv-fjall]: raft-kv-fjall/
[raft-kv-rocksdb]: raft-kv-rocksdb/
[raft-kv-memstore-network-v2]: raft-kv-memstore-network-v2/
[raft-kv-memstore-grpc]: raft-kv-memstore-grpc/
Expand Down
46 changes: 46 additions & 0 deletions examples/raft-kv-fjall/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[package]
name = "raft-kv-fjall"
version = "0.1.0"
readme = "README.md"

edition = "2024"
authors = [
"ariesdevil <ariesdevil77@gmail.com>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/databendlabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[[bin]]
name = "raft-key-value-fjall"
path = "src/bin/main.rs"

[dependencies]
client-http = { path = "../client-http" }
network-v1-http = { path = "../network-v1-http" }
openraft = { path = "../../openraft", features = ["serde", "type-alias"] }
openraft-legacy = { path = "../../legacy" }
types-kv = { path = "../types-kv" }

actix-web = { version = "4.0.0-rc.2" }
clap = { version = "4.1.11", features = ["derive", "env"] }
fjall = {version = "3.0.1"}
futures = { version = "0.3" }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = { version = "1.0.57" }
tracing = { version = "0.1.40" }
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
byteorder = "1.5.0"

[dev-dependencies]
maplit = { version = "1.0.2" }
tempfile = { version = "3.4.0" }


[features]

[package.metadata.docs.rs]
all-features = true
47 changes: 47 additions & 0 deletions examples/raft-kv-fjall/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# openraft-fjall-kv-example

A fjall-backed persistent storage implementation for Openraft, demonstrating production-ready log storage and state machine patterns.

## Key Features Demonstrated

- **Persistent storage**: [`RaftLogStorage`] and [`RaftStateMachine`] with fjall
- **Column families**: Separate storage for logs, state machine, and metadata
- **Durability**: On-disk persistence for cluster recovery
- **Performance**: Efficient batch operations and compaction

## Overview

This example implements:
- **[`RaftLogStorage`](https://docs.rs/openraft/latest/openraft/storage/trait.RaftLogStorage.html)** - Persistent Raft log storage
- **[`RaftStateMachine`](https://docs.rs/openraft/latest/openraft/storage/trait.RaftStateMachine.html)** - Persistent application state machine

Built with [fjall v3](https://fjall-rs.github.io/) for production-grade durability and performance.

## Architecture

**Storage structure**:
- Logs stored in dedicated fjall key space
- State machine data in separate key space
- Vote and metadata persisted independently

**Asynchronous I/O operations**:
- WAL flush operations run in spawned tasks to avoid blocking the async runtime
- `save_vote()` and `append_to_log()` spawn async tasks for disk persistence
- Callbacks receive actual flush results for proper error propagation
- Log truncation (`purge()`) doesn't require immediate persistence

**Key Code Locations**:
- Storage implementation: `src/store.rs`
- Log storage with async WAL flush: `src/log_store.rs`
- Type definitions: See parent example for network and client implementations

## Comparison

| Feature | kv-fjall | memstore |
|---------|--------------|----------|
| Storage | fjall (disk) | Memory |
| Persistence | Yes | No |
| Recovery | Full | None |
| Complexity | Higher | Lower |

Built for testing and demonstration purposes.
18 changes: 18 additions & 0 deletions examples/raft-kv-fjall/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use futures::lock::Mutex;
use openraft::Config;

use crate::NodeId;
use crate::Raft;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
pub struct App {
pub id: NodeId,
pub addr: String,
pub raft: Raft,
pub key_values: Arc<Mutex<BTreeMap<String, String>>>,
pub config: Arc<Config>,
}
30 changes: 30 additions & 0 deletions examples/raft-kv-fjall/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use clap::Parser;
use raft_kv_fjall::start_example_raft_node;
use tracing_subscriber::EnvFilter;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
pub addr: String,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Setup the logger
tracing_subscriber::fmt()
.with_target(true)
.with_thread_ids(true)
.with_level(true)
.with_ansi(false)
.with_env_filter(EnvFilter::from_default_env())
.init();

// Parse the parameters passed by arguments.
let options = Opt::parse();

start_example_raft_node(options.id, format!("{}.db", options.addr), options.addr).await
}
92 changes: 92 additions & 0 deletions examples/raft-kv-fjall/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::path::Path;
use std::sync::Arc;

use actix_web::HttpServer;
use actix_web::middleware;
use actix_web::middleware::Logger;
use actix_web::web::Data;
use openraft::Config;

use crate::app::App;
use crate::network::api;
use crate::network::management;
use crate::network::raft;
use crate::store::new_storage;

pub mod app;
pub mod log_store;
pub mod network;
pub mod store;

pub type NodeId = u64;

openraft::declare_raft_types!(
pub TypeConfig:
D = types_kv::Request,
R = types_kv::Response,
);

pub type LogStore = log_store::FjallLogStore<TypeConfig>;
pub type StateMachineStore = store::StateMachineStore;
pub type Raft = openraft::Raft<TypeConfig>;

#[path = "../../utils/declare_types.rs"]
pub mod typ;

pub async fn start_example_raft_node<P>(node_id: NodeId, dir: P, addr: String) -> std::io::Result<()>
where P: AsRef<Path> {
// Create a configuration for the raft instance.
let config = Config {
heartbeat_interval: 250,
election_timeout_min: 299,
..Default::default()
};

let config = Arc::new(config.validate().unwrap());

let (log_store, state_machine_store) = new_storage(&dir).await;

let kvs = state_machine_store.data.kvs.clone();

// Create the network layer using network-v1 crate
let network = network_v1_http::NetworkFactory {};

// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await.unwrap();

// Create an application that will store all the instances created above, this will
// later be used on the actix-web services.
let app_data = Data::new(App {
id: node_id,
addr: addr.clone(),
raft,
key_values: kvs,
config,
});

// Start the actix-web server.
let server = HttpServer::new(move || {
actix_web::App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.wrap(middleware::Compress::default())
.app_data(app_data.clone())
// raft internal RPC
.service(raft::append)
.service(raft::snapshot)
.service(raft::vote)
// admin API
.service(management::init)
.service(management::add_learner)
.service(management::change_membership)
.service(management::metrics)
// application API
.service(api::write)
.service(api::read)
.service(api::linearizable_read)
});

let x = server.bind(addr)?;

x.run().await
}
Loading