Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ idna_adapter = "=1.2.0"
lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] }
pin-project-lite = { workspace = true }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["registry"] }

[package.metadata.docs.rs]
all-features = true
184 changes: 184 additions & 0 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,4 +908,188 @@ mod endpoint_tests {
server_handle.abort();
Ok(())
}

#[tokio::test]
#[cfg(feature = "experimental-concurrency")]
async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> {
    use std::{
        collections::{HashMap, HashSet},
        sync::Mutex,
    };
    use tracing::info;
    use tracing_subscriber::{layer::SubscriberExt, Layer};

    // Captures the fields of every emitted tracing event so the test can
    // inspect them after the runtime has been shut down.
    #[derive(Clone)]
    struct LogCapture {
        logs: Arc<Mutex<Vec<HashMap<String, String>>>>,
    }

    impl LogCapture {
        fn new() -> Self {
            Self {
                logs: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl<S> Layer<S> for LogCapture
    where
        S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
    {
        fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
            let mut fields = HashMap::new();
            // Stringify each field via its Debug impl, trimming the quotes
            // that Debug adds around string values.
            struct FieldVisitor<'a>(&'a mut HashMap<String, String>);
            impl tracing::field::Visit for FieldVisitor<'_> {
                fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
                    self.0.insert(
                        field.name().to_string(),
                        format!("{value:?}").trim_matches('"').to_string(),
                    );
                }
            }
            event.record(&mut FieldVisitor(&mut fields));
            self.logs.lock().unwrap().push(fields);
        }
    }

    // Hijack the logging implementation to keep track of all emitted fields.
    // `set_default` scopes the subscriber to this test via the returned guard
    // instead of installing a process-wide global (`set_global_default` can
    // only succeed once per process and would panic if another test in the
    // same binary had already installed a subscriber).
    let log_capture = LogCapture::new();
    let subscriber = tracing_subscriber::registry().with(log_capture.clone());
    let _subscriber_guard = tracing::subscriber::set_default(subscriber);

    let request_count = Arc::new(AtomicUsize::new(0));
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;
    let base: http::Uri = format!("http://{addr}").parse()?;

    // Small Lambda Runtime API emulator for the client to communicate with:
    // it serves 300 invocations, each with a unique request id, then answers
    // NO_CONTENT so the runtime stops polling for further work.
    let server_handle = {
        let request_count = request_count.clone();
        tokio::spawn(async move {
            loop {
                let (tcp, _) = match listener.accept().await {
                    Ok(v) => v,
                    Err(_) => return,
                };

                let request_count = request_count.clone();
                let service = service_fn(move |req: Request<Incoming>| {
                    let request_count = request_count.clone();
                    async move {
                        let (parts, body) = req.into_parts();
                        // Drain POST bodies so the connection can be reused.
                        if parts.method == Method::POST {
                            let _ = body.collect().await;
                        }

                        if parts.method == Method::GET && parts.uri.path() == "/2018-06-01/runtime/invocation/next"
                        {
                            let count = request_count.fetch_add(1, Ordering::SeqCst);
                            if count < 300 {
                                let request_id = format!("test-request-{}", count + 1);
                                let res = Response::builder()
                                    .status(StatusCode::OK)
                                    .header("lambda-runtime-aws-request-id", &request_id)
                                    .header("lambda-runtime-deadline-ms", "9999999999999")
                                    .body(Full::new(Bytes::from_static(b"{}")))
                                    .unwrap();
                                return Ok::<_, Infallible>(res);
                            } else {
                                // Out of work: signal the runtime to stop polling.
                                let res = Response::builder()
                                    .status(StatusCode::NO_CONTENT)
                                    .body(Full::new(Bytes::new()))
                                    .unwrap();
                                return Ok::<_, Infallible>(res);
                            }
                        }

                        if parts.method == Method::POST && parts.uri.path().contains("/response") {
                            let res = Response::builder()
                                .status(StatusCode::OK)
                                .body(Full::new(Bytes::new()))
                                .unwrap();
                            return Ok::<_, Infallible>(res);
                        }

                        let res = Response::builder()
                            .status(StatusCode::NOT_FOUND)
                            .body(Full::new(Bytes::new()))
                            .unwrap();
                        Ok::<_, Infallible>(res)
                    }
                });

                let io = TokioIo::new(tcp);
                tokio::spawn(async move {
                    let _ = ServerBuilder::new(TokioExecutor::new())
                        .serve_connection(io, service)
                        .await;
                });
            }
        })
    };

    // Handler that records the request id it observes from its own context;
    // the sleep forces invocations to overlap under the concurrency limit.
    async fn test_handler(event: crate::LambdaEvent<serde_json::Value>) -> Result<(), Error> {
        let request_id = &event.context.request_id;
        info!(observed_request_id = request_id);
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(())
    }

    let handler = crate::service_fn(test_handler);
    let client = Arc::new(Client::builder().with_endpoint(base).build()?);
    let runtime = Runtime {
        client: client.clone(),
        config: Arc::new(Config {
            function_name: "test_fn".to_string(),
            memory: 128,
            version: "1".to_string(),
            log_stream: "test_stream".to_string(),
            log_group: "test_log".to_string(),
        }),
        service: wrap_handler(handler, client),
        concurrency_limit: 3,
    };

    // Issue the requests; the capture layer collects the observed request ids.
    let runtime_handle = tokio::spawn(async move { runtime.run_concurrent().await });

    // Wait until all 300 invocations have been handed out, with a hard
    // deadline so a wedged runtime fails the test instead of hanging CI.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    loop {
        tokio::time::sleep(Duration::from_millis(100)).await;
        if request_count.load(Ordering::SeqCst) >= 300 {
            // Give in-flight handlers (100ms each) time to finish logging.
            tokio::time::sleep(Duration::from_millis(500)).await;
            break;
        }
        assert!(
            tokio::time::Instant::now() < deadline,
            "timed out waiting for 300 invocations to be dispatched"
        );
    }

    runtime_handle.abort();
    server_handle.abort();

    let logs = log_capture.logs.lock().unwrap();
    let relevant_logs: Vec<_> = logs.iter().filter(|l| l.contains_key("observed_request_id")).collect();

    assert!(
        relevant_logs.len() >= 300,
        "Should have at least 300 log entries, got {}",
        relevant_logs.len()
    );

    // Verify uniqueness: if concurrent invocations contaminated each other's
    // logging context, duplicate request ids would appear here.
    let mut seen_ids = HashSet::new();
    for log in &relevant_logs {
        let observed_id = log.get("observed_request_id").unwrap();
        assert!(
            observed_id.starts_with("test-request-"),
            "Request ID should match pattern: {}",
            observed_id
        );
        assert!(
            seen_ids.insert(observed_id.clone()),
            "Request ID should be unique: {}",
            observed_id
        );
    }
    Ok(())
}
}