Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ restate-metadata-store = { path = "crates/metadata-store" }
restate-node = { path = "crates/node" }
restate-object-store-util = { path = "crates/object-store-util" }
restate-partition-store = { path = "crates/partition-store" }
restate-platform = { path = "crates/platform" }
restate-queue = { path = "crates/queue" }
restate-rocksdb = { path = "crates/rocksdb" }
restate-serde-util = { path = "crates/serde-util" }
Expand Down
3 changes: 2 additions & 1 deletion crates/clock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ test-util = []

[dependencies]
restate-workspace-hack = { workspace = true }
restate-encoding = { workspace = true }

restate-platform = { workspace = true }

bilrost = { workspace = true }
jiff = { workspace = true, optional = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/clock/src/rough_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::num::NonZeroU32;
use std::ops::{Add, Sub};
use std::time::Duration;

use restate_platform::network::NetSerde;

use crate::WallClock;
use crate::time::MillisSinceEpoch;
use crate::unique_timestamp::UniqueTimestamp;
Expand Down Expand Up @@ -45,7 +47,7 @@ impl fmt::Debug for RoughTimestamp {
}
}

impl restate_encoding::NetSerde for RoughTimestamp {}
impl NetSerde for RoughTimestamp {}

const _: () = {
assert!(
Expand Down
6 changes: 4 additions & 2 deletions crates/clock/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::time::{Duration, SystemTime};
// Note: BilrostNewType and NetSerde derives are not used since both MillisSinceEpoch
// and NanosSinceEpoch have custom implementations for niche optimization.

use restate_platform::network::NetSerde;

use crate::WallClock;

/// Milliseconds since the unix epoch.
Expand Down Expand Up @@ -46,7 +48,7 @@ impl fmt::Debug for MillisSinceEpoch {
}
}

impl restate_encoding::NetSerde for MillisSinceEpoch {}
impl NetSerde for MillisSinceEpoch {}

// Static assertions to ensure that MillisSinceEpoch is the same size as u64
// and that niche optimization works.
Expand Down Expand Up @@ -376,7 +378,7 @@ impl fmt::Debug for NanosSinceEpoch {
}
}

impl restate_encoding::NetSerde for NanosSinceEpoch {}
impl NetSerde for NanosSinceEpoch {}

// Static assertions to ensure that NanosSinceEpoch is the same size as u64
// and that niche optimization works.
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ restate-core-derive = { workspace = true, optional = true }
restate-futures-util = { workspace = true }
restate-memory = { workspace = true }
restate-metadata-store = { workspace = true }
restate-platform = { workspace = true }
restate-time-util = { workspace = true }
restate-types = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/network/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::marker::PhantomData;
use bytes::Bytes;
use tokio::sync::{oneshot, watch};

use restate_memory::EstimatedMemorySize;
use restate_platform::memory::EstimatedMemorySize;

pub use restate_memory::MemoryLease;
use restate_types::GenerationalNodeId;
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/network/message_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use tokio::sync::{mpsc, oneshot, watch};
use tokio_stream::StreamExt;
use tracing::{debug, instrument, trace, warn};

use restate_memory::{EstimatedMemorySize, MemoryLease, MemoryPool};
use restate_memory::{MemoryLease, MemoryPool};
use restate_platform::memory::EstimatedMemorySize;
use restate_types::SharedString;
use restate_types::net::{Service, ServiceTag};

Expand Down
5 changes: 1 addition & 4 deletions crates/encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ publish = false
restate-workspace-hack = { workspace = true }

restate-encoding-derive = { version = "0.1.0", path = "derive" }
restate-sharding = { workspace = true }
restate-platform = { workspace = true }

bilrost = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
restate-sharding = { workspace = true, features = ["bilrost"] }
static_assertions = { workspace = true }
4 changes: 2 additions & 2 deletions crates/encoding/derive/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ pub fn net_serde_inner(input: DeriveInput) -> Result<TokenStream, syn::Error> {

let where_clauses = field_types.iter().map(|ty| {
quote! {
#ty: NetSerde
#ty: ::restate_platform::network::NetSerde
}
});

let expanded = quote! {
impl ::restate_encoding::NetSerde for #name where #(#where_clauses),* {}
impl ::restate_platform::network::NetSerde for #name where #(#where_clauses),* {}
};

Ok(TokenStream::from(expanded))
Expand Down
3 changes: 2 additions & 1 deletion crates/encoding/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use bilrost::{
encoding::{EmptyState, ForOverwrite, Proxiable},
};
use restate_encoding_derive::BilrostNewType;
use restate_platform::network::NetSerde;

use crate::{NetSerde, bilrost_encodings::RestateEncoding};
use crate::bilrost_encodings::RestateEncoding;

struct U128Tag;

Expand Down
132 changes: 0 additions & 132 deletions crates/encoding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,97 +12,10 @@ mod bilrost_as;
pub mod bilrost_encodings;
mod common;

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::RangeInclusive;
use std::sync::Arc;

pub use bilrost_as::BilrostAsAdaptor;
pub use bilrost_encodings::{Arced, ArcedSlice, RestateEncoding};
pub use common::U128;
pub use restate_encoding_derive::{BilrostAs, BilrostNewType, NetSerde};
/// A marker trait for types that can be serialized and sent over the network.
///
/// Types implementing this trait are considered eligible for wire transmission,
/// typically via serialization. It is intended to be implemented automatically
/// using the `#[derive(NetSerde)]` macro.
///
/// # Example
/// ```ignore
/// #[derive(NetSerde)]
/// struct MyMessage {
/// a: u64,
/// b: String,
/// }
/// ```
pub trait NetSerde {}

macro_rules! impl_net_serde {
($t:ty) => {
impl NetSerde for $t {}
};
($($t:ty),+) => {
$(impl_net_serde!($t);)+
}
}

impl_net_serde!(
bool,
usize,
u8,
u16,
u32,
u64,
u128,
isize,
i8,
i16,
i32,
i64,
i128,
String,
bytes::Bytes,
bytestring::ByteString,
std::time::Duration
);

macro_rules! impl_net_serde_tuple {
($($t:ident),+) => {
impl<$($t),+> NetSerde for ($($t),+) where $($t: NetSerde),+ {}
};
}

impl_net_serde_tuple!(T0, T1);
impl_net_serde_tuple!(T0, T1, T2);
impl_net_serde_tuple!(T0, T1, T2, T3);
impl_net_serde_tuple!(T0, T1, T2, T3, T4);
impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5);
impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5, T6);

