Hey,

I'm planning to upgrade from 2.4 to 3.0. For 2.4, I added some additional functionality to the QueueMessagingTemplate, and I'm now looking into whether, and if so how, I need to port that to the 3.0 SqsTemplate.

Two things I did that are now built in are support for async sending (I needed that because without it the underlying AmazonSQSBufferedAsyncClient would block for some time when sending multiple messages from a single thread, creating huge delays) and adding a header containing the payload type (to support multiple message payload types), so that's nice!

Another thing is that I added support for propagating the current trace information. This support is very important to us: it allows us to correlate logging between senders and receivers, and also to quickly find the receiver's logging for dead-lettered messages so that we can understand why the processing failed.

What are your thoughts regarding an extension like this? Is there a better way that I'm overlooking to transparently enrich the message that's being sent while ensuring that I can access the current tracing context? I could also let my client code call an adapter that would create a full Message with the required header and then pass that to the template, but I'd much rather let clients use the SqsTemplate (or the interfaces that it implements) directly.

Thanks in advance for any advice!
-
Hi @jkuipers, nice to see you around.

I believe the If you use the blocking send methods you shouldn't run into any issues: the thread that reaches the converter will be the same one that calls the method, and after the message is sent the same thread should continue execution.

If you use the async send methods, though, it might be trickier: you should be able to reach the converter on the calling thread, but after that you'll hop threads, so you must be mindful of cleaning up any

How does that sound so far? I'd be interested in having a clean way of solving this, or if this is clean enough we should at least document it properly.

Thanks.
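To make the thread-hopping concern above a bit more concrete, here is a minimal, library-free sketch of capturing a thread-bound trace ID on the calling thread and cleaning it up again on whichever thread ends up running the continuation. `TraceContextHolder` and `propagating` are hypothetical names made up for illustration; they are not part of Spring Cloud AWS, Micrometer, or Brave, and a real setup would more likely lean on the tracing library's own scope handling.

```java
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not a Spring Cloud AWS or Micrometer class. */
final class TraceContextHolder {

    // Thread-bound trace ID, standing in for whatever the tracing library keeps per thread.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private TraceContextHolder() {}

    static void set(String traceId) { CURRENT.set(traceId); }

    static String get() { return CURRENT.get(); }

    static void clear() { CURRENT.remove(); }

    /**
     * Captures the caller's trace ID immediately and returns a supplier that installs it
     * on whichever thread runs it, restoring that thread's previous state afterwards.
     */
    static <T> Supplier<T> propagating(Supplier<T> task) {
        String captured = CURRENT.get(); // read on the calling thread
        return () -> {
            String previous = CURRENT.get(); // state of the executing thread
            CURRENT.set(captured);
            try {
                return task.get();
            } finally {
                // Clean up so nothing leaks into the next task on a pooled thread.
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }
}
```

A caller would wrap the work it hands to an executor, for example `CompletableFuture.supplyAsync(TraceContextHolder.propagating(() -> doSend()), executor)`, where `doSend` is again just a placeholder. The important part is the `finally` block: without it, a pooled thread would keep the trace ID of the last message it handled.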
-
My (slightly rough) attempt using the Micrometer Observation API:

class TracingSqsEndpoint extends SqsEndpoint {
    private final ObservationRegistry observationRegistry;

    protected TracingSqsEndpoint(SqsEndpointBuilder builder, ObservationRegistry tracing) {
        super(builder);
        this.observationRegistry = tracing;
    }

    @Override
    protected <T> MessageListener<T> createMessageListenerInstance(
            InvocableHandlerMethod handlerMethod) {
        return new TracingMessageListenerWrapper<>(
                super.createMessageListenerInstance(handlerMethod), observationRegistry);
    }

    @RequiredArgsConstructor
    public static class TracingSqsEndpointBuilder extends SqsEndpointBuilder {
        private final ObservationRegistry observationRegistry;

        @Override
        public SqsEndpoint build() {
            return new TracingSqsEndpoint(this, observationRegistry);
        }
    }
}

Wrapper adapted to use ObservationRegistry, with the most important change being the introduction of SqsReceiverContext:

@RequiredArgsConstructor
class TracingMessageListenerWrapper<T> implements MessageListener<T> {
    private final MessageListener<T> delegate;
    private final ObservationRegistry observationRegistry;

    @Override
    public void onMessage(Message<T> message) {
        SqsReceiverContext sqsReceiverContext = new SqsReceiverContext(message);
        Observation observation =
                Observation.start("sqs.receive", () -> sqsReceiverContext, observationRegistry);
        observation.observeChecked(() -> delegate.onMessage(message));
    }

    static class SqsReceiverContext extends ReceiverContext<Message<?>> {
        public SqsReceiverContext(Message<?> message) {
            super(((carrier, key) -> carrier.getHeaders().get(key, String.class)));
            setCarrier(message);
        }
    }
}

Post processor taken as is, with ObservationRegistry:

@RequiredArgsConstructor
class TracingSqsListenerAnnotationBeanPostProcessor extends SqsListenerAnnotationBeanPostProcessor {
    private final ObjectProvider<ObservationRegistry> observationRegistry;

    /**
     * Overrides parent method to ensure that our custom endpoint with tracing support is returned.
     */
    @Override
    protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
        return new TracingSqsEndpoint.TracingSqsEndpointBuilder(observationRegistry.getIfAvailable())
                .queueNames(resolveEndpointNames(sqsListenerAnnotation.value()))
                .factoryBeanName(resolveAsString(sqsListenerAnnotation.factory(), "factory"))
                .id(getEndpointId(sqsListenerAnnotation.id()))
                .pollTimeoutSeconds(
                        resolveAsInteger(sqsListenerAnnotation.pollTimeoutSeconds(), "pollTimeoutSeconds"))
                .maxMessagesPerPoll(
                        resolveAsInteger(sqsListenerAnnotation.maxMessagesPerPoll(), "maxMessagesPerPoll"))
                .maxConcurrentMessages(
                        resolveAsInteger(
                                sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages"))
                .messageVisibility(
                        resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
                .build();
    }
}

and sender part with ugly

@Bean(name = SqsBeanNames.SQS_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_BEAN_NAME)
TracingSqsListenerAnnotationBeanPostProcessor tracingSqsLABPP(
        ObjectProvider<ObservationRegistry> observationRegistry) {
    return new TracingSqsListenerAnnotationBeanPostProcessor(observationRegistry);
}

@Bean
SqsTemplate sqsTemplate(
        SqsAsyncClient sqsAsyncClient, ObjectMapper objectMapper, Tracer tracing) {
    var converter =
            new SqsMessagingMessageConverter() {
                @Override
                public software.amazon.awssdk.services.sqs.model.Message fromMessagingMessage(
                        Message<?> message, MessageConversionContext context) {
                    Span span = tracing.nextSpan().name("send").remoteServiceName("sqs").start();
                    // W3C traceparent: version-traceId-spanId-flags; the trailing "00" flags mean "not sampled".
                    Map<String, Object> newHeaders =
                            Map.of(
                                    "traceparent",
                                    "00-" + span.context().traceId() + "-" + span.context().spanId() + "-00");
                    // Note: the span is ended here, before the message is actually sent.
                    span.end();
                    return super.fromMessagingMessage(
                            MessageHeaderUtils.addHeadersIfAbsent(message, newHeaders), context);
                }
            };
    converter.setObjectMapper(objectMapper);
    return SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient).messageConverter(converter).build();
}
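Since the converter above builds the `traceparent` value by hand, here is a small standalone sketch of that formatting step, assuming the W3C Trace Context layout of `version-traceId-parentId-flags` with a 32-character hex trace ID, a 16-character hex parent ID, and flags `01` for sampled. `TraceParent` is a hypothetical helper written for this example, not something Brave, Micrometer, or Spring Cloud AWS provides, and the left-padding is only an assumption for tracers that are configured to emit shorter (64-bit) trace IDs.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; not part of Brave, Micrometer, or Spring Cloud AWS. */
final class TraceParent {

    private TraceParent() {}

    /** Builds a traceparent value using version "00" and the given sampling decision. */
    static String format(String traceId, String spanId, boolean sampled) {
        return "00-" + leftPad(traceId, 32) + "-" + leftPad(spanId, 16) + "-" + (sampled ? "01" : "00");
    }

    // Lower-cases the hex string and pads it with leading zeros up to the required width.
    private static String leftPad(String hex, int width) {
        if (hex.length() > width) {
            throw new IllegalArgumentException("Expected at most " + width + " hex chars: " + hex);
        }
        StringBuilder sb = new StringBuilder(width);
        for (int i = hex.length(); i < width; i++) {
            sb.append('0');
        }
        return sb.append(hex.toLowerCase(Locale.ROOT)).toString();
    }
}
```

In the converter this could replace the string concatenation, e.g. `TraceParent.format(span.context().traceId(), span.context().spanId(), true)`. Whether to pass `true` or the span's actual sampling decision depends on how the receiving side treats unsampled traces, so take that argument as a placeholder.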
Something like this seems to do the trick (Tracing is a brave.Tracing type):