Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ idna_adapter = "=1.2.0"
lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] }
pin-project-lite = { workspace = true }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["registry"] }

[package.metadata.docs.rs]
all-features = true
235 changes: 235 additions & 0 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,4 +908,239 @@ mod endpoint_tests {
server_handle.abort();
Ok(())
}

#[tokio::test]
#[cfg(feature = "experimental-concurrency")]
async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> {
use std::{
collections::{HashMap, HashSet},
sync::Mutex,
};
use tracing::{info, subscriber::set_global_default};
use tracing_subscriber::{layer::SubscriberExt, Layer};

#[derive(Clone)]
struct LogCapture {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for the handrolled layer rather than using https://docs.rs/tracing-test/latest/tracing_test/ ?

logs: Arc<Mutex<Vec<HashMap<String, String>>>>,
span_fields: Arc<Mutex<HashMap<tracing::Id, HashMap<String, String>>>>,
}

impl LogCapture {
fn new() -> Self {
Self {
logs: Arc::new(Mutex::new(Vec::new())),
span_fields: Arc::new(Mutex::new(HashMap::new())),
}
}
}

impl<S> Layer<S> for LogCapture
where
S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
fn on_new_span(
&self,
attrs: &tracing::span::Attributes<'_>,
id: &tracing::Id,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
if attrs.metadata().name() == "Lambda runtime invoke" {
let mut fields = HashMap::new();
struct FieldVisitor<'a>(&'a mut HashMap<String, String>);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do need to handroll our test layer, is there a reason to define FieldVisitor inline in each method? It seems to be the same for each?

impl<'a> tracing::field::Visit for FieldVisitor<'a> {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.0.insert(
field.name().to_string(),
format!("{:?}", value).trim_matches('"').to_string(),
);
}
}
attrs.record(&mut FieldVisitor(&mut fields));
self.span_fields.lock().unwrap().insert(id.clone(), fields);
}
}

fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
let mut fields = HashMap::new();
struct FieldVisitor<'a>(&'a mut HashMap<String, String>);
impl<'a> tracing::field::Visit for FieldVisitor<'a> {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.0.insert(
field.name().to_string(),
format!("{:?}", value).trim_matches('"').to_string(),
);
}
}
event.record(&mut FieldVisitor(&mut fields));

// Add span requestId if we're in a Lambda runtime invoke span
if let Some(span) = ctx.lookup_current() {
if let Some(span_fields) = self.span_fields.lock().unwrap().get(&span.id()) {
if let Some(request_id) = span_fields.get("requestId") {
fields.insert("span_request_id".to_string(), request_id.clone());
}
}
}

self.logs.lock().unwrap().push(fields);
}
}

let log_capture = LogCapture::new();
let subscriber = tracing_subscriber::registry().with(log_capture.clone());
set_global_default(subscriber).unwrap();

let request_count = Arc::new(AtomicUsize::new(0));
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let base: http::Uri = format!("http://{addr}").parse()?;

let server_handle = {
let request_count = request_count.clone();
tokio::spawn(async move {
loop {
let (tcp, _) = match listener.accept().await {
Ok(v) => v,
Err(_) => return,
};

let request_count = request_count.clone();
let service = service_fn(move |req: Request<Incoming>| {
let request_count = request_count.clone();
async move {
let (parts, body) = req.into_parts();
if parts.method == Method::POST {
let _ = body.collect().await;
}

if parts.method == Method::GET && parts.uri.path() == "/2018-06-01/runtime/invocation/next"
{
let count = request_count.fetch_add(1, Ordering::SeqCst);
if count < 300 {
let request_id = format!("test-request-{}", count + 1);
let res = Response::builder()
.status(StatusCode::OK)
.header("lambda-runtime-aws-request-id", &request_id)
.header("lambda-runtime-deadline-ms", "9999999999999")
.body(Full::new(Bytes::from_static(b"{}")))
.unwrap();
return Ok::<_, Infallible>(res);
} else {
let res = Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Full::new(Bytes::new()))
.unwrap();
return Ok::<_, Infallible>(res);
}
}

if parts.method == Method::POST && parts.uri.path().contains("/response") {
let res = Response::builder()
.status(StatusCode::OK)
.body(Full::new(Bytes::new()))
.unwrap();
return Ok::<_, Infallible>(res);
}

let res = Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::new()))
.unwrap();
Ok::<_, Infallible>(res)
}
});

let io = TokioIo::new(tcp);
tokio::spawn(async move {
let _ = ServerBuilder::new(TokioExecutor::new())
.serve_connection(io, service)
.await;
});
}
})
};
Comment on lines 930 to 997
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We create a small emulator for the client to communicate with.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this will be useful for other things too, has long irritated me that we don't have a great way of testing e2e.

