Skip to content

Commit

Permalink
feat(webhook): send webhook event using a tx
Browse files Browse the repository at this point in the history
  • Loading branch information
balzdur committed Jul 12, 2024
1 parent 390c4aa commit d7575ce
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 36 deletions.
5 changes: 2 additions & 3 deletions repositories/webhook_events_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ func (repo MarbleDbRepository) ListWebhookEvents(ctx context.Context, exec Execu
return nil, err
}

query := selectWebhookEvents()
mergedFilters := filters.MergeWithDefaults()

query := selectWebhookEvents().Limit(mergedFilters.Limit)

if mergedFilters.DeliveryStatus != nil {
query = query.Where(squirrel.Eq{"delivery_status": mergedFilters.DeliveryStatus})
}

query = query.Limit(mergedFilters.Limit)

return SqlToListOfRow(
ctx,
exec,
Expand Down
75 changes: 42 additions & 33 deletions usecases/webhook_events_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ func (usecase WebhookEventsUsecase) SendWebhookEvents(
uploadErrorChan := make(chan error, len(pendingWebhookEvents))
deliveryStatusChan := make(chan models.WebhookEventDeliveryStatus, len(pendingWebhookEvents))

startProcessSendWebhookEvent := func(webhookEvent models.WebhookEvent) {
startProcessSendWebhookEvent := func(webhookEventId string) {
defer waitGroup.Done()
logger := logger.With("webhook_event_id", webhookEvent.Id)
deliveryStatus, err := usecase.sendWebhookEvent(ctx, webhookEvent, logger)
logger := logger.With("webhook_event_id", webhookEventId)
deliveryStatus, err := usecase.sendWebhookEvent(ctx, webhookEventId, logger)
if err != nil {
uploadErrorChan <- err
}
Expand All @@ -127,7 +127,7 @@ func (usecase WebhookEventsUsecase) SendWebhookEvents(

for _, webhookEvent := range pendingWebhookEvents {
waitGroup.Add(1)
go startProcessSendWebhookEvent(webhookEvent)
go startProcessSendWebhookEvent(webhookEvent.Id)
}

waitGroup.Wait()
Expand Down Expand Up @@ -166,35 +166,44 @@ func (usecase WebhookEventsUsecase) SendWebhookEvents(
// It returns the delivery status of the webhook event and an error if updating the webhook event fails.
func (usecase *WebhookEventsUsecase) sendWebhookEvent(
ctx context.Context,
webhookEvent models.WebhookEvent,
webhookEventId string,
logger *slog.Logger,
) (*models.WebhookEventDeliveryStatus, error) {
err := usecase.enforceSecurity.SendWebhookEvent(ctx, webhookEvent.OrganizationId, webhookEvent.PartnerId)
if err != nil {
return nil, err
}

exec := usecase.executorFactory.NewExecutor()
logger.InfoContext(ctx, fmt.Sprintf("Start processing webhook event %s", webhookEvent.Id))

err = usecase.convoyRepository.SendWebhookEvent(ctx, webhookEvent)

webhookEventUpdate := models.WebhookEventUpdate{
Id: webhookEvent.Id,
SendAttemptCount: webhookEvent.SendAttemptCount + 1,
}
if err == nil {
webhookEventUpdate.DeliveryStatus = models.Success
} else {
if webhookEventUpdate.SendAttemptCount >= 3 {
webhookEventUpdate.DeliveryStatus = models.Failed
} else {
webhookEventUpdate.DeliveryStatus = models.Retry
}
}
err = usecase.webhookEventsRepository.UpdateWebhookEvent(ctx, exec, webhookEventUpdate)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error while updating webhook event %s", webhookEvent.Id))
}
return &webhookEventUpdate.DeliveryStatus, nil
return executor_factory.TransactionReturnValue(
ctx,
usecase.transactionFactory,
func(tx repositories.Executor) (*models.WebhookEventDeliveryStatus, error) {
webhookEvent, err := usecase.webhookEventsRepository.GetWebhookEvent(ctx, tx, webhookEventId)
if err != nil {
return nil, err
}

err = usecase.enforceSecurity.SendWebhookEvent(ctx, webhookEvent.OrganizationId, webhookEvent.PartnerId)
if err != nil {
return nil, err
}

logger.InfoContext(ctx, fmt.Sprintf("Start processing webhook event %s", webhookEvent.Id))

err = usecase.convoyRepository.SendWebhookEvent(ctx, webhookEvent)
webhookEventUpdate := models.WebhookEventUpdate{
Id: webhookEvent.Id,
SendAttemptCount: webhookEvent.SendAttemptCount + 1,
}
if err == nil {
webhookEventUpdate.DeliveryStatus = models.Success
} else {
if webhookEventUpdate.SendAttemptCount >= 3 {
webhookEventUpdate.DeliveryStatus = models.Failed
} else {
webhookEventUpdate.DeliveryStatus = models.Retry
}
}
err = usecase.webhookEventsRepository.UpdateWebhookEvent(ctx, tx, webhookEventUpdate)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf(
"error while updating webhook event %s", webhookEvent.Id))
}
return &webhookEventUpdate.DeliveryStatus, nil
})
}

0 comments on commit d7575ce

Please sign in to comment.