Skip to content

Commit

Permalink
fix: silent error during batch ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal-Delange committed Jan 9, 2025
1 parent c9f7f77 commit 160c0cb
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 57 deletions.
10 changes: 3 additions & 7 deletions cmd/batch_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"log/slog"
"time"

"github.com/checkmarble/marble-backend/infra"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/getsentry/sentry-go"
)

// Deprecated
func RunBatchIngestion() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
Expand Down Expand Up @@ -103,10 +103,6 @@ func RunBatchIngestion() error {
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
)

err = jobs.IngestDataFromCsv(ctx, uc)
if err != nil {
logger.ErrorContext(ctx, "failed to ingest data from csvs", slog.String("error", err.Error()))
}

return err
jobs.IngestDataFromCsv(ctx, uc)
return nil
}
10 changes: 3 additions & 7 deletions cmd/scheduled_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"log/slog"
"time"

"github.com/checkmarble/marble-backend/infra"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// Deprecated
func RunScheduledExecuter() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
Expand Down Expand Up @@ -116,10 +116,6 @@ func RunScheduledExecuter() error {
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
)

err = jobs.ExecuteAllScheduledScenarios(ctx, uc)
if err != nil {
logger.ErrorContext(ctx, "failed to execute all scheduled scenarios", slog.String("error", err.Error()))
}

return err
jobs.ExecuteAllScheduledScenarios(ctx, uc)
return nil
}
10 changes: 3 additions & 7 deletions cmd/send_pending_webhook_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"context"
"log/slog"
"time"

"github.com/checkmarble/marble-backend/infra"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/getsentry/sentry-go"
)

// Deprecated
func RunSendPendingWebhookEvents() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
Expand Down Expand Up @@ -93,10 +93,6 @@ func RunSendPendingWebhookEvents() error {
usecases.WithConvoyServer(convoyConfiguration.APIUrl),
)

err = jobs.SendPendingWebhookEvents(ctx, uc)
if err != nil {
logger.ErrorContext(ctx, "failed to send pending webhook events", slog.String("error", err.Error()))
}

return err
jobs.SendPendingWebhookEvents(ctx, uc)
return nil
}
7 changes: 3 additions & 4 deletions jobs/execute_with_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"

"github.com/cockroachdb/errors"
"github.com/getsentry/sentry-go"
)

Expand All @@ -16,7 +15,7 @@ func executeWithMonitoring(
uc usecases.Usecases,
jobName string,
fn func(context.Context, usecases.Usecases) error,
) error {
) {
logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, fmt.Sprintf("Start job %s", jobName))

Expand All @@ -43,7 +42,8 @@ func executeWithMonitoring(
} else {
sentry.CaptureException(err)
}
return errors.Wrap(err, fmt.Sprintf("error executing job %s", jobName))
logger.ErrorContext(ctx, fmt.Sprintf("Unexpected Error in batch job: %+v", err))
return
}

sentry.CaptureCheckIn(
Expand All @@ -56,5 +56,4 @@ func executeWithMonitoring(
)

logger.InfoContext(ctx, fmt.Sprintf("Done executing job %s", jobName))
return nil
}
4 changes: 2 additions & 2 deletions jobs/ingest_data_from_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

const csvIngestionTimeout = 1 * time.Hour