Would it be worth extracting into a helper instead of keeping it inline in the test?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed!

Copy link
Collaborator Author

@FullyTyped FullyTyped Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So one of the things I wanted to do was have a package, something like aws-lambda-simulator, that will provide a small server implementation, like above, and will allow users to customize behaviour, run benchmarks, etc, all in their integration suite, without needing to deploy.

I don't think we will, or should get there at this point in time, but I think it is worth considering.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that would be nice. Or even just exposing this as a test-util feature of the runtime.

I've dabbled with doing similar using cargo-lambda's simulator, but it was a bit unwieldy.

Might merit a backlog GitHub issue?


async fn test_handler(event: crate::LambdaEvent<serde_json::Value>) -> Result<(), Error> {
let request_id = &event.context.request_id;
info!(observed_request_id = request_id);
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}

let handler = crate::service_fn(test_handler);
let client = Arc::new(Client::builder().with_endpoint(base).build()?);

// Add tracing layer to capture span fields
use crate::layers::trace::TracingLayer;
use tower::ServiceBuilder;
let service = ServiceBuilder::new()
.layer(TracingLayer::new())
.service(wrap_handler(handler, client.clone()));

let runtime = Runtime {
client: client.clone(),
config: Arc::new(Config {
function_name: "test_fn".to_string(),
memory: 128,
version: "1".to_string(),
log_stream: "test_stream".to_string(),
log_group: "test_log".to_string(),
}),
service,
concurrency_limit: 3,
};

let runtime_handle = tokio::spawn(async move { runtime.run_concurrent().await });

loop {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: This is a bit odd - if all we want to do is wait until all 300 requests are done, why not just use a oneshot channel that we write to on our 300th request? (And store it as an Option<Sender> in the server so that we can consume it)

tokio::time::sleep(Duration::from_millis(100)).await;
let count = request_count.load(Ordering::SeqCst);
if count >= 300 {
tokio::time::sleep(Duration::from_millis(500)).await;
break;
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue the requests, the tracker will collect the request-ids.

Copy link
Collaborator

@jlizen jlizen Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This note might be a nice comment


runtime_handle.abort();
server_handle.abort();

let logs = log_capture.logs.lock().unwrap();
let relevant_logs: Vec<_> = logs.iter().filter(|l| l.contains_key("observed_request_id")).collect();

assert!(
relevant_logs.len() >= 300,
"Should have at least 300 log entries, got {}",
relevant_logs.len()
);

let mut seen_ids = HashSet::new();
for log in &relevant_logs {
let observed_id = log.get("observed_request_id").unwrap();
let span_request_id = log.get("span_request_id").unwrap();

assert!(
observed_id.starts_with("test-request-"),
"Request ID should match pattern: {}",
observed_id
);
assert!(
seen_ids.insert(observed_id.clone()),
"Request ID should be unique: {}",
observed_id
);

// Verify span request ID matches logged request ID
assert_eq!(
observed_id, span_request_id,
"Span request ID should match logged request ID: span={}, logged={}",
span_request_id, observed_id
);
}

println!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i could do without this, libtest harness anyway has a success output

"✅ Concurrent structured logging test passed with {} unique request IDs",
seen_ids.len()
);
Ok(())
}
}
Loading