diff --git a/cmd/run_job_scheduler.go b/cmd/run_job_scheduler.go deleted file mode 100644 index 46b914c34..000000000 --- a/cmd/run_job_scheduler.go +++ /dev/null @@ -1,97 +0,0 @@ -package cmd - -import ( - "context" - "time" - - "github.com/checkmarble/marble-backend/infra" - "github.com/checkmarble/marble-backend/jobs" - "github.com/checkmarble/marble-backend/models" - "github.com/checkmarble/marble-backend/repositories" - "github.com/checkmarble/marble-backend/usecases" - "github.com/checkmarble/marble-backend/utils" - "github.com/getsentry/sentry-go" -) - -func RunJobScheduler() error { - // This is where we read the environment variables and set up the configuration for the application. - gcpConfig := infra.GcpConfig{ - EnableTracing: utils.GetEnv("ENABLE_GCP_TRACING", false), - ProjectId: utils.GetEnv("GOOGLE_CLOUD_PROJECT", ""), - } - pgConfig := infra.PgConfig{ - Database: "marble", - DbConnectWithSocket: utils.GetEnv("PG_CONNECT_WITH_SOCKET", false), - Hostname: utils.GetRequiredEnv[string]("PG_HOSTNAME"), - Password: utils.GetRequiredEnv[string]("PG_PASSWORD"), - Port: utils.GetEnv("PG_PORT", "5432"), - User: utils.GetRequiredEnv[string]("PG_USER"), - } - convoyConfiguration := infra.ConvoyConfiguration{ - APIKey: utils.GetEnv("CONVOY_API_KEY", ""), - APIUrl: utils.GetEnv("CONVOY_API_URL", ""), - ProjectID: utils.GetEnv("CONVOY_PROJECT_ID", ""), - RateLimit: utils.GetEnv("CONVOY_RATE_LIMIT", 50), - } - licenseConfig := models.LicenseConfiguration{ - LicenseKey: utils.GetEnv("LICENSE_KEY", ""), - KillIfReadLicenseError: utils.GetEnv("KILL_IF_READ_LICENSE_ERROR", false), - } - jobConfig := struct { - env string - appName string - loggingFormat string - sentryDsn string - failedWebhooksRetryPageSize int - ingestionBucketUrl string - }{ - env: utils.GetEnv("ENV", "development"), - appName: "marble-backend", - ingestionBucketUrl: utils.GetRequiredEnv[string]("INGESTION_BUCKET_URL"), - loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"), - sentryDsn: utils.GetEnv("SENTRY_DSN", ""), - failedWebhooksRetryPageSize: utils.GetEnv("FAILED_WEBHOOKS_RETRY_PAGE_SIZE", 1000), - } - - logger := utils.NewLogger(jobConfig.loggingFormat) - ctx := utils.StoreLoggerInContext(context.Background(), logger) - license := infra.VerifyLicense(licenseConfig) - - infra.SetupSentry(jobConfig.sentryDsn, jobConfig.env) - defer sentry.Flush(3 * time.Second) - - tracingConfig := infra.TelemetryConfiguration{ - ApplicationName: jobConfig.appName, - Enabled: gcpConfig.EnableTracing, - ProjectID: gcpConfig.ProjectId, - } - telemetryRessources, err := infra.InitTelemetry(tracingConfig) - if err != nil { - utils.LogAndReportSentryError(ctx, err) - return err - } - ctx = utils.StoreOpenTelemetryTracerInContext(ctx, telemetryRessources.Tracer) - - pool, err := infra.NewPostgresConnectionPool(ctx, pgConfig.GetConnectionString(), telemetryRessources.TracerProvider) - if err != nil { - utils.LogAndReportSentryError(ctx, err) - return err - } - - repositories := repositories.NewRepositories( - pool, - gcpConfig.GoogleApplicationCredentials, - repositories.WithConvoyClientProvider( - infra.InitializeConvoyRessources(convoyConfiguration), - convoyConfiguration.RateLimit, - ), - ) - uc := usecases.NewUsecases(repositories, - usecases.WithIngestionBucketUrl(jobConfig.ingestionBucketUrl), - usecases.WithFailedWebhooksRetryPageSize(jobConfig.failedWebhooksRetryPageSize), - usecases.WithLicense(license)) - - jobs.RunScheduler(ctx, uc) - - return nil -} diff --git a/cmd/task_queue.go b/cmd/task_queue.go index 8a58bf4f9..7e56f5c68 100644 --- a/cmd/task_queue.go +++ b/cmd/task_queue.go @@ -45,25 +45,31 @@ func RunTaskQueue() error { LicenseKey: utils.GetEnv("LICENSE_KEY", ""), KillIfReadLicenseError: utils.GetEnv("KILL_IF_READ_LICENSE_ERROR", false), } - serverConfig := struct { - env string - loggingFormat string - sentryDsn string + workerConfig := struct { + appName string + env string + failedWebhooksRetryPageSize int + ingestionBucketUrl string + loggingFormat string + sentryDsn string }{ - env: utils.GetEnv("ENV", "development"), - loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"), - sentryDsn: utils.GetEnv("SENTRY_DSN", ""), + appName: "marble-backend", + env: utils.GetEnv("ENV", "development"), + failedWebhooksRetryPageSize: utils.GetEnv("FAILED_WEBHOOKS_RETRY_PAGE_SIZE", 1000), + ingestionBucketUrl: utils.GetRequiredEnv[string]("INGESTION_BUCKET_URL"), + loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"), + sentryDsn: utils.GetEnv("SENTRY_DSN", ""), } - logger := utils.NewLogger(serverConfig.loggingFormat) + logger := utils.NewLogger(workerConfig.loggingFormat) ctx := utils.StoreLoggerInContext(context.Background(), logger) license := infra.VerifyLicense(licenseConfig) - infra.SetupSentry(serverConfig.sentryDsn, serverConfig.env) + infra.SetupSentry(workerConfig.sentryDsn, workerConfig.env) defer sentry.Flush(3 * time.Second) tracingConfig := infra.TelemetryConfiguration{ - ApplicationName: "marble", + ApplicationName: workerConfig.appName, Enabled: gcpConfig.EnableTracing, ProjectID: gcpConfig.ProjectId, } @@ -97,6 +103,7 @@ func RunTaskQueue() error { ), ) + // Start the task queue workers workers := river.NewWorkers() queues, err := usecases.QueuesFromOrgs(ctx, repositories.OrganizationRepository, repositories.ExecutorGetter) if err != nil { @@ -114,6 +121,8 @@ func RunTaskQueue() error { } uc := usecases.NewUsecases(repositories, + usecases.WithIngestionBucketUrl(workerConfig.ingestionBucketUrl), + usecases.WithFailedWebhooksRetryPageSize(workerConfig.failedWebhooksRetryPageSize), usecases.WithLicense(license), ) adminUc := jobs.GenerateUsecaseWithCredForMarbleAdmin(ctx, uc) @@ -125,9 +134,16 @@ func RunTaskQueue() error { return err } + // Asynchronously keep the task queue workers up to date with the orgs in the database taskQueueWorker := uc.NewTaskQueueWorker(riverClient) go taskQueueWorker.RefreshQueuesFromOrgIds(ctx) + // Start the cron jobs using the old entrypoint. + // This will progressively be replaced by the new task queue system. + // We do not wait for it, the state of the job is handled by the task queue workers. + go jobs.RunScheduler(ctx, uc) + + // Teardown sequence sigintOrTerm := make(chan os.Signal, 1) signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM) diff --git a/jobs/scheduler.go b/jobs/scheduler.go index c7865a86c..f0adb6aea 100644 --- a/jobs/scheduler.go +++ b/jobs/scheduler.go @@ -15,6 +15,7 @@ func errToReturnCode(err error) int { return 0 } +// Deprecated and to be moved into the river task scheduler func RunScheduler(ctx context.Context, usecases usecases.Usecases) { taskr := tasker.New(tasker.Option{ Verbose: true, diff --git a/main.go b/main.go index 853d5d9a6..6c269971e 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { } if *shouldRunScheduleScenarios { - // TODOl: eventually, remove this entrypoint completely + // TODO: eventually, remove this entrypoint completely logger.Info("The entrypoint \"scheduler\" is deprecated, its functionality has been merged into the \"scheduled-executer\" entrypoint") } @@ -70,7 +70,9 @@ func main() { } if *shouldRunScheduler { - err := cmd.RunJobScheduler() + // TODO: deprecated in favor of the task queue worker, which now runs the cron jobs. Will be removed eventually. + logger.Info("The entrypoint \"cron-scheduler\" is deprecated, its functionality has been merged into the \"worker\" entrypoint") + err := cmd.RunTaskQueue() if err != nil { log.Fatal(err) }