func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases) error {
return executeWithMonitoring(
func IngestDataFromCsv(ctx context.Context, uc usecases.Usecases) {
executeWithMonitoring(
ctx,
uc,
"batch-ingestion",
Expand Down
5 changes: 2 additions & 3 deletions jobs/scheduled_scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import (

const batchScenarioExecutionTimeout = 3 * time.Hour

// Runs every minute
func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases) error {
return executeWithMonitoring(
func ExecuteAllScheduledScenarios(ctx context.Context, uc usecases.Usecases) {
executeWithMonitoring(
ctx,
uc,
"scheduled-execution",
Expand Down
19 changes: 6 additions & 13 deletions jobs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import (
"github.com/checkmarble/marble-backend/utils"
)

func errToReturnCode(err error) int {
if err != nil {
return 1
}
return 0
}

// Deprecated and to be moved into the river task scheduler
func RunScheduler(ctx context.Context, usecases usecases.Usecases) {
taskr := tasker.New(tasker.Option{
Expand All @@ -25,22 +18,22 @@ func RunScheduler(ctx context.Context, usecases usecases.Usecases) {
taskr.Task("* * * * *", func(ctx context.Context) (int, error) {
logger := utils.LoggerFromContext(ctx).With("job", "execute_all_scheduled_scenarios")
ctx = utils.StoreLoggerInContext(ctx, logger)
err := ExecuteAllScheduledScenarios(ctx, usecases)
return errToReturnCode(err), err
ExecuteAllScheduledScenarios(ctx, usecases)
return 0, nil
})

taskr.Task("* * * * *", func(ctx context.Context) (int, error) {
logger := utils.LoggerFromContext(ctx).With("job", "ingest_data_from_csv")
ctx = utils.StoreLoggerInContext(ctx, logger)
err := IngestDataFromCsv(ctx, usecases)
return errToReturnCode(err), err
IngestDataFromCsv(ctx, usecases)
return 0, nil
})

taskr.Task("*/10 * * * *", func(ctx context.Context) (int, error) {
logger := utils.LoggerFromContext(ctx).With("job", "send_webhook_events_to_convoy")
ctx = utils.StoreLoggerInContext(ctx, logger)
err := SendPendingWebhookEvents(ctx, usecases)
return errToReturnCode(err), err
SendPendingWebhookEvents(ctx, usecases)
return 0, nil
})

taskr.Run()
Expand Down
5 changes: 2 additions & 3 deletions jobs/send_pending_webhook_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"github.com/checkmarble/marble-backend/usecases"
)

// Runs every minute
func SendPendingWebhookEvents(ctx context.Context, uc usecases.Usecases) error {
return executeWithMonitoring(
func SendPendingWebhookEvents(ctx context.Context, uc usecases.Usecases) {
executeWithMonitoring(
ctx,
uc,
"send-webhook-events",
Expand Down
8 changes: 6 additions & 2 deletions usecases/ingestion_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (usecase *IngestionUseCase) processUploadLog(ctx context.Context, uploadLog
out := usecase.readFileIngestObjects(ctx, file.FileName, file.ReadCloser)
if out.err != nil {
setToFailed(out.numRowsIngested)
return err
return out.err
}

currentTime := time.Now()
Expand All @@ -409,6 +409,8 @@ type ingestionResult struct {
err error
}

// This method uses a return value wrapping an error, because we still want to use the number of rows ingested even if
// an error occurred.
func (usecase *IngestionUseCase) readFileIngestObjects(ctx context.Context, fileName string, fileReader io.Reader) ingestionResult {
logger := utils.LoggerFromContext(ctx)
logger.InfoContext(ctx, fmt.Sprintf("Ingesting data from CSV %s", fileName))
Expand Down Expand Up @@ -455,7 +457,9 @@ func (usecase *IngestionUseCase) ingestObjectsFromCSV(ctx context.Context, organ
duration := end.Sub(start)
// divide by 1e6 convert to milliseconds (base is nanoseconds)
avgDuration := float64(duration) / float64(total*1e6)
logger.InfoContext(ctx, fmt.Sprintf("Successfully ingested %d objects in %s, average %vms", total, duration, avgDuration))
if total > 0 {
logger.InfoContext(ctx, fmt.Sprintf("Successfully ingested %d objects in %s, average %vms", total, duration, avgDuration))
}
}
defer printDuration()

Expand Down
9 changes: 0 additions & 9 deletions utils/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"

"github.com/getsentry/sentry-go"
)
Expand All @@ -13,14 +12,6 @@ func LogAndReportSentryError(ctx context.Context, err error) {
logger := LoggerFromContext(ctx)
logger.ErrorContext(ctx, fmt.Sprintf("%+v", err))

// Known issue where Cloud Run will sometimes fail to create the unix socket to connect to CloudSQL.
// This always happens at the launching of a job or server, when we set up the db pool.
// In this case, we don't log the error in Sentry
if strings.Contains(err.Error(), "failed to connect to `host=/cloudsql/") {
logger.WarnContext(ctx, "Failed to create unix socket to connect to CloudSQL. Wait for the next execution of the job or retry starting the server")
return
}

// Ignore errors that are due to context deadlines or canceled context, as presumably their root case has been handled
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
logger.DebugContext(ctx, fmt.Sprintf("Deadline exceeded or context canceled: %v", err))
Expand Down

0 comments on commit 160c0cb

Please sign in to comment.