Skip to content

Commit

Permalink
Merge branch 'tokio-rs:main' into blocking-warning
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjscho authored Apr 10, 2024
2 parents 748c17a + 6728fbe commit e4928bd
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 129 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
# 0.6.2 (April 3, 2024)

### Added

- Enable tokio io driver ([#171])

### Fixed

- Make sim_elapsed function safe to call outside of simulation ([#170])

[#170]: https://github.com/tokio-rs/turmoil/pull/170
[#171]: https://github.com/tokio-rs/turmoil/pull/171

# 0.6.1 (January 10, 2024)

### Added

- Documentation on using tracing with Turmoil ([#155] [#164])
- Add a check for port exhaustion ([#157])
- Add `turmoil::sim_elapsed`` for retrieving total simulation virtual time ([#164])
- Update Axum example to use axum 0.7 and hyper 1 ([#154])

[#154]: https://github.com/tokio-rs/turmoil/pull/154
[#155]: https://github.com/tokio-rs/turmoil/pull/155
[#157]: https://github.com/tokio-rs/turmoil/pull/157
[#164]: https://github.com/tokio-rs/turmoil/pull/164

# 0.6.0 (November 17, 2023)

### Added
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name = "turmoil"
# - README.md
# - Update CHANGELOG.md
# - Create git tag
version = "0.6.0"
version = "0.6.2"
edition = "2021"
license = "MIT"
authors = ["Tokio Contributors <[email protected]>"]
Expand Down
8 changes: 5 additions & 3 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ edition = "2021"
publish = false

[dependencies]
axum = "0.6"
hyper = { version = "0.14", features = ["full"] }
async-stream = "0.3"
axum = "0.7"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1.2", features = ["full"] }
http-body-util = "0.1"
turmoil = { path = "../.." }
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1"
tower = "0.4"
pin-project-lite = "0.2"
114 changes: 67 additions & 47 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use axum::extract::Path;
use axum::response::Response;
use axum::routing::get;
use axum::Router;
use axum::{body::Body, http::Request};
use hyper::server::accept::from_stream;
use hyper::{Client, Server, Uri};
use axum::{body::Body, extract::Path, http::Request, routing::get, Router};
use http_body_util::BodyExt as _;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use std::net::{IpAddr, Ipv4Addr};
use tower::make::Shared;
use tracing::{info_span, Instrument};
use turmoil::{net, Builder};

Expand All @@ -29,15 +24,27 @@ fn main() {
sim.host("server", move || {
let router = router.clone();
async move {
Server::builder(from_stream(async_stream::stream! {
let listener = net::TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| s);
let listener = net::TcpListener::bind(addr).await?;
loop {
let (tcp_stream, _remote_addr) = listener.accept().await?;
let tcp_stream = hyper_util::rt::TokioIo::new(tcp_stream);

let hyper_service = hyper_util::service::TowerToHyperService::new(router.clone());

let result = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection_with_upgrades(tcp_stream, hyper_service)
.await;
if result.is_err() {
// This error only appears when the client doesn't send a request and
// terminate the connection.
//
// If client sends one request then terminate connection whenever, it doesn't
// appear.
break;
}
}))
.serve(Shared::new(router))
.await
.unwrap();
}

Ok(())
}
Expand All @@ -47,15 +54,15 @@ fn main() {
sim.client(
"client",
async move {
let client = Client::builder().build(connector::connector());
let client = Client::builder(TokioExecutor::new()).build(connector::connector());

let mut request = Request::new(Body::empty());
*request.uri_mut() = Uri::from_static("http://server:9999/greet/foo");
*request.uri_mut() = hyper::Uri::from_static("http://server:9999/greet/foo");
let res = client.request(request).await?;

let (parts, body) = res.into_parts();
let body = hyper::body::to_bytes(body).await?;
let res = Response::from_parts(parts, body);
let body = body.collect().await?.to_bytes();
let res = hyper::Response::from_parts(parts, body);

tracing::info!("Got response: {:?}", res);

Expand All @@ -68,68 +75,81 @@ fn main() {
}

mod connector {
use std::{future::Future, pin::Pin};

use hyper::{
client::connect::{Connected, Connection},
Uri,
};
use tokio::io::{AsyncRead, AsyncWrite};
use hyper::Uri;
use pin_project_lite::pin_project;
use std::{future::Future, io::Error, pin::Pin};
use tokio::io::AsyncWrite;
use tower::Service;
use turmoil::net::TcpStream;

type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, std::io::Error>> + Send>>;
type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, Error>> + Send>>;

pub fn connector(
) -> impl Service<Uri, Response = TurmoilConnection, Error = std::io::Error, Future = Fut> + Clone
{
) -> impl Service<Uri, Response = TurmoilConnection, Error = Error, Future = Fut> + Clone {
tower::service_fn(|uri: Uri| {
Box::pin(async move {
let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok::<_, std::io::Error>(TurmoilConnection(conn))
Ok::<_, Error>(TurmoilConnection { fut: conn })
}) as Fut
})
}

pub struct TurmoilConnection(turmoil::net::TcpStream);
pin_project! {
pub struct TurmoilConnection{
#[pin]
fut: turmoil::net::TcpStream
}
}

impl AsyncRead for TurmoilConnection {
impl hyper::rt::Read for TurmoilConnection {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> std::task::Poll<Result<(), Error>> {
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
let result = tokio::io::AsyncRead::poll_read(self.project().fut, cx, &mut tbuf);
match result {
std::task::Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};

unsafe {
buf.advance(n);
}
std::task::Poll::Ready(Ok(()))
}
}

impl AsyncWrite for TurmoilConnection {
impl hyper::rt::Write for TurmoilConnection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
) -> std::task::Poll<Result<usize, Error>> {
Pin::new(&mut self.fut).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
) -> std::task::Poll<Result<(), Error>> {
Pin::new(&mut self.fut).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
) -> std::task::Poll<Result<(), Error>> {
Pin::new(&mut self.fut).poll_shutdown(cx)
}
}

impl Connection for TurmoilConnection {
fn connected(&self) -> hyper::client::connect::Connected {
Connected::new()
impl hyper_util::client::legacy::connect::Connection for TurmoilConnection {
fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
hyper_util::client::legacy::connect::Connected::new()
}
}
}
36 changes: 30 additions & 6 deletions examples/grpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ use proto::{HelloReply, HelloRequest};
use crate::proto::greeter_client::GreeterClient;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();
configure_tracing();

let addr = (IpAddr::from(Ipv4Addr::UNSPECIFIED), 9999);

Expand Down Expand Up @@ -60,7 +56,7 @@ fn main() {
let request = Request::new(HelloRequest { name: "foo".into() });
let res = greeter_client.say_hello(request).await?;

tracing::info!("Got response: {:?}", res);
tracing::info!(?res, "Got response");

Ok(())
}
Expand All @@ -70,6 +66,33 @@ fn main() {
sim.run().unwrap();
}

/// An example of how to configure a tracing subscriber that will log logical
/// elapsed time since the simulation started using `turmoil::sim_elapsed()`.
fn configure_tracing() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing::subscriber::set_global_default(
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_timer(SimElapsedTime)
.finish(),
)
.expect("Configure tracing");
}

#[derive(Clone)]
struct SimElapsedTime;
impl tracing_subscriber::fmt::time::FormatTime for SimElapsedTime {
fn format_time(&self, w: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
// Prints real time and sim elapsed time. Example: 2024-01-10T17:06:57.020452Z [76ms]
tracing_subscriber::fmt::time()
.format_time(w)
.and_then(|()| write!(w, " [{:?}]", turmoil::sim_elapsed().unwrap_or_default()))
}
}

#[derive(Default)]
pub struct MyGreeter {}

Expand All @@ -79,6 +102,7 @@ impl Greeter for MyGreeter {
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
tracing::info!(?request, "Got request");
let reply = HelloReply {
message: format!("Hello {}!", request.into_inner().name),
};
Expand Down
14 changes: 9 additions & 5 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::*;
use std::{ops::RangeInclusive, time::SystemTime};

use rand::{RngCore, SeedableRng};
use std::{
ops::RangeInclusive,
time::{Duration, SystemTime},
};

use crate::*;

/// A builder that can be used to configure the simulation.
///
Expand Down Expand Up @@ -166,6 +164,12 @@ impl Builder {
self
}

/// Enables the tokio I/O driver.
pub fn enable_tokio_io(&mut self) -> &mut Self {
self.config.enable_tokio_io = true;
self
}

/// Build a simulation with the settings from the builder.
///
/// This will use default rng with entropy from the device running.
Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub(crate) struct Config {

/// Max size of the udp receive buffer
pub(crate) udp_capacity: usize,

/// Enables tokio IO driver
pub(crate) enable_tokio_io: bool,
}

/// Configures link behavior.
Expand Down Expand Up @@ -87,6 +90,7 @@ impl Default for Config {
ephemeral_ports: 49152..=65535,
tcp_capacity: 64,
udp_capacity: 64,
enable_tokio_io: false,
}
}
}
Expand Down
Loading

0 comments on commit e4928bd

Please sign in to comment.