Skip to content

Commit

Permalink
Support SNS message unwrap
Browse files Browse the repository at this point in the history
closes #119

---

Pull Request resolved: #120
commit_hash:04c7c7517e27461fa606fc29241ee04bd0d7f202
  • Loading branch information
laskoviymishka authored and robot-piglet committed Nov 27, 2024
1 parent ddf2bbd commit 30f7a68
Showing 1 changed file with 30 additions and 32 deletions.
62 changes: 30 additions & 32 deletions pkg/providers/s3/source/object_fetcher_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,28 @@ type sqsSource struct {
mu sync.Mutex
}

func (s *sqsSource) fetchMessages() ([]Object, error) {
// records struct is used for unmarshalling SQS messages in the FetchObjects method.
type records struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
Size int64 `json:"size"`
} `json:"object"`
ConfigurationID string `json:"configurationId"`
} `json:"s3"`

EventName string `json:"eventName"`
EventTime time.Time `json:"eventTime"`
} `json:"Records"`
}
// DTO struct is used for unmarshalling SQS messages in the FetchObjects method.
type DTO struct {
Type string `json:"Type"`
Message string `json:"Message"`
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
} `json:"bucket"`
Object struct {
Key string `json:"key"`
Size int64 `json:"size"`
} `json:"object"`
ConfigurationID string `json:"configurationId"`
} `json:"s3"`

EventName string `json:"eventName"`
EventTime time.Time `json:"eventTime"`
} `json:"Records"`
}

func (s *sqsSource) fetchMessages() ([]Object, error) {
messages, err := s.client.ReceiveMessageWithContext(s.ctx, &sqs.ReceiveMessageInput{
QueueUrl: s.queueURL,
MaxNumberOfMessages: aws.Int64(10), // maximum is 10, but fewer msg can be delivered
Expand All @@ -73,11 +75,17 @@ func (s *sqsSource) fetchMessages() ([]Object, error) {
ReceiptHandle: message.ReceiptHandle,
}
if !strings.Contains(*message.Body, TestEvent) && strings.Contains(*message.Body, CreationEvent) {
var records records
if err := json.Unmarshal([]byte(*message.Body), &records); err != nil {
var dto DTO
if err := json.Unmarshal([]byte(*message.Body), &dto); err != nil {
return nil, xerrors.Errorf("failed to unmarshal message records: %w", err)
}
for _, record := range records.Records {
if len(dto.Records) == 0 && len(dto.Message) > 0 {
// we receive wrapped message, need to unwrap it, actual records are inside `Message` field.
if err := json.Unmarshal([]byte(dto.Message), &dto); err != nil {
return nil, xerrors.Errorf("failed to unmarshal message records: %w", err)
}
}
for _, record := range dto.Records {
if strings.Contains(record.EventName, CreationEvent) {
// SQS escapes path strings, we need to invert the operation here, from simple%3D1234.jsonl to simple=1234.jsonl for example
unescapedKey, err := url.QueryUnescape(record.S3.Object.Key)
Expand Down Expand Up @@ -270,16 +278,6 @@ func (s *sqsSource) batchDelete() error {
return nil
}

func containsCreationEventConfig(events []*string) bool {
for _, event := range events {
if strings.Contains(*event, CreationEvent) {
return true
}
}

return false
}

func NewSQSSource(ctx context.Context, logger log.Logger, reader reader.Reader,
sess *session.Session, sourceConfig *s3.S3Source,
) (*sqsSource, error) {
Expand Down

0 comments on commit 30f7a68

Please sign in to comment.