From 461372a084c67c9d7091ed2a060c402f494d679a Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Mon, 26 Jun 2023 16:15:25 -0700 Subject: [PATCH 01/14] Code for randomly inserting node into binary tree and measuring worst case node jump from left to right subtree --- .../examples/topolotree/random_insertions.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 hydro_cli_examples/examples/topolotree/random_insertions.py diff --git a/hydro_cli_examples/examples/topolotree/random_insertions.py b/hydro_cli_examples/examples/topolotree/random_insertions.py new file mode 100644 index 000000000000..488437bd9a5c --- /dev/null +++ b/hydro_cli_examples/examples/topolotree/random_insertions.py @@ -0,0 +1,61 @@ +import random +import numpy as np + + +class Node: + def __init__(self, data): + self.data = data + self.left = None + self.right = None + + +def insert_random_leaf(root, data): + if random.choice([True, False]): + if root.left is None: + root.left = Node(data) + else: + insert_random_leaf(root.left, data) + else: + if root.right is None: + root.right = Node(data) + else: + insert_random_leaf(root.right, data) + + +def create_binary_tree(data_list): + root = Node(data_list[0]) + for data in data_list[1:]: + insert_random_leaf(root, data) + return root + + +def find_max_length(root): + + if not root: + return 0 + + def helper(root): + + if not root: + return 0 + + if root.left and root.right: + return max(1 + helper(root.left), 1 + helper(root.right)) + if root.left: + return 1 + helper(root.left) + if root.right: + return 1 + helper(root.right) + if not root.left and not root.right: + return 0 + + return 2 + helper(root.left) + helper(root.right) + + +lengths = [] + +for _ in range(100): + data_list = np.arange(10000) + binary_tree = create_binary_tree(data_list) + lengths.append(find_max_length(binary_tree)) + +print(sum(lengths) / len(lengths)) From 39365677e2e86da7c1b416e90d6190d18466999a Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Mon, 11 Sep 2023 10:45:44 -0700 Subject: [PATCH 02/14] Added new hydro deploy code. Only broadcasting is possible so far. Infinite loop is present --- Cargo.lock | 16 +++++ Cargo.toml | 1 + topolotree/Cargo.toml | 22 ++++++ topolotree/src/main.rs | 153 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 topolotree/Cargo.toml create mode 100644 topolotree/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 805aeac4a5a7..9fb0795c418a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3148,6 +3148,22 @@ dependencies = [ "winnow", ] +[[package]] +name = "topolotree" +version = "0.0.0" +dependencies = [ + "dashmap", + "futures", + "hydroflow", + "hydroflow_datalog", + "procinfo", + "rand 0.8.5", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite", +] + [[package]] name = "tracing" version = "0.1.37" diff --git a/Cargo.toml b/Cargo.toml index 7bc13064ba02..a5b428dd3281 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "multiplatform_test", "pusherator", "relalg", + "topolotree", "variadics", "website_playground", ] diff --git a/topolotree/Cargo.toml b/topolotree/Cargo.toml new file mode 100644 index 000000000000..d2df597d8778 --- /dev/null +++ b/topolotree/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "topolotree" +publish = false +version = "0.0.0" +edition = "2021" + +[dependencies] +hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] } +hydroflow_datalog = { path = "../hydroflow_datalog" } + +tokio = { version = "1.16", features = [ "full" ] } +serde = { version = "1", features = ["rc"] } +serde_json = "1" +rand = "0.8.5" +dashmap = "5.4.0" + +futures = "0.3.28" + +tokio-tungstenite = "0.19.0" + +[target.'cfg(target_os = "linux")'.dependencies] +procinfo = "0.4.2" diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs new file mode 100644 index 000000000000..9caafcf3efc1 --- /dev/null +++ b/topolotree/src/main.rs @@ -0,0 +1,153 @@ +use std::io; +use futures::Stream; +use futures::Sink; +use hydroflow::bytes::{BytesMut, Bytes}; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::cli::{ConnectedDirect, ConnectedDemux, ConnectedTagged, ConnectedSink, ConnectedSource}; +use hydroflow::hydroflow_syntax; +use hydroflow::util::collect_ready; +use hydroflow::util::collect_ready_async; +use serde::Deserialize; +use serde::Serialize; +use std::fmt::Debug; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +struct Payload { + timestamp: usize, + data: usize +} + + +fn run_topolotree + Unpin + 'static>(neighbors: Vec, input_recv: impl Stream> + Unpin + 'static, output_send: S ) -> Hydroflow +where >::Error: Debug{ + + + fn merge(x: &mut usize, y: usize) { + *x += y; + } + + hydroflow_syntax! { + input = union() -> tee(); + + source_stream(input_recv) + -> map(Result::unwrap) + -> map(|(src, payload): (u32, BytesMut)| (src, serde_json::from_slice::(&payload[..]).unwrap())) + -> inspect(|(src, payload)| println!("received from: {src}: payload: {payload:?}")) + -> input; + + // + // Test simulation, eventually this should come from the python script. + // + // source_iter([(1, 15)]) + // -> filter(|(src, _)| *src == current_id) + // -> input; + + // source_iter([(2, 45)]) + // -> filter(|(src, _)| *src == current_id) + // -> input; + + input + -> map(|(src, payload)| (src, (payload, context.current_tick()))) + -> inspect(|x| eprintln!("input: {:?}", x)) + -> all_neighbor_data; + + last_neighbor_data -> inspect(|x| eprintln!("{:?}", x)) -> all_neighbor_data; + + // Stream of (src, (payload, tick)) + // First element compared to all of the other elements per src + all_neighbor_data = union() + -> reduce_keyed(|acc:&mut (Payload, usize), val:(Payload, usize)| { + if val.0.timestamp > acc.0.timestamp { + *acc = val; + } + }) + -> tee(); + + last_neighbor_data = all_neighbor_data -> defer_tick(); + + // Cross Join + neighbors = source_iter(neighbors) -> persist(); + all_neighbor_data -> [0]aggregated_data; + neighbors -> [1]aggregated_data; + + // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function + aggregated_data = cross_join_multiset() + -> filter(|((src, (payload, tick)), dst)| src != dst) + -> map(|((src, (payload, tick)), dst)| (dst, (src, (payload, tick)))) + -> fold_keyed(|| Payload{timestamp:0, data:0}, |acc: &mut Payload, (src, (payload, tick)): (u32, (Payload, usize))| { + merge(&mut acc.data, payload.data); + if tick > acc.timestamp { + acc.timestamp = tick; + } + }) + -> filter(|(dst, payload)| payload.timestamp == context.current_tick()); + + + aggregated_data + -> map(|(dst, payload)| (dst, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()).freeze())) + -> dest_sink(output_send); + + } + +} + + + + + +#[hydroflow::main] +async fn main() { + let args: Vec = std::env::args().skip(1).collect(); + let neighbors: Vec = args.into_iter().map(|x| x.parse().unwrap()).collect(); + // let current_id = neighbors[0]; + + let mut ports = hydroflow::util::cli::init().await; + + let input_recv = ports + .port("input") + // connect to the port with a single recipient + .connect::>() + .await + .into_source(); + + let output_send = ports + .port("output") + .connect::>() + .await + .into_sink(); + + hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, output_send)).await; +} + + +#[hydroflow::test] +async fn simple_test() { + // let args: Vec = std::env::args().skip(1).collect(); + let neighbors: Vec = vec![1,2]; //args.into_iter().map(|x| x.parse().unwrap()).collect(); + // let current_id = neighbors[0]; + + let (input_send, input_recv) = hydroflow::util::unbounded_channel::>(); + let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); + + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + + let simulate_input = |(id, payload): (u32, Payload)| { + input_send.send(Ok((id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))) + }; + + let mut receive_all_output = || async move { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected.iter().map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())).collect::>() + }; + + simulate_input((1, Payload { + timestamp: 1, + data: 2 + })).unwrap(); + + flow.run_tick(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + + assert_eq!(receive_all_output().await, &[]); +} From 7f7f0828a554fbf521284d98be4dcb6c75f5ea4a Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla <90572100+Saikrishna-Achalla@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:47:02 -0700 Subject: [PATCH 03/14] Delete hydro_cli_examples/examples/topolotree/random_insertions.py --- .../examples/topolotree/random_insertions.py | 61 ------------------- 1 file changed, 61 deletions(-) delete mode 100644 hydro_cli_examples/examples/topolotree/random_insertions.py diff --git a/hydro_cli_examples/examples/topolotree/random_insertions.py b/hydro_cli_examples/examples/topolotree/random_insertions.py deleted file mode 100644 index 488437bd9a5c..000000000000 --- a/hydro_cli_examples/examples/topolotree/random_insertions.py +++ /dev/null @@ -1,61 +0,0 @@ -import random -import numpy as np - - -class Node: - def __init__(self, data): - self.data = data - self.left = None - self.right = None - - -def insert_random_leaf(root, data): - if random.choice([True, False]): - if root.left is None: - root.left = Node(data) - else: - insert_random_leaf(root.left, data) - else: - if root.right is None: - root.right = Node(data) - else: - insert_random_leaf(root.right, data) - - -def create_binary_tree(data_list): - root = Node(data_list[0]) - for data in data_list[1:]: - insert_random_leaf(root, data) - return root - - -def find_max_length(root): - - if not root: - return 0 - - def helper(root): - - if not root: - return 0 - - if root.left and root.right: - return max(1 + helper(root.left), 1 + helper(root.right)) - if root.left: - return 1 + helper(root.left) - if root.right: - return 1 + helper(root.right) - if not root.left and not root.right: - return 0 - - return 2 + helper(root.left) + helper(root.right) - - -lengths = [] - -for _ in range(100): - data_list = np.arange(10000) - binary_tree = create_binary_tree(data_list) - lengths.append(find_max_length(binary_tree)) - -print(sum(lengths) / len(lengths)) From fb979d9c4caa9194cc9195cd1e21b7269e4d56fd Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Mon, 11 Sep 2023 14:11:27 -0700 Subject: [PATCH 04/14] Removed infinite loop --- topolotree/src/main.rs | 37 ++++++------------------------------- 1 file changed, 6 insertions(+), 31 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 9caafcf3efc1..f7b7ed5b8420 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -35,36 +35,18 @@ where >::Error: Debug{ -> inspect(|(src, payload)| println!("received from: {src}: payload: {payload:?}")) -> input; - // - // Test simulation, eventually this should come from the python script. - // - // source_iter([(1, 15)]) - // -> filter(|(src, _)| *src == current_id) - // -> input; - - // source_iter([(2, 45)]) - // -> filter(|(src, _)| *src == current_id) - // -> input; - input -> map(|(src, payload)| (src, (payload, context.current_tick()))) -> inspect(|x| eprintln!("input: {:?}", x)) -> all_neighbor_data; - last_neighbor_data -> inspect(|x| eprintln!("{:?}", x)) -> all_neighbor_data; - - // Stream of (src, (payload, tick)) - // First element compared to all of the other elements per src - all_neighbor_data = union() - -> reduce_keyed(|acc:&mut (Payload, usize), val:(Payload, usize)| { + all_neighbor_data = reduce_keyed(|acc:&mut (Payload, usize), val:(Payload, usize)| { if val.0.timestamp > acc.0.timestamp { *acc = val; } - }) - -> tee(); - - last_neighbor_data = all_neighbor_data -> defer_tick(); - + }) + -> persist(); + // Cross Join neighbors = source_iter(neighbors) -> persist(); all_neighbor_data -> [0]aggregated_data; @@ -73,14 +55,7 @@ where >::Error: Debug{ // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function aggregated_data = cross_join_multiset() -> filter(|((src, (payload, tick)), dst)| src != dst) - -> map(|((src, (payload, tick)), dst)| (dst, (src, (payload, tick)))) - -> fold_keyed(|| Payload{timestamp:0, data:0}, |acc: &mut Payload, (src, (payload, tick)): (u32, (Payload, usize))| { - merge(&mut acc.data, payload.data); - if tick > acc.timestamp { - acc.timestamp = tick; - } - }) - -> filter(|(dst, payload)| payload.timestamp == context.current_tick()); + -> map(|((src, (payload, tick)), dst)| (dst, payload)); aggregated_data @@ -149,5 +124,5 @@ async fn simple_test() { tokio::time::sleep(std::time::Duration::from_secs(1)).await; - assert_eq!(receive_all_output().await, &[]); + assert_eq!(receive_all_output().await, &[(2, Payload {timestamp:1, data:2})]); } From 7004af9a17f80bea7aad0352a1411f30c3d5512f Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Mon, 11 Sep 2023 14:14:52 -0700 Subject: [PATCH 05/14] Fixed fomatting --- topolotree/src/main.rs | 102 ++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index f7b7ed5b8420..add776b5a4b1 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -1,27 +1,30 @@ +use std::fmt::Debug; use std::io; -use futures::Stream; -use futures::Sink; -use hydroflow::bytes::{BytesMut, Bytes}; -use hydroflow::scheduled::graph::Hydroflow; -use hydroflow::util::cli::{ConnectedDirect, ConnectedDemux, ConnectedTagged, ConnectedSink, ConnectedSource}; + +use futures::{Sink, Stream}; +use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::hydroflow_syntax; -use hydroflow::util::collect_ready; -use hydroflow::util::collect_ready_async; -use serde::Deserialize; -use serde::Serialize; -use std::fmt::Debug; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::cli::{ + ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, +}; +use hydroflow::util::{collect_ready, collect_ready_async}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] struct Payload { timestamp: usize, - data: usize -} - - -fn run_topolotree + Unpin + 'static>(neighbors: Vec, input_recv: impl Stream> + Unpin + 'static, output_send: S ) -> Hydroflow -where >::Error: Debug{ - + data: usize, +} +fn run_topolotree + Unpin + 'static>( + neighbors: Vec, + input_recv: impl Stream> + Unpin + 'static, + output_send: S, +) -> Hydroflow +where + >::Error: Debug, +{ fn merge(x: &mut usize, y: usize) { *x += y; } @@ -35,41 +38,36 @@ where >::Error: Debug{ -> inspect(|(src, payload)| println!("received from: {src}: payload: {payload:?}")) -> input; - input + input -> map(|(src, payload)| (src, (payload, context.current_tick()))) -> inspect(|x| eprintln!("input: {:?}", x)) -> all_neighbor_data; - + all_neighbor_data = reduce_keyed(|acc:&mut (Payload, usize), val:(Payload, usize)| { if val.0.timestamp > acc.0.timestamp { *acc = val; } }) -> persist(); - + // Cross Join neighbors = source_iter(neighbors) -> persist(); all_neighbor_data -> [0]aggregated_data; neighbors -> [1]aggregated_data; - + // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function aggregated_data = cross_join_multiset() -> filter(|((src, (payload, tick)), dst)| src != dst) -> map(|((src, (payload, tick)), dst)| (dst, payload)); - - - aggregated_data + + + aggregated_data -> map(|(dst, payload)| (dst, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()).freeze())) -> dest_sink(output_send); } - } - - - - #[hydroflow::main] async fn main() { let args: Vec = std::env::args().skip(1).collect(); @@ -81,48 +79,66 @@ async fn main() { let input_recv = ports .port("input") // connect to the port with a single recipient - .connect::>() + .connect::>() .await .into_source(); let output_send = ports .port("output") - .connect::>() + .connect::>() .await .into_sink(); hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, output_send)).await; } - #[hydroflow::test] async fn simple_test() { // let args: Vec = std::env::args().skip(1).collect(); - let neighbors: Vec = vec![1,2]; //args.into_iter().map(|x| x.parse().unwrap()).collect(); - // let current_id = neighbors[0]; + let neighbors: Vec = vec![1, 2]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); + // let current_id = neighbors[0]; - let (input_send, input_recv) = hydroflow::util::unbounded_channel::>(); + let (input_send, input_recv) = + hydroflow::util::unbounded_channel::>(); let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok((id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))) + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) }; let mut receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; - collected.iter().map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())).collect::>() + collected + .iter() + .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .collect::>() }; - simulate_input((1, Payload { - timestamp: 1, - data: 2 - })).unwrap(); + simulate_input(( + 1, + Payload { + timestamp: 1, + data: 2, + }, + )) + .unwrap(); flow.run_tick(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; - - assert_eq!(receive_all_output().await, &[(2, Payload {timestamp:1, data:2})]); + assert_eq!( + receive_all_output().await, + &[( + 2, + Payload { + timestamp: 1, + data: 2 + } + )] + ); } From 0451708aac784ca8a0409e8d773fad2b3eb66afb Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Wed, 13 Sep 2023 13:46:18 -0700 Subject: [PATCH 06/14] Simple Test working --- topolotree/src/main.rs | 62 +++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index add776b5a4b1..6a6a48b10e9f 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::io; -use futures::{Sink, Stream}; +use futures::{Sink, Stream, stream, StreamExt}; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; @@ -11,7 +11,7 @@ use hydroflow::util::cli::{ use hydroflow::util::{collect_ready, collect_ready_async}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Copy)] struct Payload { timestamp: usize, data: usize, @@ -43,12 +43,13 @@ where -> inspect(|x| eprintln!("input: {:?}", x)) -> all_neighbor_data; - all_neighbor_data = reduce_keyed(|acc:&mut (Payload, usize), val:(Payload, usize)| { + all_neighbor_data = reduce_keyed::<'static>(|acc:&mut (Payload, usize), val:(Payload, usize)| { if val.0.timestamp > acc.0.timestamp { *acc = val; } }) - -> persist(); + -> inspect(|(src, payload)| println!("data from stream: {src}: payload: {payload:?}")); + // -> persist(); // Cross Join neighbors = source_iter(neighbors) -> persist(); @@ -89,29 +90,40 @@ async fn main() { .await .into_sink(); + let increment_requests = ports + .port("increment_requests") + .connect::() + .await + .into_source(); + + let query_responses = ports + .port("query_responses") + .connect::() + .await + .into_sink(); + hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, output_send)).await; } #[hydroflow::test] async fn simple_test() { // let args: Vec = std::env::args().skip(1).collect(); - let neighbors: Vec = vec![1, 2]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); + let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); // let current_id = neighbors[0]; - let (input_send, input_recv) = - hydroflow::util::unbounded_channel::>(); + // let (input_send, input_recv) = + // hydroflow::util::unbounded_channel::>(); let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); - let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) - }; + let input1 = (1, Payload {timestamp:1, data:2}); + let input2 = (1, Payload {timestamp:1, data:3}); + let payload_vec = vec![input1, input2]; + let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); + + let mut flow: Hydroflow = run_topolotree(neighbors, payload_stream, output_send); - let mut receive_all_output = || async move { + let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; collected .iter() @@ -119,26 +131,14 @@ async fn simple_test() { .collect::>() }; - simulate_input(( - 1, - Payload { - timestamp: 1, - data: 2, - }, - )) - .unwrap(); - flow.run_tick(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); + let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); + assert_eq!( receive_all_output().await, - &[( - 2, - Payload { - timestamp: 1, - data: 2 - } - )] + &[output1, output2] ); } From f6d7e0b3be42a83780e1ab884194173fc15b728a Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Wed, 13 Sep 2023 13:58:32 -0700 Subject: [PATCH 07/14] Simple Test working --- topolotree/src/main.rs | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 6a6a48b10e9f..66ca153fed50 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -11,11 +11,15 @@ use hydroflow::util::cli::{ use hydroflow::util::{collect_ready, collect_ready_async}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Copy)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] struct Payload { timestamp: usize, data: usize, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +struct OperationPayload { + change: usize +} fn run_topolotree + Unpin + 'static>( neighbors: Vec, @@ -106,22 +110,29 @@ async fn main() { } #[hydroflow::test] -async fn simple_test() { +async fn simple_payload__test() { // let args: Vec = std::env::args().skip(1).collect(); let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); // let current_id = neighbors[0]; - // let (input_send, input_recv) = - // hydroflow::util::unbounded_channel::>(); + let (input_send, input_recv) = + hydroflow::util::unbounded_channel::>(); let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); let input1 = (1, Payload {timestamp:1, data:2}); - let input2 = (1, Payload {timestamp:1, data:3}); - let payload_vec = vec![input1, input2]; - let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); + // let input2 = (1, Payload {timestamp:1, data:3}); + // let payload_vec = vec![input1, input2]; + // let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); + + let simulate_input = |(id, payload): (u32, Payload)| { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) + }; - let mut flow: Hydroflow = run_topolotree(neighbors, payload_stream, output_send); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; @@ -131,6 +142,8 @@ async fn simple_test() { .collect::>() }; + simulate_input(input1).unwrap(); + flow.run_tick(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; From 6b1f8e08282f845154133dc07c1d62f4b8748201 Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Thu, 14 Sep 2023 10:49:41 -0700 Subject: [PATCH 08/14] Added local update logic but types don't match. Also need to have time as a local variable and increment it every time an update is sent --- topolotree/src/main.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 66ca153fed50..5d9cc4e2a915 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -24,6 +24,7 @@ struct OperationPayload { fn run_topolotree + Unpin + 'static>( neighbors: Vec, input_recv: impl Stream> + Unpin + 'static, + local_update_recv: impl Stream> + Unpin + 'static, output_send: S, ) -> Hydroflow where @@ -42,6 +43,13 @@ where -> inspect(|(src, payload)| println!("received from: {src}: payload: {payload:?}")) -> input; + source_stream(local_update_recv) + -> map(Result::unwrap) + -> map(|changePayload: BytesMut| (serde_json::from_slice::(&changePayload[..]).unwrap())) + -> inspect(|changePayload| println!("change: {changePayload:?}")) + // -> tee(); + -> operations_input; + input -> map(|(src, payload)| (src, (payload, context.current_tick()))) -> inspect(|x| eprintln!("input: {:?}", x)) @@ -55,9 +63,13 @@ where -> inspect(|(src, payload)| println!("data from stream: {src}: payload: {payload:?}")); // -> persist(); + all_neighbor_data -> neighbors_and_myself; + operations_input -> fold::<'static>(0, |agg: &mut i64, op: i64| *agg += op) -> map(|total| (my_id, total)) -> neighbors_and_myself; + neighbors_and_myself = union(); + // Cross Join neighbors = source_iter(neighbors) -> persist(); - all_neighbor_data -> [0]aggregated_data; + neighbors_and_myself -> [0]aggregated_data; neighbors -> [1]aggregated_data; // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function @@ -94,6 +106,13 @@ async fn main() { .await .into_sink(); + let operations_send = ports + .port("input") + // connect to the port with a single recipient + .connect::>() + .await + .into_source(); + let increment_requests = ports .port("increment_requests") .connect::() @@ -106,7 +125,7 @@ async fn main() { .await .into_sink(); - hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, output_send)).await; + hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, operations_send, output_send)).await; } #[hydroflow::test] @@ -125,6 +144,7 @@ async fn simple_payload__test() { // let payload_vec = vec![input1, input2]; // let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); + // Send (id, Payload) over network to neighbors let simulate_input = |(id, payload): (u32, Payload)| { input_send.send(Ok(( id, @@ -132,7 +152,7 @@ async fn simple_payload__test() { ))) }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_send, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; From ffac0f32d596a6bbf6209514f68a15058974594c Mon Sep 17 00:00:00 2001 From: Saikrishna Achalla Date: Thu, 14 Sep 2023 11:03:53 -0700 Subject: [PATCH 09/14] Conor tests combined --- topolotree/src/main.rs | 135 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 124 insertions(+), 11 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 5d9cc4e2a915..10edc7036647 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -128,32 +128,28 @@ async fn main() { hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, operations_send, output_send)).await; } + + + #[hydroflow::test] async fn simple_payload__test() { // let args: Vec = std::env::args().skip(1).collect(); let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); // let current_id = neighbors[0]; - let (input_send, input_recv) = hydroflow::util::unbounded_channel::>(); let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); - - let input1 = (1, Payload {timestamp:1, data:2}); // let input2 = (1, Payload {timestamp:1, data:3}); // let payload_vec = vec![input1, input2]; // let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); - - // Send (id, Payload) over network to neighbors let simulate_input = |(id, payload): (u32, Payload)| { input_send.send(Ok(( id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), ))) }; - - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_send, output_send); - + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; collected @@ -161,17 +157,134 @@ async fn simple_payload__test() { .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) .collect::>() }; - simulate_input(input1).unwrap(); - flow.run_tick(); tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); + assert_eq!( + receive_all_output().await, + &[output1, output2] + ); +} + + +#[hydroflow::test] +async fn idempotence_test() { + let neighbors: Vec = vec![1, 2, 3]; + let (input_send, input_recv) = + hydroflow::util::unbounded_channel::>(); + let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); + let input1 = (1, Payload {timestamp:4, data:2}); + let input2 = (1, Payload {timestamp:4, data:2}); + let simulate_input = |(id, payload): (u32, Payload)| { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) + }; + simulate_input(input1).unwrap(); + simulate_input(input2).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let receive_all_output = || async move { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .collect::>() + }; + flow.run_tick(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); + let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); assert_eq!( receive_all_output().await, &[output1, output2] ); } +#[hydroflow::test] +async fn backwards_in_time_test() { + let neighbors: Vec = vec![1, 2, 3]; + let (input_send, input_recv) = + hydroflow::util::unbounded_channel::>(); + let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); + let input1 = (1, Payload {timestamp:5, data:7}); + let input2 = (1, Payload {timestamp:4, data:2}); + let simulate_input = |(id, payload): (u32, Payload)| { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) + }; + simulate_input(input1).unwrap(); + simulate_input(input2).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let receive_all_output = || async move { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .collect::>() + }; + flow.run_tick(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7}); + let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7}); + assert_eq!( + receive_all_output().await, + &[output1, output2] + ); +} +#[hydroflow::test] +async fn multiple_input_sources_test() { + let neighbors: Vec = vec![1, 2, 3]; + let (input_send, input_recv) = + hydroflow::util::unbounded_channel::>(); + let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); + let input1 = (1, Payload {timestamp:5, data:7}); + let input2 = (2, Payload {timestamp:4, data:2}); + let simulate_input = |(id, payload): (u32, Payload)| { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) + }; + simulate_input(input1).unwrap(); + simulate_input(input2).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let receive_all_output = || async move { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .collect::>() + }; + flow.run_tick(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2}); + let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7}); + let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9}); + assert_eq!( + receive_all_output().await, + HashSet::from([output1, output2, output3]) + ); +} +// idempotence test (issue two requests with the same timestamp and see that they don't change anything.) +// let input1 = (1, Payload {timestamp:4, data:2}); +// let input2 = (1, Payload {timestamp:4, data:2}); +// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); +// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); +// +// backward in time test (issue two requests, the second one with an earlier timestamp than the first. ) +// let input1 = (1, Payload {timestamp:5, data:7}); +// let input2 = (1, Payload {timestamp:4, data:2}); +// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7}); +// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7}); +// +// updates from multiple sources test +// let input1 = (1, Payload {timestamp:5, data:7}); +// let input2 = (2, Payload {timestamp:4, data:2}); +// let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2}); +// let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7}); +// let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9}); From 3cd429d54d539362982005af578ca4ffad5e5026 Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:25:08 -0700 Subject: [PATCH 10/14] looks pretty good --- topolotree/src/main.rs | 542 ++++++++++++++++++++++++++++++++--------- 1 file changed, 428 insertions(+), 114 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 10edc7036647..e6bb86df38ed 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -1,86 +1,216 @@ +use std::cell::RefCell; +use std::collections::HashSet; use std::fmt::Debug; use std::io; +use std::rc::Rc; -use futures::{Sink, Stream, stream, StreamExt}; +use futures::{Sink, SinkExt, Stream}; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::util::cli::{ ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, }; -use hydroflow::util::{collect_ready, collect_ready_async}; +use hydroflow::util::multiset::HashMultiSet; +use hydroflow::util::{collect_ready_async, unbounded_channel}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -struct Payload { - timestamp: usize, - data: usize, +#[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)] +struct Payload { + timestamp: isize, + data: T, +} + +impl Payload { + pub fn merge_from(&mut self, other: Payload) -> bool { + if other.timestamp > self.timestamp { + self.data = other.data; + self.timestamp = other.timestamp; + true + } else { + false + } + } + + pub fn update(&mut self, updater: impl Fn(&T) -> T) { + self.data = updater(&self.data); + self.timestamp += 1; + } +} + +impl PartialEq for Payload { + fn eq(&self, other: &Self) -> bool { + if self.timestamp == other.timestamp { + assert_eq!(self.data, other.data); + true + } else { + false + } + } } + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] struct OperationPayload { - change: usize + change: i64, } -fn run_topolotree + Unpin + 'static>( +fn run_topolotree( neighbors: Vec, input_recv: impl Stream> + Unpin + 'static, local_update_recv: impl Stream> + Unpin + 'static, - output_send: S, -) -> Hydroflow -where - >::Error: Debug, -{ - fn merge(x: &mut usize, y: usize) { + output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>, +) -> Hydroflow { + fn merge(x: &mut i64, y: i64) { *x += y; } - hydroflow_syntax! { - input = union() -> tee(); + let self_timestamp = Rc::new(RefCell::new(0)); - source_stream(input_recv) - -> map(Result::unwrap) - -> map(|(src, payload): (u32, BytesMut)| (src, serde_json::from_slice::(&payload[..]).unwrap())) - -> inspect(|(src, payload)| println!("received from: {src}: payload: {payload:?}")) - -> input; + let self_timestamp1 = Rc::clone(&self_timestamp); + let self_timestamp2 = Rc::clone(&self_timestamp); + let self_timestamp3 = Rc::clone(&self_timestamp); - source_stream(local_update_recv) + hydroflow_syntax! { + from_neighbors = source_stream(input_recv) -> map(Result::unwrap) - -> map(|changePayload: BytesMut| (serde_json::from_slice::(&changePayload[..]).unwrap())) - -> inspect(|changePayload| println!("change: {changePayload:?}")) - // -> tee(); - -> operations_input; - - input - -> map(|(src, payload)| (src, (payload, context.current_tick()))) - -> inspect(|x| eprintln!("input: {:?}", x)) - -> all_neighbor_data; - - all_neighbor_data = reduce_keyed::<'static>(|acc:&mut (Payload, usize), val:(Payload, usize)| { - if val.0.timestamp > acc.0.timestamp { + -> map(|(src, payload)| (src, serde_json::from_slice(&payload[..]).unwrap())) + -> inspect(|(src, payload): &(u32, Payload)| println!("received from: {src}: payload: {payload:?}")) + -> tee(); + + from_neighbors + -> persist() + -> fold_keyed(|| Payload { timestamp: -1, data: Default::default() }, |acc: &mut Payload, val: Payload| { + if val.timestamp > acc.timestamp { *acc = val; + *self_timestamp1.borrow_mut() += 1; } }) - -> inspect(|(src, payload)| println!("data from stream: {src}: payload: {payload:?}")); - // -> persist(); + -> inspect(|(src, data)| println!("data from stream: {src}: data: {data:?}")) + -> [0]all_neighbor_data; - all_neighbor_data -> neighbors_and_myself; - operations_input -> fold::<'static>(0, |agg: &mut i64, op: i64| *agg += op) -> map(|total| (my_id, total)) -> neighbors_and_myself; - neighbors_and_myself = union(); + local_value = source_stream(local_update_recv) + -> map(Result::unwrap) + -> map(|change_payload: BytesMut| (serde_json::from_slice(&change_payload[..]).unwrap())) + -> inspect(|change_payload: &OperationPayload| println!("change: {change_payload:?}")) + -> inspect(|_| { + *self_timestamp2.borrow_mut() += 1; + }) + -> persist() + -> fold(0, |agg: &mut i64, op: OperationPayload| *agg += op.change); + + neighbors = source_iter(neighbors) + -> persist() + -> tee(); + + // [1, 2, 3] + SelfState + // message comes in from 2 + // (2+3+SelfState) -> 1, (1+2+SelfState) -> 3 + + + from_neighbors // 2 comes out here + -> map(|(src, _payload)| src) + -> [0]all_other_neighbors_except_for_who_it_came_from; // 2 goes into this crossjoin + + neighbors + -> [1]all_other_neighbors_except_for_who_it_came_from; - // Cross Join - neighbors = source_iter(neighbors) -> persist(); - neighbors_and_myself -> [0]aggregated_data; - neighbors -> [1]aggregated_data; + // (2, 1), (2, 2), (2, 3) + all_other_neighbors_except_for_who_it_came_from = cross_join_multiset() + -> filter(|(src, neighbor)| { + src != neighbor + }) + -> [0]who_to_aggregate_from_by_target; // (2, 1), (2, 3) + + neighbors + -> [1]who_to_aggregate_from_by_target; - // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function - aggregated_data = cross_join_multiset() - -> filter(|((src, (payload, tick)), dst)| src != dst) - -> map(|((src, (payload, tick)), dst)| (dst, payload)); + // ((2, 1), 1)), ((2, 1), 2)), ((2, 1), 3)), + // ((2, 3), 1)), ((2, 3), 2)), ((2, 3), 3)), + who_to_aggregate_from_by_target = cross_join_multiset() + -> filter(|((_original_src, target_neighbor), aggregate_from_this_guy)| { + target_neighbor != aggregate_from_this_guy + }) + // ((2, 1), 2)), ((2, 1), 3)), + // ((2, 3), 1)), ((2, 3), 2)), + -> map(|((original_src, target_neighbor), aggregate_from_this_guy)| { + (aggregate_from_this_guy, (original_src, target_neighbor)) + }) + // (2, (2, 1))), (3, (2, 1))), + // (1, (2, 3))), (2, (2, 3))), + -> [1]all_neighbor_data; - aggregated_data - -> map(|(dst, payload)| (dst, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()).freeze())) - -> dest_sink(output_send); + + all_neighbor_data = join() + -> map(|(aggregate_from_this_guy, (payload, (original_src, target_neighbor)))| { + ((target_neighbor, original_src), (aggregate_from_this_guy, payload)) + }) + -> fold_keyed(|| 0, |acc: &mut i64, (_aggregate_from_this_guy, payload): (u32, Payload)| { + merge(acc, payload.data); + }) + -> [0]add_local_value; + + local_value + -> [1]add_local_value; + + add_local_value = cross_join_multiset() + -> map(|(((target_neighbor, _original_src), data), local_value)| { + (target_neighbor, Payload { + timestamp: *self_timestamp3.borrow(), + data: data + local_value + }) + }) + -> for_each(|(target_neighbor, output)| { + let serialized = BytesMut::from(serde_json::to_string(&output).unwrap().as_str()).freeze(); + output_send.send((target_neighbor, serialized)).unwrap(); + }); + + + + // src + // -> map(|(from, _data)| from) + // -> enumerate() + // -> [0]cj1; + + // source_iter(NEIGHBORS) + // -> persist() + // -> [1]cj1; + + // cj1 = cross_join::() + // -> filter(|((_req_id, from), to)| to != from) + // -> map(|((req_id, _from), to)| (to, req_id)) + // -> [0]cj2; + + // source_iter(NEIGHBORS) + // -> persist() + // -> [1]cj2; + + // cj2 = cross_join::() + // -> filter(|((to, _req_id), node_id)| node_id != to) + // -> map(|((to, req_id), node_id)| (node_id, (req_id, to))) + // -> [0]j; + + + + // all_neighbor_data -> neighbors_and_myself; + // operations_input -> fold::<'static>(0, |agg: &mut i64, op: i64| *agg += op) -> map(|total| (my_id, total)) -> neighbors_and_myself; + // neighbors_and_myself = union(); + + // // Cross Join + // neighbors = source_iter(neighbors) -> persist(); + // neighbors_and_myself -> [0]aggregated_data; + // neighbors -> [1]aggregated_data; + + // // (dest, Payload) where Payload has timestamp and accumulated data as specified by merge function + // aggregated_data = cross_join_multiset() + // -> filter(|((src, (payload, tick)), dst)| src != dst) + // -> map(|((src, (payload, tick)), dst)| (dst, payload)); + + // aggregated_data + // -> map(|(dst, payload)| (dst, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()).freeze())) + // -> for_each(|x| { + // output_send.send(x).unwrap(); + // }); } } @@ -100,7 +230,7 @@ async fn main() { .await .into_source(); - let output_send = ports + let mut output_send = ports .port("output") .connect::>() .await @@ -109,7 +239,7 @@ async fn main() { let operations_send = ports .port("input") // connect to the port with a single recipient - .connect::>() + .connect::() .await .into_source(); @@ -125,60 +255,102 @@ async fn main() { .await .into_sink(); - hydroflow::util::cli::launch_flow(run_topolotree(neighbors, input_recv, operations_send, output_send)).await; -} - + let (chan_tx, mut chan_rx) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_local(async move { + while let Some(msg) = chan_rx.recv().await { + output_send.send(msg).await.unwrap(); + } + }); + hydroflow::util::cli::launch_flow(run_topolotree( + neighbors, + input_recv, + operations_send, + chan_tx, + )) + .await; +} #[hydroflow::test] -async fn simple_payload__test() { +async fn simple_payload_test() { // let args: Vec = std::env::args().skip(1).collect(); let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); - // let current_id = neighbors[0]; - let (input_send, input_recv) = - hydroflow::util::unbounded_channel::>(); - let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); - let input1 = (1, Payload {timestamp:1, data:2}); + // let current_id = neighbors[0]; + + let (operations_tx, operations_rx) = unbounded_channel::>(); + let (input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + let input1 = ( + 1, + Payload { + timestamp: 1, + data: 2, + }, + ); // let input2 = (1, Payload {timestamp:1, data:3}); // let payload_vec = vec![input1, input2]; // let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); - let simulate_input = |(id, payload): (u32, Payload)| { + let simulate_input = |(id, payload): (u32, Payload)| { input_send.send(Ok(( id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), ))) }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; collected .iter() - .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) .collect::>() }; simulate_input(input1).unwrap(); flow.run_tick(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); - let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); - assert_eq!( - receive_all_output().await, - &[output1, output2] + let output1: (u32, Payload) = ( + 2, + Payload { + timestamp: 1, + data: 2, + }, + ); + let output2: (u32, Payload) = ( + 3, + Payload { + timestamp: 1, + data: 2, + }, ); + assert_eq!(receive_all_output().await, &[output1, output2]); } - - #[hydroflow::test] async fn idempotence_test() { let neighbors: Vec = vec![1, 2, 3]; - let (input_send, input_recv) = - hydroflow::util::unbounded_channel::>(); - let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); - let input1 = (1, Payload {timestamp:4, data:2}); - let input2 = (1, Payload {timestamp:4, data:2}); - let simulate_input = |(id, payload): (u32, Payload)| { + let (operations_tx, operations_rx) = unbounded_channel::>(); + + let (input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + let input1 = ( + 1, + Payload { + timestamp: 4, + data: 2, + }, + ); + let input2 = ( + 1, + Payload { + timestamp: 4, + data: 2, + }, + ); + let simulate_input = |(id, payload): (u32, Payload)| { input_send.send(Ok(( id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), @@ -186,32 +358,59 @@ async fn idempotence_test() { }; simulate_input(input1).unwrap(); simulate_input(input2).unwrap(); - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; collected .iter() - .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) .collect::>() }; flow.run_tick(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); - let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); - assert_eq!( - receive_all_output().await, - &[output1, output2] + let output1: (u32, Payload) = ( + 2, + Payload { + timestamp: 1, + data: 2, + }, + ); + let output2: (u32, Payload) = ( + 3, + Payload { + timestamp: 1, + data: 2, + }, ); + assert_eq!(receive_all_output().await, &[output1, output2]); } + #[hydroflow::test] async fn backwards_in_time_test() { let neighbors: Vec = vec![1, 2, 3]; - let (input_send, input_recv) = - hydroflow::util::unbounded_channel::>(); - let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); - let input1 = (1, Payload {timestamp:5, data:7}); - let input2 = (1, Payload {timestamp:4, data:2}); - let simulate_input = |(id, payload): (u32, Payload)| { + + let (operations_tx, operations_rx) = unbounded_channel::>(); + let (input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + let input1 = ( + 1, + Payload { + timestamp: 5, + data: 7, + }, + ); + let input2 = ( + 1, + Payload { + timestamp: 4, + data: 2, + }, + ); + let simulate_input = |(id, payload): (u32, Payload)| { input_send.send(Ok(( id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), @@ -219,32 +418,59 @@ async fn backwards_in_time_test() { }; simulate_input(input1).unwrap(); simulate_input(input2).unwrap(); - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; collected .iter() - .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) .collect::>() }; flow.run_tick(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7}); - let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7}); - assert_eq!( - receive_all_output().await, - &[output1, output2] + let output1: (u32, Payload) = ( + 2, + Payload { + timestamp: 1, + data: 7, + }, ); + let output2: (u32, Payload) = ( + 3, + Payload { + timestamp: 1, + data: 7, + }, + ); + assert_eq!(receive_all_output().await, &[output1, output2]); } + #[hydroflow::test] async fn multiple_input_sources_test() { let neighbors: Vec = vec![1, 2, 3]; - let (input_send, input_recv) = - hydroflow::util::unbounded_channel::>(); - let (output_send, mut output_recv) = futures::channel::mpsc::unbounded::<(u32, Bytes)>(); - let input1 = (1, Payload {timestamp:5, data:7}); - let input2 = (2, Payload {timestamp:4, data:2}); - let simulate_input = |(id, payload): (u32, Payload)| { + let (operations_tx, operations_rx) = unbounded_channel::>(); + + let (input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + let input1 = ( + 1, + Payload { + timestamp: 5, + data: 7, + }, + ); + let input2 = ( + 2, + Payload { + timestamp: 4, + data: 2, + }, + ); + let simulate_input = |(id, payload): (u32, Payload)| { input_send.send(Ok(( id, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), @@ -252,24 +478,112 @@ async fn multiple_input_sources_test() { }; simulate_input(input1).unwrap(); simulate_input(input2).unwrap(); - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, output_send); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; collected .iter() - .map(|(id, bytes)| (*id, serde_json::from_slice::(&bytes[..]).unwrap())) - .collect::>() + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) + .collect::>() }; flow.run_tick(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2}); - let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7}); - let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9}); + let output1 = ( + 1, + Payload { + timestamp: 2, + data: 2, + }, + ); + let output2 = ( + 2, + Payload { + timestamp: 2, + data: 7, + }, + ); + let output3 = ( + 3, + Payload { + timestamp: 2, + data: 9, + }, + ); assert_eq!( receive_all_output().await, - HashSet::from([output1, output2, output3]) + HashMultiSet::from_iter([output1, output2, output3.clone(), output3]) + ); + + // {(1, Payload { timestamp: 70, data: 2 }), (2, Payload { timestamp: 70, data: 7 }), (3, Payload { timestamp: 70, data: 18 })} + // {(2, Payload { timestamp: 2, data: 7 }), (1, Payload { timestamp: 2, data: 2 }), (3, Payload { timestamp: 2, data: 9 })} +} + +#[hydroflow::test] +async fn simple_operation_test() { + // let args: Vec = std::env::args().skip(1).collect(); + let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); + // let current_id = neighbors[0]; + + let (operations_tx, operations_rx) = unbounded_channel::>(); + let (input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + let input1 = ( + 1, + Payload { + timestamp: 1, + data: 2, + }, ); + + operations_tx + .send(Ok(BytesMut::from( + serde_json::to_string(&OperationPayload { change: 5 }) + .unwrap() + .as_str(), + ))) + .unwrap(); + + operations_tx + .send(Ok(BytesMut::from( + serde_json::to_string(&OperationPayload { change: 7 }) + .unwrap() + .as_str(), + ))) + .unwrap(); + + let simulate_input = |(id, payload): (u32, Payload)| { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) + }; + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + let receive_all_output = || async move { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) + .collect::>() + }; + simulate_input(input1).unwrap(); + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(receive_all_output().await, &[ + (2, Payload { timestamp: 3, data: 14 }), + (3, Payload { timestamp: 3, data: 14 }), + ]); } + // idempotence test (issue two requests with the same timestamp and see that they don't change anything.) // let input1 = (1, Payload {timestamp:4, data:2}); // let input2 = (1, Payload {timestamp:4, data:2}); From cf8769d2c6bb86913a76d7046c8e3cba6be81cc0 Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:44:12 -0700 Subject: [PATCH 11/14] refactoring --- topolotree/src/main.rs | 180 +++++++++++------------------------------ topolotree/src/util.rs | 18 +++++ 2 files changed, 66 insertions(+), 132 deletions(-) create mode 100644 topolotree/src/util.rs diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index e6bb86df38ed..e3060c421084 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -1,3 +1,5 @@ +mod util; + use std::cell::RefCell; use std::collections::HashSet; use std::fmt::Debug; @@ -14,30 +16,17 @@ use hydroflow::util::cli::{ use hydroflow::util::multiset::HashMultiSet; use hydroflow::util::{collect_ready_async, unbounded_channel}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::UnboundedSender; + +use crate::util::simulate_input; #[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)] -struct Payload { +pub struct Payload { timestamp: isize, data: T, } -impl Payload { - pub fn merge_from(&mut self, other: Payload) -> bool { - if other.timestamp > self.timestamp { - self.data = other.data; - self.timestamp = other.timestamp; - true - } else { - false - } - } - - pub fn update(&mut self, updater: impl Fn(&T) -> T) { - self.data = updater(&self.data); - self.timestamp += 1; - } -} - impl PartialEq for Payload { fn eq(&self, other: &Self) -> bool { if self.timestamp == other.timestamp { @@ -64,6 +53,9 @@ fn run_topolotree( *x += y; } + // Timestamp stuff is a bit complicated, there is a proper data-flowy way to do it + // but it would require at least one more join and one more cross join just specifically for the local timestamps + // Until we need it to be proper then we can take a shortcut and use rc refcell let self_timestamp = Rc::new(RefCell::new(0)); let self_timestamp1 = Rc::clone(&self_timestamp); @@ -243,13 +235,13 @@ async fn main() { .await .into_source(); - let increment_requests = ports + let _increment_requests = ports .port("increment_requests") .connect::() .await .into_source(); - let query_responses = ports + let _query_responses = ports .port("query_responses") .connect::() .await @@ -274,29 +266,15 @@ async fn main() { #[hydroflow::test] async fn simple_payload_test() { - // let args: Vec = std::env::args().skip(1).collect(); - let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); - // let current_id = neighbors[0]; + let neighbors: Vec = vec![1, 2, 3]; - let (operations_tx, operations_rx) = unbounded_channel::>(); - let (input_send, input_recv) = unbounded_channel::>(); + let (_operations_tx, operations_rx) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - let input1 = ( - 1, - Payload { - timestamp: 1, - data: 2, - }, - ); - // let input2 = (1, Payload {timestamp:1, data:3}); - // let payload_vec = vec![input1, input2]; - // let payload_stream = stream::iter(payload_vec).map(|(i, payload)| Ok((i, BytesMut::from(serde_json::to_string(&payload).unwrap().as_str())))); - let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) - }; + + #[rustfmt::skip] + simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; @@ -310,7 +288,6 @@ async fn simple_payload_test() { }) .collect::>() }; - simulate_input(input1).unwrap(); flow.run_tick(); let output1: (u32, Payload) = ( 2, @@ -332,32 +309,17 @@ async fn simple_payload_test() { #[hydroflow::test] async fn idempotence_test() { let neighbors: Vec = vec![1, 2, 3]; - let (operations_tx, operations_rx) = unbounded_channel::>(); + let (_operations_tx, operations_rx) = unbounded_channel::>(); - let (input_send, input_recv) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - let input1 = ( - 1, - Payload { - timestamp: 4, - data: 2, - }, - ); - let input2 = ( - 1, - Payload { - timestamp: 4, - data: 2, - }, - ); - let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); }; - simulate_input(input1).unwrap(); - simulate_input(input2).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; @@ -393,31 +355,16 @@ async fn idempotence_test() { async fn backwards_in_time_test() { let neighbors: Vec = vec![1, 2, 3]; - let (operations_tx, operations_rx) = unbounded_channel::>(); - let (input_send, input_recv) = unbounded_channel::>(); + let (_operations_tx, operations_rx) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - let input1 = ( - 1, - Payload { - timestamp: 5, - data: 7, - }, - ); - let input2 = ( - 1, - Payload { - timestamp: 4, - data: 2, - }, - ); - let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); }; - simulate_input(input1).unwrap(); - simulate_input(input2).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; @@ -452,32 +399,17 @@ async fn backwards_in_time_test() { #[hydroflow::test] async fn multiple_input_sources_test() { let neighbors: Vec = vec![1, 2, 3]; - let (operations_tx, operations_rx) = unbounded_channel::>(); + let (_operations_tx, operations_rx) = unbounded_channel::>(); - let (input_send, input_recv) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - let input1 = ( - 1, - Payload { - timestamp: 5, - data: 7, - }, - ); - let input2 = ( - 2, - Payload { - timestamp: 4, - data: 2, - }, - ); - let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); + simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap(); }; - simulate_input(input1).unwrap(); - simulate_input(input2).unwrap(); + let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; @@ -517,27 +449,18 @@ async fn multiple_input_sources_test() { receive_all_output().await, HashMultiSet::from_iter([output1, output2, output3.clone(), output3]) ); - - // {(1, Payload { timestamp: 70, data: 2 }), (2, Payload { timestamp: 70, data: 7 }), (3, Payload { timestamp: 70, data: 18 })} - // {(2, Payload { timestamp: 2, data: 7 }), (1, Payload { timestamp: 2, data: 2 }), (3, Payload { timestamp: 2, data: 9 })} } #[hydroflow::test] async fn simple_operation_test() { - // let args: Vec = std::env::args().skip(1).collect(); - let neighbors: Vec = vec![1, 2, 3]; // args.into_iter().map(|x| x.parse().unwrap()).collect(); - // let current_id = neighbors[0]; + let neighbors: Vec = vec![1, 2, 3]; let (operations_tx, operations_rx) = unbounded_channel::>(); - let (input_send, input_recv) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - let input1 = ( - 1, - Payload { - timestamp: 1, - data: 2, - }, - ); + + #[rustfmt::skip] + simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); operations_tx .send(Ok(BytesMut::from( @@ -555,12 +478,6 @@ async fn simple_operation_test() { ))) .unwrap(); - let simulate_input = |(id, payload): (u32, Payload)| { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) - }; let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); let receive_all_output = || async move { let collected = collect_ready_async::, _>(&mut output_recv).await; @@ -574,7 +491,6 @@ async fn simple_operation_test() { }) .collect::>() }; - simulate_input(input1).unwrap(); flow.run_tick(); #[rustfmt::skip] diff --git a/topolotree/src/util.rs b/topolotree/src/util.rs new file mode 100644 index 000000000000..4eb53f48d5c9 --- /dev/null +++ b/topolotree/src/util.rs @@ -0,0 +1,18 @@ +use std::fmt::Debug; + +use hydroflow::bytes::BytesMut; +use serde::Serialize; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::UnboundedSender; + +use crate::Payload; + +pub fn simulate_input( + input_send: &mut UnboundedSender>, + (id, payload): (u32, Payload), +) -> Result<(), SendError>> { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) +} From 0b438564a85ba6667f2def7cbdd1632b5241a1ea Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:49:48 -0700 Subject: [PATCH 12/14] refactoring --- topolotree/src/main.rs | 108 ++++++++++++----------------------------- 1 file changed, 32 insertions(+), 76 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index e3060c421084..8a30abbab406 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -286,24 +286,15 @@ async fn simple_payload_test() { serde_json::from_slice::>(&bytes[..]).unwrap(), ) }) - .collect::>() + .collect::>() }; flow.run_tick(); - let output1: (u32, Payload) = ( - 2, - Payload { - timestamp: 1, - data: 2, - }, - ); - let output2: (u32, Payload) = ( - 3, - Payload { - timestamp: 1, - data: 2, - }, - ); - assert_eq!(receive_all_output().await, &[output1, output2]); + + #[rustfmt::skip] + assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 1, data: 2 }), + (3, Payload { timestamp: 1, data: 2 }), + ])); } #[hydroflow::test] @@ -331,24 +322,15 @@ async fn idempotence_test() { serde_json::from_slice::>(&bytes[..]).unwrap(), ) }) - .collect::>() + .collect::>() }; flow.run_tick(); - let output1: (u32, Payload) = ( - 2, - Payload { - timestamp: 1, - data: 2, - }, - ); - let output2: (u32, Payload) = ( - 3, - Payload { - timestamp: 1, - data: 2, - }, - ); - assert_eq!(receive_all_output().await, &[output1, output2]); + + #[rustfmt::skip] + assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 1, data: 2 }), + (3, Payload { timestamp: 1, data: 2 }), + ])); } #[hydroflow::test] @@ -376,24 +358,15 @@ async fn backwards_in_time_test() { serde_json::from_slice::>(&bytes[..]).unwrap(), ) }) - .collect::>() + .collect::>() }; flow.run_tick(); - let output1: (u32, Payload) = ( - 2, - Payload { - timestamp: 1, - data: 7, - }, - ); - let output2: (u32, Payload) = ( - 3, - Payload { - timestamp: 1, - data: 7, - }, - ); - assert_eq!(receive_all_output().await, &[output1, output2]); + + #[rustfmt::skip] + assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 1, data: 7 }), + (3, Payload { timestamp: 1, data: 7 }), + ])); } #[hydroflow::test] @@ -424,31 +397,14 @@ async fn multiple_input_sources_test() { .collect::>() }; flow.run_tick(); - let output1 = ( - 1, - Payload { - timestamp: 2, - data: 2, - }, - ); - let output2 = ( - 2, - Payload { - timestamp: 2, - data: 7, - }, - ); - let output3 = ( - 3, - Payload { - timestamp: 2, - data: 9, - }, - ); - assert_eq!( - receive_all_output().await, - HashMultiSet::from_iter([output1, output2, output3.clone(), output3]) - ); + + #[rustfmt::skip] + assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + (1, Payload { timestamp: 2, data: 2 }), + (2, Payload { timestamp: 2, data: 7 }), + (3, Payload { timestamp: 2, data: 9 }), + (3, Payload { timestamp: 2, data: 9 }), + ])); } #[hydroflow::test] @@ -489,15 +445,15 @@ async fn simple_operation_test() { serde_json::from_slice::>(&bytes[..]).unwrap(), ) }) - .collect::>() + .collect::>() }; flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, &[ + assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ (2, Payload { timestamp: 3, data: 14 }), (3, Payload { timestamp: 3, data: 14 }), - ]); + ])); } // idempotence test (issue two requests with the same timestamp and see that they don't change anything.) From 13517536a7e92bd78190f1d2d721e17889bc1321 Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:16:29 -0700 Subject: [PATCH 13/14] refactoring --- topolotree/src/main.rs | 113 +++++++++-------------------------------- topolotree/src/util.rs | 31 ++++++++++- 2 files changed, 53 insertions(+), 91 deletions(-) diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 8a30abbab406..fd62a4f4a76c 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -10,6 +10,7 @@ use futures::{Sink, SinkExt, Stream}; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow::util::cli::{ ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, }; @@ -19,7 +20,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; -use crate::util::simulate_input; +use crate::util::{read_all, simulate_input, simulate_operation}; #[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)] pub struct Payload { @@ -39,7 +40,7 @@ impl PartialEq for Payload { } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -struct OperationPayload { +pub struct OperationPayload { change: i64, } @@ -275,23 +276,12 @@ async fn simple_payload_test() { #[rustfmt::skip] simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 1, data: 2 }), (3, Payload { timestamp: 1, data: 2 }), ])); @@ -311,23 +301,12 @@ async fn idempotence_test() { simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 1, data: 2 }), (3, Payload { timestamp: 1, data: 2 }), ])); @@ -347,23 +326,12 @@ async fn backwards_in_time_test() { simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 1, data: 7 }), (3, Payload { timestamp: 1, data: 7 }), ])); @@ -383,23 +351,12 @@ async fn multiple_input_sources_test() { simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap(); }; - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() - }; + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (1, Payload { timestamp: 2, data: 2 }), (2, Payload { timestamp: 2, data: 7 }), (3, Payload { timestamp: 2, data: 9 }), @@ -411,46 +368,24 @@ async fn multiple_input_sources_test() { async fn simple_operation_test() { let neighbors: Vec = vec![1, 2, 3]; - let (operations_tx, operations_rx) = unbounded_channel::>(); + let (mut operations_tx, operations_rx) = unbounded_channel::>(); let (mut input_send, input_recv) = unbounded_channel::>(); let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); #[rustfmt::skip] - simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + { + simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap(); - operations_tx - .send(Ok(BytesMut::from( - serde_json::to_string(&OperationPayload { change: 5 }) - .unwrap() - .as_str(), - ))) - .unwrap(); - - operations_tx - .send(Ok(BytesMut::from( - serde_json::to_string(&OperationPayload { change: 7 }) - .unwrap() - .as_str(), - ))) - .unwrap(); - - let mut flow: Hydroflow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - let receive_all_output = || async move { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() }; + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + flow.run_tick(); #[rustfmt::skip] - assert_eq!(receive_all_output().await, HashMultiSet::from_iter([ + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ (2, Payload { timestamp: 3, data: 14 }), (3, Payload { timestamp: 3, data: 14 }), ])); diff --git a/topolotree/src/util.rs b/topolotree/src/util.rs index 4eb53f48d5c9..a15cac270188 100644 --- a/topolotree/src/util.rs +++ b/topolotree/src/util.rs @@ -1,11 +1,14 @@ use std::fmt::Debug; -use hydroflow::bytes::BytesMut; +use hydroflow::bytes::{Bytes, BytesMut}; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow::util::collect_ready_async; +use hydroflow::util::multiset::HashMultiSet; use serde::Serialize; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; -use crate::Payload; +use crate::{OperationPayload, Payload}; pub fn simulate_input( input_send: &mut UnboundedSender>, @@ -16,3 +19,27 @@ pub fn simulate_input( BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), ))) } + +pub fn simulate_operation( + input_send: &mut UnboundedSender>, + payload: OperationPayload, +) -> Result<(), SendError>> { + input_send.send(Ok(BytesMut::from( + serde_json::to_string(&payload).unwrap().as_str(), + ))) +} + +pub async fn read_all( + mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>, +) -> HashMultiSet<(u32, Payload)> { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) + .collect::>() +} From 6468c840093c199c4417135f8c65046f311d95fc Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:22:50 -0700 Subject: [PATCH 14/14] refactoring --- topolotree/src/main.rs | 163 +--------------------------------- topolotree/src/tests.rs | 191 ++++++++++++++++++++++++++++++++++++++++ topolotree/src/util.rs | 45 ---------- 3 files changed, 194 insertions(+), 205 deletions(-) create mode 100644 topolotree/src/tests.rs delete mode 100644 topolotree/src/util.rs diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index fd62a4f4a76c..173492098689 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -1,26 +1,19 @@ -mod util; +#[cfg(test)] +mod tests; use std::cell::RefCell; -use std::collections::HashSet; use std::fmt::Debug; use std::io; use std::rc::Rc; -use futures::{Sink, SinkExt, Stream}; +use futures::{SinkExt, Stream}; use hydroflow::bytes::{Bytes, BytesMut}; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; -use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; use hydroflow::util::cli::{ ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, }; -use hydroflow::util::multiset::HashMultiSet; -use hydroflow::util::{collect_ready_async, unbounded_channel}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::UnboundedSender; - -use crate::util::{read_all, simulate_input, simulate_operation}; #[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash)] pub struct Payload { @@ -99,7 +92,6 @@ fn run_topolotree( // message comes in from 2 // (2+3+SelfState) -> 1, (1+2+SelfState) -> 3 - from_neighbors // 2 comes out here -> map(|(src, _payload)| src) -> [0]all_other_neighbors_except_for_who_it_came_from; // 2 goes into this crossjoin @@ -132,8 +124,6 @@ fn run_topolotree( // (1, (2, 3))), (2, (2, 3))), -> [1]all_neighbor_data; - - all_neighbor_data = join() -> map(|(aggregate_from_this_guy, (payload, (original_src, target_neighbor)))| { ((target_neighbor, original_src), (aggregate_from_this_guy, payload)) @@ -158,8 +148,6 @@ fn run_topolotree( output_send.send((target_neighbor, serialized)).unwrap(); }); - - // src // -> map(|(from, _data)| from) // -> enumerate() @@ -264,148 +252,3 @@ async fn main() { )) .await; } - -#[hydroflow::test] -async fn simple_payload_test() { - let neighbors: Vec = vec![1, 2, 3]; - - let (_operations_tx, operations_rx) = unbounded_channel::>(); - let (mut input_send, input_recv) = unbounded_channel::>(); - let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - - #[rustfmt::skip] - simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); - - let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 1, data: 2 }), - (3, Payload { timestamp: 1, data: 2 }), - ])); -} - -#[hydroflow::test] -async fn idempotence_test() { - let neighbors: Vec = vec![1, 2, 3]; - let (_operations_tx, operations_rx) = unbounded_channel::>(); - - let (mut input_send, input_recv) = unbounded_channel::>(); - let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); - simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); - }; - - let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 1, data: 2 }), - (3, Payload { timestamp: 1, data: 2 }), - ])); -} - -#[hydroflow::test] -async fn backwards_in_time_test() { - let neighbors: Vec = vec![1, 2, 3]; - - let (_operations_tx, operations_rx) = unbounded_channel::>(); - let (mut input_send, input_recv) = unbounded_channel::>(); - let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); - simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); - }; - - let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 1, data: 7 }), - (3, Payload { timestamp: 1, data: 7 }), - ])); -} - -#[hydroflow::test] -async fn multiple_input_sources_test() { - let neighbors: Vec = vec![1, 2, 3]; - let (_operations_tx, operations_rx) = unbounded_channel::>(); - - let (mut input_send, input_recv) = unbounded_channel::>(); - let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); - simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap(); - }; - - let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, Payload { timestamp: 2, data: 2 }), - (2, Payload { timestamp: 2, data: 7 }), - (3, Payload { timestamp: 2, data: 9 }), - (3, Payload { timestamp: 2, data: 9 }), - ])); -} - -#[hydroflow::test] -async fn simple_operation_test() { - let neighbors: Vec = vec![1, 2, 3]; - - let (mut operations_tx, operations_rx) = unbounded_channel::>(); - let (mut input_send, input_recv) = unbounded_channel::>(); - let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); - simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap(); - simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap(); - - }; - - let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (2, Payload { timestamp: 3, data: 14 }), - (3, Payload { timestamp: 3, data: 14 }), - ])); -} - -// idempotence test (issue two requests with the same timestamp and see that they don't change anything.) -// let input1 = (1, Payload {timestamp:4, data:2}); -// let input2 = (1, Payload {timestamp:4, data:2}); -// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); -// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); -// -// backward in time test (issue two requests, the second one with an earlier timestamp than the first. ) -// let input1 = (1, Payload {timestamp:5, data:7}); -// let input2 = (1, Payload {timestamp:4, data:2}); -// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7}); -// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7}); -// -// updates from multiple sources test -// let input1 = (1, Payload {timestamp:5, data:7}); -// let input2 = (2, Payload {timestamp:4, data:2}); -// let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2}); -// let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7}); -// let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9}); diff --git a/topolotree/src/tests.rs b/topolotree/src/tests.rs new file mode 100644 index 000000000000..568c10b6ce71 --- /dev/null +++ b/topolotree/src/tests.rs @@ -0,0 +1,191 @@ +use std::fmt::Debug; +use std::io; + +use hydroflow::bytes::{Bytes, BytesMut}; +use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow::util::multiset::HashMultiSet; +use hydroflow::util::{collect_ready_async, unbounded_channel}; +use serde::Serialize; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::UnboundedSender; + +use crate::{run_topolotree, OperationPayload, Payload}; + +pub fn simulate_input( + input_send: &mut UnboundedSender>, + (id, payload): (u32, Payload), +) -> Result<(), SendError>> { + input_send.send(Ok(( + id, + BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), + ))) +} + +pub fn simulate_operation( + input_send: &mut UnboundedSender>, + payload: OperationPayload, +) -> Result<(), SendError>> { + input_send.send(Ok(BytesMut::from( + serde_json::to_string(&payload).unwrap().as_str(), + ))) +} + +pub async fn read_all( + mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>, +) -> HashMultiSet<(u32, Payload)> { + let collected = collect_ready_async::, _>(&mut output_recv).await; + collected + .iter() + .map(|(id, bytes)| { + ( + *id, + serde_json::from_slice::>(&bytes[..]).unwrap(), + ) + }) + .collect::>() +} + +#[hydroflow::test] +async fn simple_payload_test() { + let neighbors: Vec = vec![1, 2, 3]; + + let (_operations_tx, operations_rx) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + + #[rustfmt::skip] + simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 1, data: 2 }), + (3, Payload { timestamp: 1, data: 2 }), + ])); +} + +#[hydroflow::test] +async fn idempotence_test() { + let neighbors: Vec = vec![1, 2, 3]; + let (_operations_tx, operations_rx) = unbounded_channel::>(); + + let (mut input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); + }; + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 1, data: 2 }), + (3, Payload { timestamp: 1, data: 2 }), + ])); +} + +#[hydroflow::test] +async fn backwards_in_time_test() { + let neighbors: Vec = vec![1, 2, 3]; + + let (_operations_tx, operations_rx) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); + simulate_input(&mut input_send, (1, Payload { timestamp: 4, data: 2 })).unwrap(); + }; + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 1, data: 7 }), + (3, Payload { timestamp: 1, data: 7 }), + ])); +} + +#[hydroflow::test] +async fn multiple_input_sources_test() { + let neighbors: Vec = vec![1, 2, 3]; + let (_operations_tx, operations_rx) = unbounded_channel::>(); + + let (mut input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 5, data: 7 })).unwrap(); + simulate_input(&mut input_send, (2, Payload { timestamp: 4, data: 2 })).unwrap(); + }; + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (1, Payload { timestamp: 2, data: 2 }), + (2, Payload { timestamp: 2, data: 7 }), + (3, Payload { timestamp: 2, data: 9 }), + (3, Payload { timestamp: 2, data: 9 }), + ])); +} + +#[hydroflow::test] +async fn simple_operation_test() { + let neighbors: Vec = vec![1, 2, 3]; + + let (mut operations_tx, operations_rx) = unbounded_channel::>(); + let (mut input_send, input_recv) = unbounded_channel::>(); + let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); + + #[rustfmt::skip] + { + simulate_input(&mut input_send, (1, Payload { timestamp: 1, data: 2 })).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { change: 5 }).unwrap(); + simulate_operation(&mut operations_tx, OperationPayload { change: 7 }).unwrap(); + + }; + + let mut flow = run_topolotree(neighbors, input_recv, operations_rx, output_send); + + flow.run_tick(); + + #[rustfmt::skip] + assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ + (2, Payload { timestamp: 3, data: 14 }), + (3, Payload { timestamp: 3, data: 14 }), + ])); +} + +// idempotence test (issue two requests with the same timestamp and see that they don't change anything.) +// let input1 = (1, Payload {timestamp:4, data:2}); +// let input2 = (1, Payload {timestamp:4, data:2}); +// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:2}); +// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:2}); +// +// backward in time test (issue two requests, the second one with an earlier timestamp than the first. ) +// let input1 = (1, Payload {timestamp:5, data:7}); +// let input2 = (1, Payload {timestamp:4, data:2}); +// let output1: (u32, Payload) = (2, Payload {timestamp:1, data:7}); +// let output2: (u32, Payload) = (3, Payload {timestamp:1, data:7}); +// +// updates from multiple sources test +// let input1 = (1, Payload {timestamp:5, data:7}); +// let input2 = (2, Payload {timestamp:4, data:2}); +// let output1: (u32, Payload) = (1, Payload {timestamp:2, data:2}); +// let output2: (u32, Payload) = (2, Payload {timestamp:2, data:7}); +// let output3: (u32, Payload) = (3, Payload {timestamp:2, data:9}); diff --git a/topolotree/src/util.rs b/topolotree/src/util.rs deleted file mode 100644 index a15cac270188..000000000000 --- a/topolotree/src/util.rs +++ /dev/null @@ -1,45 +0,0 @@ -use std::fmt::Debug; - -use hydroflow::bytes::{Bytes, BytesMut}; -use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; -use hydroflow::util::collect_ready_async; -use hydroflow::util::multiset::HashMultiSet; -use serde::Serialize; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::UnboundedSender; - -use crate::{OperationPayload, Payload}; - -pub fn simulate_input( - input_send: &mut UnboundedSender>, - (id, payload): (u32, Payload), -) -> Result<(), SendError>> { - input_send.send(Ok(( - id, - BytesMut::from(serde_json::to_string(&payload).unwrap().as_str()), - ))) -} - -pub fn simulate_operation( - input_send: &mut UnboundedSender>, - payload: OperationPayload, -) -> Result<(), SendError>> { - input_send.send(Ok(BytesMut::from( - serde_json::to_string(&payload).unwrap().as_str(), - ))) -} - -pub async fn read_all( - mut output_recv: &mut UnboundedReceiverStream<(u32, Bytes)>, -) -> HashMultiSet<(u32, Payload)> { - let collected = collect_ready_async::, _>(&mut output_recv).await; - collected - .iter() - .map(|(id, bytes)| { - ( - *id, - serde_json::from_slice::>(&bytes[..]).unwrap(), - ) - }) - .collect::>() -}