diff --git a/Cargo.lock b/Cargo.lock index bc9706ac5626..096d09ee50c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,7 +392,7 @@ dependencies = [ "crates-io", "curl", "curl-sys", - "env_logger", + "env_logger 0.10.0", "filetime", "flate2", "fwdansi", @@ -488,6 +488,15 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cfg-expr" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a35b255461940a32985c627ce82900867c61db1659764d3675ea81963f72a4c6" +dependencies = [ + "smallvec", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -752,6 +761,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crdts" +version = "7.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e30e1170960eddf5392c448ff5b190a49cfcb28d6c79f789f2b4e30b571f8f8" +dependencies = [ + "num", + "quickcheck", + "serde", + "tiny-keccak", +] + [[package]] name = "criterion" version = "0.4.0" @@ -790,6 +811,20 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -824,6 +859,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.15" @@ -833,6 +878,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -981,6 +1032,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "dircpy" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10b6622b9d0dc20c70e74ff24c56493278d7d9299ac8729deb923703616e5a7e" +dependencies = [ + "jwalk", + "log", + "walkdir", +] + [[package]] name = "dirs" version = "3.0.2" @@ -1019,6 +1081,16 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -1147,6 +1219,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-batch" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f444c45a1cb86f2a7e301469fd50a82084a60dadc25d94529a8312276ecb71a" +dependencies = [ + "futures", + "futures-timer", + "pin-utils", +] + [[package]] name = "futures-channel" version = "0.3.27" @@ -1218,6 +1301,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd65540d33b37b16542a0438c12e6aeead10d4ac5d05bd3f805b8f35ab592879" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.27" @@ -1513,8 +1602,10 @@ dependencies = [ "clap 4.1.13", "colored", "core_affinity", + "crdts", "criterion", "futures", + "futures-batch", "getrandom 0.2.8", "hdrhistogram", "hydroflow_cli_integration", @@ -1526,16 +1617,19 @@ dependencies = [ "multiplatform_test", "pusherator", "rand 0.8.5", + "rand_distr", "ref-cast", "regex", "rustc-hash", "sealed", "serde", + "serde-big-array", "serde_json", "slotmap", "static_assertions", "textnonce", "time", + "tmq", "tokio", "tokio-stream", "tokio-util", @@ -1787,6 +1881,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jwalk" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dbcda57db8b6dc067e589628b7348639014e793d9e8137d8cf215e8b133a0bd" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -2038,6 +2142,42 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "num" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b7a8e9be5e039e2ff869df49155f1c06bd01ade2117ec783e56ab0932b67a8f" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6f7833f2cbf2360a6cfd58cd41a53aa7a90bd4c202f5b1c7dd2ed73c57b2c3" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "num-complex" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "747d632c0c558b87dbabbe6a82f3b4ae03720d0646ac5b7b4dae89394be5f2c5" +dependencies = [ + "num-traits", + "serde", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2048,6 +2188,30 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -2491,6 +2655,18 @@ dependencies = [ "serde", ] +[[package]] +name = "quickcheck" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44883e74aa97ad63db83c4bf8ca490f02b2fc02f92575e720c8551e843c945f" +dependencies = [ + "env_logger 0.7.1", + "log", + "rand 0.7.3", + "rand_core 0.5.1", +] + [[package]] name = "quote" version = "1.0.26" @@ -2873,6 +3049,15 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-big-array" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11fc7cc2c76d73e0f27ee52abbd64eec84d46f370c88371120433196934e4b7f" +dependencies = [ + "serde", +] + [[package]] name = "serde-value" version = "0.7.0" @@ -2915,6 +3100,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0efd8caf556a6cebd3b285caf480045fcc1ac04f6bd786b09a6f11af30c4fcf4" +dependencies = [ + "serde", +] + [[package]] name = "sha1" version = "0.10.5" @@ -3097,6 +3291,19 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "system-deps" +version = "6.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "555fc8147af6256f3931a36bb83ad0023240ce9cf2b319dec8236fd1f220b05f" +dependencies = [ + "cfg-expr", + "heck 0.4.1", + "pkg-config", + "toml 0.7.3", + "version-compare", +] + [[package]] name = "tar" version = "0.4.38" @@ -3252,6 +3459,15 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9046af28827ac831479d245eb8afd9522599a3cbb22d6c42a82cb9e4ccdf858" +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tiny_http" version = "0.8.2" @@ -3290,6 +3506,19 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tmq" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e83994e39464b6dd4b3d3e1596744d6c1d56b6687fa4aa099c4994c31835763" +dependencies = [ + "futures", + "log", + "thiserror", + "tokio", + "zmq", +] + [[package]] name = "tokio" version = "1.26.0" @@ -3330,6 +3559,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -3356,6 +3586,18 @@ dependencies = [ "serde", ] +[[package]] +name = "toml" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b403acf6f2bb0859c93c7f0d967cb4a75a7ac552100f9322faf64dc047669b21" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime 0.6.1", + "toml_edit 0.19.8", +] + [[package]] name = "toml_datetime" version = "0.5.1" @@ -3370,6 +3612,9 @@ name = "toml_datetime" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622" +dependencies = [ + "serde", +] [[package]] name = "toml_edit" @@ -3392,6 +3637,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "239410c8609e8125456927e6707163a3b1fdb40561e4b803bc041f466ccfdc13" dependencies = [ "indexmap", + "serde", + "serde_spanned", "toml_datetime 0.6.1", "winnow", ] @@ -3471,7 +3718,7 @@ dependencies = [ "serde_json", "smallbitvec", "tiny_http", - "toml", + "toml 0.5.11", "tree-sitter", "tree-sitter-config", "tree-sitter-highlight", @@ -3641,6 +3888,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version-compare" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579a42fc0b8e0c63b76519a339be31bed574929511fa53c1a3acae26eb258f29" + [[package]] name = "version_check" version = "0.9.4" @@ -3975,6 +4228,16 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "zeromq-src" +version = "0.2.5+4.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53aaa8119f753d047dc5a9dbcc720bf720b466fdf859aaaae8638f3afc1a3564" +dependencies = [ + "cc", + "dircpy", +] + [[package]] name = "zipf" version = "7.0.0" @@ -3983,3 +4246,25 @@ checksum = "835688a7a1b5d2dfaeb5b7e1b4cfb979e7095a70cd1c72fe083f4904ef3e995e" dependencies = [ "rand 0.8.5", ] + +[[package]] +name = "zmq" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd3091dd571fb84a9b3e5e5c6a807d186c411c812c8618786c3c30e5349234e7" +dependencies = [ + "bitflags", + "libc", + "zmq-sys", +] + +[[package]] +name = "zmq-sys" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8351dc72494b4d7f5652a681c33634063bbad58046c1689e75270908fdc864" +dependencies = [ + "libc", + "system-deps", + "zeromq-src", +] diff --git a/hydro_cli/src/core/hydroflow_crate/ports.rs b/hydro_cli/src/core/hydroflow_crate/ports.rs index 3f492e47522c..8583a72e2c99 100644 --- a/hydro_cli/src/core/hydroflow_crate/ports.rs +++ b/hydro_cli/src/core/hydroflow_crate/ports.rs @@ -526,7 +526,6 @@ impl ServerConfig { } ServerConfig::MergeSelect(underlying, key) => { - dbg!(underlying); let key = *key; underlying .load_instantiated( diff --git a/hydro_cli/src/lib.rs b/hydro_cli/src/lib.rs index 4d98f4f4b949..3b199fe489d1 100644 --- a/hydro_cli/src/lib.rs +++ b/hydro_cli/src/lib.rs @@ -30,8 +30,9 @@ struct SafeCancelToken { impl SafeCancelToken { fn safe_cancel(&mut self) { if let Some(token) = self.cancel_tx.take() { - eprintln!("Received cancellation, cleaning up..."); - token.send(()).unwrap(); + if let Ok(_) = token.send(()) { + eprintln!("Received cancellation, cleaning up..."); + } } else { eprintln!("Already received cancellation, please be patient!"); } @@ -52,13 +53,16 @@ async def coroutine_to_safely_cancellable(c, cancel_token): while True: try: ok, cancel = await asyncio.shield(c) + is_done = True except asyncio.CancelledError: cancel_token.safe_cancel() + is_done = False - if not cancel: - return ok - else: - raise asyncio.CancelledError() + if is_done: + if not cancel: + return ok + else: + raise asyncio.CancelledError() "#, "coro_converter", "coro_converter", diff --git a/hydro_cli_examples/examples/dedalus_sender/main.rs b/hydro_cli_examples/examples/dedalus_sender/main.rs index ad7da8cf9088..69fb6edc7b03 100644 --- a/hydro_cli_examples/examples/dedalus_sender/main.rs +++ b/hydro_cli_examples/examples/dedalus_sender/main.rs @@ -1,8 +1,10 @@ +use std::time::Duration; + use hydroflow::{ tokio_stream::wrappers::IntervalStream, util::{ cli::{ConnectedBidi, ConnectedDemux, ConnectedSink}, - serialize_to_bytes, + serialize_to_bytes, batched_sink, }, }; use hydroflow_datalog::datalog; @@ -18,7 +20,7 @@ async fn main() { let (peers, sender_i): (Vec, u32) = serde_json::from_str(&std::env::args().nth(1).unwrap()).unwrap(); - let broadcast_sink = broadcast_port.into_sink(); + let broadcast_sink = batched_sink(broadcast_port.into_sink(), 8, Duration::from_millis(1)); let periodic = IntervalStream::new(tokio::time::interval(std::time::Duration::from_secs(1))); let to_repeat = vec![ diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index b00a920265ef..37771410ff29 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [features] default = [ "async", "macros" ] -async = [ "futures" ] +async = [ "futures", "futures-batch" ] macros = [ "hydroflow_macro", "hydroflow_datalog" ] hydroflow_macro = [ "dep:hydroflow_macro" ] hydroflow_datalog = [ "dep:hydroflow_datalog" ] @@ -52,6 +52,7 @@ bincode = "1.3" byteorder = "1.4.3" bytes = "1.1.0" futures = { version = "0.3", optional = true } +futures-batch = { version = "0.6.1", optional = true } hydroflow_datalog = { optional = true, path = "../hydroflow_datalog" } hydroflow_lang = { path = "../hydroflow_lang" } hydroflow_macro = { optional = true, path = "../hydroflow_macro" } diff --git a/hydroflow/src/util/mod.rs b/hydroflow/src/util/mod.rs index 45bf9b239349..02a5df947d3e 100644 --- a/hydroflow/src/util/mod.rs +++ b/hydroflow/src/util/mod.rs @@ -18,9 +18,10 @@ pub mod cli; use std::net::SocketAddr; use std::task::{Context, Poll}; +use std::time::Duration; use bincode; -use futures::Stream; +use futures::{Stream, Sink, SinkExt}; use serde::{Deserialize, Serialize}; pub fn unbounded_channel() -> ( @@ -131,6 +132,36 @@ where slice.sort_unstable_by(|a, b| f(a).cmp(f(b))) } +pub fn batched_sink + Send + 'static>(s: S, cap: usize, timeout: Duration) -> impl Sink + Unpin { + let (send, recv) = tokio::sync::mpsc::unbounded_channel::(); + + use futures::{stream, StreamExt}; + use futures_batch::ChunksTimeoutStreamExt; + + tokio::spawn(async move { + let recv_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(recv); + let mut batched_recv = recv_stream.chunks_timeout(cap, timeout); + let mut s = Box::pin(s); + + loop { + if let Some(batch) = batched_recv.next().await { + if let Err(_) = s.send_all(&mut stream::iter(batch).map(|v| Ok(v))) + .await { + panic!("Batched sink failed") + } + } else { + break; + } + } + }); + + Box::pin(futures::sink::unfold(send, |send, item| async move { + send.send(item) + .map(|_| send) + .map_err(|_| ()) + })) +} + #[cfg(test)] mod test { use super::*;