From 8528172407998abb2d258c34aaebac67ae34fd53 Mon Sep 17 00:00:00 2001 From: Tommy Kim Date: Tue, 15 Jul 2025 15:43:23 -0700 Subject: [PATCH] fix(awssqs): include MessageGroupId in SQS SendMessage request The MessageGroupId was being processed in templating but not actually used when sending messages to SQS. This fix ensures that when a MessageGroupId is specified in the notification template, it's properly included in the SendMessageInput, which is required for FIFO queues. - Add MessageGroupId to SendMessageInput when available - Add unit tests to verify MessageGroupId inclusion/exclusion behavior - Fix ensures proper FIFO queue message grouping functionality Signed-off-by: Tommy Kim --- pkg/services/awssqs.go | 9 +++++- pkg/services/awssqs_test.go | 55 +++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/pkg/services/awssqs.go b/pkg/services/awssqs.go index b69c5b33..b7df42d2 100644 --- a/pkg/services/awssqs.go +++ b/pkg/services/awssqs.go @@ -67,11 +67,18 @@ func (s awsSqsService) Send(notif Notification, dest Destination) error { } func (s awsSqsService) sendMessageInput(queueUrl *string, notif Notification) *sqs.SendMessageInput { - return &sqs.SendMessageInput{ + input := &sqs.SendMessageInput{ QueueUrl: queueUrl, MessageBody: aws.String(notif.Message), DelaySeconds: 10, } + + // Add MessageGroupId if available (required for FIFO queues) + if notif.AwsSqs != nil && notif.AwsSqs.MessageGroupId != "" { + input.MessageGroupId = aws.String(notif.AwsSqs.MessageGroupId) + } + + return input } func (s awsSqsService) getQueueInput(dest Destination) *sqs.GetQueueUrlInput { result := &sqs.GetQueueUrlInput{} diff --git a/pkg/services/awssqs_test.go b/pkg/services/awssqs_test.go index a8206cee..eb25c6ad 100644 --- a/pkg/services/awssqs_test.go +++ b/pkg/services/awssqs_test.go @@ -213,6 +213,61 @@ func TestGetClientOptionsCustomEndpointUrl_AwsSqs(t *testing.T) { assert.Equal(t, 2, len(options)) } +func TestSendMessageInput_WithMessageGroupId_AwsSqs(t *testing.T) { + s := NewTypedAwsSqsService(AwsSqsOptions{}) + queueUrl := "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue.fifo" + + notification := Notification{ + Message: "Hello", + AwsSqs: &AwsSqsNotification{ + MessageGroupId: "test-group-id", + }, + } + + input := SendMessageInput(s, &queueUrl, notification) + + assert.Equal(t, &queueUrl, input.QueueUrl) + assert.Equal(t, "Hello", *input.MessageBody) + assert.Equal(t, int32(10), input.DelaySeconds) + assert.Equal(t, "test-group-id", *input.MessageGroupId) +} + +func TestSendMessageInput_WithoutMessageGroupId_AwsSqs(t *testing.T) { + s := NewTypedAwsSqsService(AwsSqsOptions{}) + queueUrl := "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue" + + notification := Notification{ + Message: "Hello", + AwsSqs: &AwsSqsNotification{ + MessageGroupId: "", // Empty string + }, + } + + input := SendMessageInput(s, &queueUrl, notification) + + assert.Equal(t, &queueUrl, input.QueueUrl) + assert.Equal(t, "Hello", *input.MessageBody) + assert.Equal(t, int32(10), input.DelaySeconds) + assert.Nil(t, input.MessageGroupId) // Should not be set +} + +func TestSendMessageInput_WithoutAwsSqsNotification_AwsSqs(t *testing.T) { + s := NewTypedAwsSqsService(AwsSqsOptions{}) + queueUrl := "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue" + + notification := Notification{ + Message: "Hello", + AwsSqs: nil, // No AWS SQS notification + } + + input := SendMessageInput(s, &queueUrl, notification) + + assert.Equal(t, &queueUrl, input.QueueUrl) + assert.Equal(t, "Hello", *input.MessageBody) + assert.Equal(t, int32(10), input.DelaySeconds) + assert.Nil(t, input.MessageGroupId) // Should not be set +} + // Helpers var GetConfigOptions = (*awsSqsService).getConfigOptions var GetClientOptions = (*awsSqsService).getClientOptions