diff --git a/runners/s3-benchrunner-rust/Cargo.lock b/runners/s3-benchrunner-rust/Cargo.lock index 6b2cbb5b..8f5770d4 100644 --- a/runners/s3-benchrunner-rust/Cargo.lock +++ b/runners/s3-benchrunner-rust/Cargo.lock @@ -122,6 +122,28 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "async-trait" version = "0.1.82" @@ -210,7 +232,7 @@ dependencies = [ [[package]] name = "aws-s3-transfer-manager" version = "0.1.0" -source = "git+ssh://git@github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=d2e1e164de35b3cdc34193a2a1721a63f959752f#d2e1e164de35b3cdc34193a2a1721a63f959752f" +source = "git+ssh://git@github.com/awslabs/aws-s3-transfer-manager-rs.git?rev=e929fac22a4aeb49b4ef1953324e8fcd54822aff#e929fac22a4aeb49b4ef1953324e8fcd54822aff" dependencies = [ "async-channel", "async-trait", @@ -225,7 +247,7 @@ dependencies = [ "path-clean", "pin-project-lite", "tokio", - "tower", + "tower 0.5.1", "tracing", ] @@ -482,7 +504,7 @@ dependencies = [ "httparse", "hyper", "hyper-rustls", - "indexmap", + "indexmap 2.5.0", "once_cell", "pin-project-lite", "pin-utils", @@ -560,6 +582,51 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -603,6 +670,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.6.0" @@ -627,6 +700,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.7.1" @@ -984,21 +1069,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - [[package]] name = "futures-channel" version = "0.3.30" @@ -1006,7 +1076,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", - "futures-sink", ] [[package]] @@ -1026,12 +1095,6 @@ dependencies = [ "futures-util", ] -[[package]] -name = "futures-io" -version = "0.3.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" - [[package]] name = "futures-macro" version = "0.3.30" @@ -1061,13 +1124,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "futures-channel", "futures-core", - "futures-io", "futures-macro", "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1100,6 +1160,12 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "group" version = "0.12.1" @@ -1123,7 +1189,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -1146,6 +1212,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1291,6 +1363,18 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "idna" version = "0.5.0" @@ -1301,6 +1385,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.5.0" @@ -1308,7 +1402,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", "serde", ] @@ -1318,12 +1412,30 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "js-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1348,7 +1460,7 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -1360,6 +1472,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1376,6 +1494,12 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1489,6 +1613,87 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a94c69209c05319cdf7460c6d4c055ed102be242a0a6245835d7bc42c6ec7f54" +dependencies = [ + "async-trait", + "futures-core", + "http 0.2.12", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1869fb4bb9b35c5ba8a1e40c9b128a7b4c010d07091e864a29da19e4fe2ca4d7" + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "lazy_static", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" +dependencies = [ + "num-traits", +] + [[package]] name = "outref" version = "0.5.1" @@ -1530,6 +1735,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -1558,6 +1783,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + [[package]] name = "pretty_assertions" version = "1.4.1" @@ -1601,6 +1835,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "quote" version = "1.0.37" @@ -1610,6 +1867,27 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + [[package]] name = "rand_core" version = "0.6.4" @@ -1762,6 +2040,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" + [[package]] name = "ryu" version = "1.0.18" @@ -1780,11 +2064,17 @@ dependencies = [ "bytes", "clap", "fastrand", - "futures", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "serde", "serde_json", "thiserror", "tokio", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -1826,7 +2116,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.6.0", "core-foundation", "core-foundation-sys", "libc", @@ -1891,7 +2181,7 @@ version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ - "indexmap", + "indexmap 2.5.0", "itoa", "memchr", "ryu", @@ -2127,6 +2417,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.4.0" @@ -2148,6 +2448,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -2161,6 +2472,53 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.1" @@ -2233,6 +2591,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -2365,6 +2741,71 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.77", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2487,6 +2928,7 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ + "byteorder", "zerocopy-derive", ] diff --git a/runners/s3-benchrunner-rust/Cargo.toml b/runners/s3-benchrunner-rust/Cargo.toml index 752e6e76..3f316d86 100644 --- a/runners/s3-benchrunner-rust/Cargo.toml +++ b/runners/s3-benchrunner-rust/Cargo.toml @@ -4,16 +4,29 @@ version = "0.1.0" edition = "2021" [dependencies] + +# Swap which line is commented-out to use GitHub or local aws-s3-transfer-manager +aws-s3-transfer-manager = { git = "ssh://git@github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "e929fac22a4aeb49b4ef1953324e8fcd54822aff" } +# aws-s3-transfer-manager = { path = "../../../aws-s3-transfer-manager-rs/aws-s3-transfer-manager" } + +# tracing-opentelemetry 0.26.0 is a bit broken (see https://github.com/tokio-rs/tracing-opentelemetry/issues/159) +# so use 0.24.0 and the exact opentelemetry-* versions it depends on. +tracing-opentelemetry = "0.24.0" +opentelemetry = { version = "0.23", features = ["trace", "metrics"] } +opentelemetry_sdk = { version = "0.23", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry-otlp = { version = "0.16", features = ["metrics"] } +opentelemetry-semantic-conventions = "0.15.0" + anyhow = "1.0.86" async-trait = "0.1.81" aws-config = "1.5.4" -aws-s3-transfer-manager = { git = "ssh://git@github.com/awslabs/aws-s3-transfer-manager-rs.git", rev = "d2e1e164de35b3cdc34193a2a1721a63f959752f" } aws-sdk-s3 = "1.41.0" +bytes = "1" clap = { version = "4.5.9", features = ["derive"] } -futures = "0.3.30" +fastrand = "=2.1.0" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.120" thiserror = "1.0.62" tokio = { version = "1.38.1", features = ["io-util"] } -bytes = "1" -fastrand = "=2.1.0" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" diff --git a/runners/s3-benchrunner-rust/README.md b/runners/s3-benchrunner-rust/README.md index 4f0f443b..3c81bb68 100644 --- a/runners/s3-benchrunner-rust/README.md +++ b/runners/s3-benchrunner-rust/README.md @@ -14,7 +14,7 @@ This produces: `target/release/s3-benchrunner-rust` ## Running ``` -Usage: s3-benchrunner-rust +Usage: s3-benchrunner-rust [OPTIONS] Arguments: @@ -36,8 +36,19 @@ Arguments: Target throughput, in gigabits per second (e.g. "100.0" for c5n.18xlarge) Options: + --telemetry + Emit telemetry via OTLP/gRPC to http://localhost:4317 + -h, --help Print help (see a summary with '-h') ``` See further instructions [here](../../README.md#run-a-benchmark). + +### Viewing Telemetry + +Use the `--telemetry` flag to export OpenTelemetry data to http://localhost:4317 as OTLP/gRPC payloads. + +The simplest way I know collect and view this data is with [Jaeger All in One](https://www.jaegertracing.io/docs/latest/getting-started/) or [otel-desktop-viewer](https://github.com/CtrlSpice/otel-desktop-viewer?tab=readme-ov-file#getting-started). Get one of these running, run the benchmark with the `--telemetry` flag, then view the data in your browser. + +TODO: document how to collect and view data from a non-local run. diff --git a/runners/s3-benchrunner-rust/src/lib.rs b/runners/s3-benchrunner-rust/src/lib.rs index d28e3a6b..37115ffd 100644 --- a/runners/s3-benchrunner-rust/src/lib.rs +++ b/runners/s3-benchrunner-rust/src/lib.rs @@ -3,6 +3,8 @@ use async_trait::async_trait; use serde::Deserialize; use std::{fs::File, io::BufReader, path::Path}; +pub mod telemetry; + mod transfer_manager; pub use transfer_manager::TransferManagerRunner; diff --git a/runners/s3-benchrunner-rust/src/main.rs b/runners/s3-benchrunner-rust/src/main.rs index e790c01e..b66708ee 100644 --- a/runners/s3-benchrunner-rust/src/main.rs +++ b/runners/s3-benchrunner-rust/src/main.rs @@ -1,13 +1,13 @@ use clap::{Parser, ValueEnum}; use std::process::exit; use std::time::Instant; +use tracing::{self, info_span, instrument, Instrument}; use s3_benchrunner_rust::{ - bytes_to_gigabits, prepare_run, BenchmarkConfig, Result, RunBenchmark, SkipBenchmarkError, - TransferManagerRunner, + bytes_to_gigabits, prepare_run, telemetry, BenchmarkConfig, Result, RunBenchmark, + SkipBenchmarkError, TransferManagerRunner, }; - -#[derive(Parser)] +#[derive(Parser, Debug)] #[command()] struct Args { #[arg(value_enum, help = "ID of S3 library to use")] @@ -20,9 +20,11 @@ struct Args { region: String, #[arg(help = "Target throughput, in gigabits per second (e.g. \"100.0\" for c5n.18xlarge)")] target_throughput: f64, + #[arg(long, help = "Emit telemetry via OTLP/gRPC to http://localhost:4317")] + telemetry: bool, } -#[derive(ValueEnum, Clone)] +#[derive(ValueEnum, Clone, Debug)] enum S3ClientId { #[clap(name = "sdk-rust-tm", help = "use aws-s3-transfer-manager crate")] TransferManager, @@ -31,16 +33,23 @@ enum S3ClientId { // SdkClient, } -fn main() { +#[tokio::main] +async fn main() { let args = Args::parse(); - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); + let _telemetry_guard = if args.telemetry { + // If emitting telemetry, set that up as tracing_subscriber. + Some(telemetry::init_tracing_subscriber().unwrap()) + } else { + // Otherwise, set the default subscriber, + // which prints to stdout if env-var set like RUST_LOG=trace + tracing_subscriber::fmt::init(); + None + }; - match runtime.block_on(async_main(&args)) { - Err(e) => match e.downcast_ref::() { + let result = execute(&args).await; + if let Err(e) = result { + match e.downcast_ref::() { None => { panic!("{e:?}"); } @@ -48,26 +57,15 @@ fn main() { eprintln!("Skipping benchmark - {msg}"); exit(123); } - }, - Ok(()) => (), + } } } -async fn async_main(args: &Args) -> Result<()> { - let config = BenchmarkConfig::new( - &args.workload, - &args.bucket, - &args.region, - args.target_throughput, - )?; - +#[instrument(name = "main")] +async fn execute(args: &Args) -> Result<()> { // create appropriate benchmark runner - let runner: Box = match args.s3_client { - S3ClientId::TransferManager => { - let transfer_manager = TransferManagerRunner::new(config).await; - Box::new(transfer_manager) - } - }; + let runner = new_runner(args).await?; + let workload = &runner.config().workload; let bytes_per_run: u64 = workload.tasks.iter().map(|x| x.size).sum(); let gigabits_per_run = bytes_to_gigabits(bytes_per_run); @@ -79,7 +77,10 @@ async fn async_main(args: &Args) -> Result<()> { let run_start = Instant::now(); - runner.run().await?; + runner + .run() + .instrument(info_span!("run", i = run_i)) + .await?; let run_secs = run_start.elapsed().as_secs_f64(); println!( @@ -97,3 +98,19 @@ async fn async_main(args: &Args) -> Result<()> { Ok(()) } + +async fn new_runner(args: &Args) -> Result> { + let config = BenchmarkConfig::new( + &args.workload, + &args.bucket, + &args.region, + args.target_throughput, + )?; + + match args.s3_client { + S3ClientId::TransferManager => { + let transfer_manager = TransferManagerRunner::new(config).await; + Ok(Box::new(transfer_manager)) + } + } +} diff --git a/runners/s3-benchrunner-rust/src/telemetry.rs b/runners/s3-benchrunner-rust/src/telemetry.rs new file mode 100644 index 00000000..3dfb74e3 --- /dev/null +++ b/runners/s3-benchrunner-rust/src/telemetry.rs @@ -0,0 +1,87 @@ +//! code adapted from: https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.24.0/examples/opentelemetry-otlp.rs + +// Avoid adding `use` declarations to the top of this file. +// If you MUST shorten a path, add the `use` within a function. +// The examples this code is adapted from had `use` declarations, and +// I (graebm) found it hard to understand what all the boilerplate was doing. +// With full paths, it's clear that the boilerplate is about tying together +// different ecosystems (`opentelemetry` vs `tracing`). These ecosystems +// split their features among many crates, and full paths make it more clear. + +use anyhow::Context; + +use crate::Result; + +// Create OTEL Resource (the entity that produces telemetry) +fn otel_resource() -> opentelemetry_sdk::Resource { + use opentelemetry::KeyValue; + use opentelemetry_semantic_conventions::{ + resource::{DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, + }; + + opentelemetry_sdk::Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")), + KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")), + KeyValue::new(DEPLOYMENT_ENVIRONMENT, "develop"), + ], + SCHEMA_URL, + ) +} + +// Construct OpenTelemetry Tracer +fn new_otel_tracer() -> Result { + use opentelemetry_sdk::trace::Sampler; + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry_sdk::trace::Config::default() + // Customize sampling strategy + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + // If export trace to AWS X-Ray, you can use XrayIdGenerator + .with_id_generator(opentelemetry_sdk::trace::RandomIdGenerator::default()) + .with_resource(otel_resource()), + ) + .with_batch_config(opentelemetry_sdk::trace::BatchConfig::default()) + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .with_context(|| format!("")) +} + +/// TelemetryGuard ensures data gets flushed when the guard goes out of scope. +pub struct TelemetryGuard {} + +impl Drop for TelemetryGuard { + fn drop(&mut self) { + opentelemetry::global::shutdown_tracer_provider(); + } +} + +pub fn init_tracing_subscriber() -> Result { + let otel_tracer = new_otel_tracer()?; + + // We want data emitted from the `tracing` crate to be exported as OpenTelemetry data. + // To do this, register an `OpenTelemetryLayer` as a `tracing_subscriber`. + // + // TODO: stop using `tracing_opentelemetry::OpenTelemetryLayer` (from makers of Tokio) + // when OpenTelemetry adds `tracing` integration in the OpenTelemetry SDK itself. + // - We've had issues where these crates don't all work together: + // https://github.com/tokio-rs/tracing-opentelemetry/issues/159 + // - OpenTelemetry says they're working on adding on their own integration: + // https://github.com/open-telemetry/opentelemetry-rust/issues/1571#issuecomment-2258910019) + + use tracing_subscriber::prelude::*; + + tracing_subscriber::registry() + .with(tracing_subscriber::filter::LevelFilter::from_level( + tracing::Level::INFO, + )) + .with(tracing_opentelemetry::OpenTelemetryLayer::new(otel_tracer)) + .init(); + + Ok(TelemetryGuard {}) +} diff --git a/runners/s3-benchrunner-rust/src/transfer_manager.rs b/runners/s3-benchrunner-rust/src/transfer_manager.rs index efff7b3e..649e4a2f 100644 --- a/runners/s3-benchrunner-rust/src/transfer_manager.rs +++ b/runners/s3-benchrunner-rust/src/transfer_manager.rs @@ -7,10 +7,11 @@ use aws_s3_transfer_manager::{ io::InputStream, types::{ConcurrencySetting, PartSize}, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; +use tracing::{info_span, Instrument}; use crate::{ BenchmarkConfig, Result, RunBenchmark, SkipBenchmarkError, TaskAction, TaskConfig, PART_SIZE, @@ -82,7 +83,7 @@ impl TransferManagerRunner { } } - async fn run_task(self, task_i: usize) -> Result<()> { + async fn run_task(self, task_i: usize, parent_span: tracing::Span) -> Result<()> { let task_config = &self.config().workload.tasks[task_i]; if self.config().workload.checksum.is_some() { @@ -90,8 +91,16 @@ impl TransferManagerRunner { } match task_config.action { - TaskAction::Download => self.download(task_config).await, - TaskAction::Upload => self.upload(task_config).await, + TaskAction::Download => { + self.download(task_config) + .instrument(info_span!(parent: parent_span, "download", key=task_config.key)) + .await + } + TaskAction::Upload => { + self.upload(task_config) + .instrument(info_span!(parent: parent_span, "upload", key=task_config.key)) + .await + } } } @@ -105,12 +114,14 @@ impl TransferManagerRunner { .bucket(&self.config().bucket) .key(key) .send() + .instrument(info_span!("initial-send")) .await .with_context(|| format!("failed starting download: {key}"))?; // if files_on_disk: open file for writing let mut dest_file = if self.config().workload.files_on_disk { let file = File::create(key) + .instrument(info_span!("file-open")) .await .with_context(|| format!("failed creating file: {key}"))?; Some(file) @@ -119,20 +130,23 @@ impl TransferManagerRunner { }; let mut total_size = 0u64; - while let Some(chunk_result) = download_handle.body_mut().next().await { - let chunk = + while let Some(chunk_result) = download_handle + .body_mut() + .next() + .instrument(info_span!("body-next")) + .await + { + let mut chunk = chunk_result.with_context(|| format!("failed downloading next chunk of: {key}"))?; - for segment in chunk.into_segments() { - // if files_on_disk: write to file - if let Some(dest_file) = &mut dest_file { - dest_file - .write_all(&segment) - .await - .with_context(|| format!("failed writing file: {key}"))?; - } + let chunk_size = chunk.remaining(); + total_size += chunk_size as u64; - total_size += segment.len() as u64; + if let Some(dest_file) = &mut dest_file { + dest_file + .write_all_buf(&mut chunk) + .instrument(info_span!("file-write", bytes = chunk_size)) + .await?; } } @@ -153,16 +167,21 @@ impl TransferManagerRunner { .into() }; - self.handle + let upload_handle = self + .handle .transfer_manager .upload() .bucket(&self.config().bucket) .key(key) .body(stream) .send() + .instrument(info_span!("initial-send")) .await - .with_context(|| format!("failed starting upload: {key}"))? + .with_context(|| format!("failed starting upload: {key}"))?; + + upload_handle .join() + .instrument(info_span!("join")) .await .with_context(|| format!("failed uploading: {key}"))?; @@ -178,7 +197,9 @@ impl RunBenchmark for TransferManagerRunner { // so we're using a JoinSet. let mut task_set: JoinSet> = JoinSet::new(); for i in 0..self.config().workload.tasks.len() { - task_set.spawn(self.clone().run_task(i)); + let parent_span_of_task = tracing::Span::current(); + let task = self.clone().run_task(i, parent_span_of_task); + task_set.spawn(task); } while let Some(join_result) = task_set.join_next().await {