impl<T> NetSerde for Vec<T> where T: NetSerde {}
impl<T> NetSerde for Option<T> where T: NetSerde {}
impl<K, V, S> NetSerde for HashMap<K, V, S>
where
K: NetSerde,
V: NetSerde,
{
}

impl<K, V> NetSerde for BTreeMap<K, V>
where
K: NetSerde,
V: NetSerde,
{
}

impl<V> NetSerde for HashSet<V> where V: NetSerde {}
impl<Idx> NetSerde for RangeInclusive<Idx> where Idx: NetSerde {}
impl NetSerde for restate_sharding::PartitionId {}
impl<T> NetSerde for Arc<T> where T: NetSerde {}
impl<T> NetSerde for Arc<[T]> where T: NetSerde {}
impl<T> NetSerde for Box<T> where T: NetSerde {}
impl<T, const N: usize> NetSerde for [T; N] where T: NetSerde {}

#[cfg(test)]
mod test {
Expand Down Expand Up @@ -132,49 +45,4 @@ mod test {

assert_eq!(x.id.0, y.id);
}

/// Validates that `KeyRange`'s general bilrost encoding produces the same
/// wire format as `RangeInclusive<u64>` with `RestateEncoding`. This ensures
/// that `KeyRange` fields can use `#[bilrost(N)]` and remain wire-compatible
/// with the old `#[bilrost(tag(N), encoding(RestateEncoding))] RangeInclusive<u64>`.
#[test]
fn key_range_wire_compat_with_range_inclusive() {
use restate_sharding::KeyRange;

use super::RestateEncoding;

#[derive(Debug, PartialEq, bilrost::Message)]
struct WithKeyRange {
#[bilrost(1)]
range: KeyRange,
}

#[derive(Debug, PartialEq, bilrost::Message)]
struct WithRangeInclusive {
#[bilrost(tag(1), encoding(RestateEncoding))]
range: std::ops::RangeInclusive<u64>,
}

for (start, end) in [(0u64, 0u64), (1, 100), (0, u64::MAX), (42, 42)] {
let kr = KeyRange::new(start, end);
let ri = start..=end;

let kr_bytes = WithKeyRange { range: kr }.encode_to_vec();
let ri_bytes = WithRangeInclusive { range: ri.clone() }.encode_to_vec();

assert_eq!(
kr_bytes, ri_bytes,
"wire format mismatch for range ({start}, {end})"
);

// Cross-decode
let decoded: WithRangeInclusive =
WithRangeInclusive::decode(&*kr_bytes).expect("cross-decode KeyRange→RI");
assert_eq!(decoded.range, ri);

let decoded: WithKeyRange =
WithKeyRange::decode(&*ri_bytes).expect("cross-decode RI→KeyRange");
assert_eq!(decoded.range, kr);
}
}
Comment on lines -136 to -179
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this test removed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) The test was added a couple of commits ago to prove wire compatibility, we don't really need to maintain it in the long run. (2) it was removed because restate-encoding doesn't really need to depend on restate-sharding.

}
10 changes: 6 additions & 4 deletions crates/encoding/tests/net_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;

use restate_encoding::NetSerde;
use static_assertions::assert_impl_all;
use restate_platform::hash::HashMap;
use restate_platform::network::NetSerde;

#[allow(dead_code)]
#[derive(NetSerde)]
Expand All @@ -30,4 +29,7 @@ struct NotSendable;
#[derive(NetSerde)]
struct Inner(HashMap<u64, String>);

assert_impl_all!(SomeMessage: NetSerde);
const _: fn() = || {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we usually used const _: (). Any reason to use a function value here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just the macro expansion as is. No strong opinion, just wanted to remove the dependency.

fn assert_impl_all<T: ?Sized + NetSerde>() {}
assert_impl_all::<SomeMessage>();
};
1 change: 1 addition & 0 deletions crates/log-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ restate-core = { workspace = true }
restate-futures-util = { workspace = true }
restate-memory = { workspace = true }
restate-metadata-store = { workspace = true }
restate-platform = { workspace = true }
restate-rocksdb = { workspace = true }
restate-serde-util = { workspace = true }
restate-time-util = { workspace = true }
Expand Down
Loading
Loading