From d7575ceb417ae0362fa929a4d1f3660a3b1841cd Mon Sep 17 00:00:00 2001 From: Thomas Lathuiliere Date: Wed, 10 Jul 2024 19:00:57 +0200 Subject: [PATCH] feat(webhook): send webhook event using a tx --- repositories/webhook_events_repository.go | 5 +- usecases/webhook_events_usecase.go | 75 +++++++++++++---------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/repositories/webhook_events_repository.go b/repositories/webhook_events_repository.go index 02f4b9d96..07516c65a 100644 --- a/repositories/webhook_events_repository.go +++ b/repositories/webhook_events_repository.go @@ -36,15 +36,14 @@ func (repo MarbleDbRepository) ListWebhookEvents(ctx context.Context, exec Execu return nil, err } - query := selectWebhookEvents() mergedFilters := filters.MergeWithDefaults() + query := selectWebhookEvents().Limit(mergedFilters.Limit) + if mergedFilters.DeliveryStatus != nil { query = query.Where(squirrel.Eq{"delivery_status": mergedFilters.DeliveryStatus}) } - query = query.Limit(mergedFilters.Limit) - return SqlToListOfRow( ctx, exec, diff --git a/usecases/webhook_events_usecase.go b/usecases/webhook_events_usecase.go index 0fe3a746c..1f081d193 100644 --- a/usecases/webhook_events_usecase.go +++ b/usecases/webhook_events_usecase.go @@ -113,10 +113,10 @@ func (usecase WebhookEventsUsecase) SendWebhookEvents( uploadErrorChan := make(chan error, len(pendingWebhookEvents)) deliveryStatusChan := make(chan models.WebhookEventDeliveryStatus, len(pendingWebhookEvents)) - startProcessSendWebhookEvent := func(webhookEvent models.WebhookEvent) { + startProcessSendWebhookEvent := func(webhookEventId string) { defer waitGroup.Done() - logger := logger.With("webhook_event_id", webhookEvent.Id) - deliveryStatus, err := usecase.sendWebhookEvent(ctx, webhookEvent, logger) + logger := logger.With("webhook_event_id", webhookEventId) + deliveryStatus, err := usecase.sendWebhookEvent(ctx, webhookEventId, logger) if err != nil { uploadErrorChan <- err } @@ -127,7 +127,7 @@ func (usecase WebhookEventsUsecase) SendWebhookEvents( for _, webhookEvent := range pendingWebhookEvents { waitGroup.Add(1) - go startProcessSendWebhookEvent(webhookEvent) + go startProcessSendWebhookEvent(webhookEvent.Id) } waitGroup.Wait() @@ -166,35 +166,44 @@ func (usecase WebhookEventsUsecase) SendWebhookEvents( // It returns the delivery status of the webhook event and an error if updating the webhook event fails. func (usecase *WebhookEventsUsecase) sendWebhookEvent( ctx context.Context, - webhookEvent models.WebhookEvent, + webhookEventId string, logger *slog.Logger, ) (*models.WebhookEventDeliveryStatus, error) { - err := usecase.enforceSecurity.SendWebhookEvent(ctx, webhookEvent.OrganizationId, webhookEvent.PartnerId) - if err != nil { - return nil, err - } - - exec := usecase.executorFactory.NewExecutor() - logger.InfoContext(ctx, fmt.Sprintf("Start processing webhook event %s", webhookEvent.Id)) - - err = usecase.convoyRepository.SendWebhookEvent(ctx, webhookEvent) - - webhookEventUpdate := models.WebhookEventUpdate{ - Id: webhookEvent.Id, - SendAttemptCount: webhookEvent.SendAttemptCount + 1, - } - if err == nil { - webhookEventUpdate.DeliveryStatus = models.Success - } else { - if webhookEventUpdate.SendAttemptCount >= 3 { - webhookEventUpdate.DeliveryStatus = models.Failed - } else { - webhookEventUpdate.DeliveryStatus = models.Retry - } - } - err = usecase.webhookEventsRepository.UpdateWebhookEvent(ctx, exec, webhookEventUpdate) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("error while updating webhook event %s", webhookEvent.Id)) - } - return &webhookEventUpdate.DeliveryStatus, nil + return executor_factory.TransactionReturnValue( + ctx, + usecase.transactionFactory, + func(tx repositories.Executor) (*models.WebhookEventDeliveryStatus, error) { + webhookEvent, err := usecase.webhookEventsRepository.GetWebhookEvent(ctx, tx, webhookEventId) + if err != nil { + return nil, err + } + + err = usecase.enforceSecurity.SendWebhookEvent(ctx, webhookEvent.OrganizationId, webhookEvent.PartnerId) + if err != nil { + return nil, err + } + + logger.InfoContext(ctx, fmt.Sprintf("Start processing webhook event %s", webhookEvent.Id)) + + err = usecase.convoyRepository.SendWebhookEvent(ctx, webhookEvent) + webhookEventUpdate := models.WebhookEventUpdate{ + Id: webhookEvent.Id, + SendAttemptCount: webhookEvent.SendAttemptCount + 1, + } + if err == nil { + webhookEventUpdate.DeliveryStatus = models.Success + } else { + if webhookEventUpdate.SendAttemptCount >= 3 { + webhookEventUpdate.DeliveryStatus = models.Failed + } else { + webhookEventUpdate.DeliveryStatus = models.Retry + } + } + err = usecase.webhookEventsRepository.UpdateWebhookEvent(ctx, tx, webhookEventUpdate) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf( + "error while updating webhook event %s", webhookEvent.Id)) + } + return &webhookEventUpdate.DeliveryStatus, nil + }) }