Skip to content

Commit

Permalink
more cron scheduler into the same entrypoint "worker" as the task que…
Browse files Browse the repository at this point in the history
…ue worker
  • Loading branch information
Pascal-Delange committed Oct 18, 2024
1 parent e31a031 commit 6d87108
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 109 deletions.
97 changes: 0 additions & 97 deletions cmd/run_job_scheduler.go

This file was deleted.

36 changes: 26 additions & 10 deletions cmd/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,31 @@ func RunTaskQueue() error {
LicenseKey: utils.GetEnv("LICENSE_KEY", ""),
KillIfReadLicenseError: utils.GetEnv("KILL_IF_READ_LICENSE_ERROR", false),
}
serverConfig := struct {
env string
loggingFormat string
sentryDsn string
workerConfig := struct {
appName string
env string
failedWebhooksRetryPageSize int
ingestionBucketUrl string
loggingFormat string
sentryDsn string
}{
env: utils.GetEnv("ENV", "development"),
loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"),
sentryDsn: utils.GetEnv("SENTRY_DSN", ""),
appName: "marble-backend",
env: utils.GetEnv("ENV", "development"),
failedWebhooksRetryPageSize: utils.GetEnv("FAILED_WEBHOOKS_RETRY_PAGE_SIZE", 1000),
ingestionBucketUrl: utils.GetRequiredEnv[string]("INGESTION_BUCKET_URL"),
loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"),
sentryDsn: utils.GetEnv("SENTRY_DSN", ""),
}

logger := utils.NewLogger(serverConfig.loggingFormat)
logger := utils.NewLogger(workerConfig.loggingFormat)
ctx := utils.StoreLoggerInContext(context.Background(), logger)
license := infra.VerifyLicense(licenseConfig)

infra.SetupSentry(serverConfig.sentryDsn, serverConfig.env)
infra.SetupSentry(workerConfig.sentryDsn, workerConfig.env)
defer sentry.Flush(3 * time.Second)

tracingConfig := infra.TelemetryConfiguration{
ApplicationName: "marble",
ApplicationName: workerConfig.appName,
Enabled: gcpConfig.EnableTracing,
ProjectID: gcpConfig.ProjectId,
}
Expand Down Expand Up @@ -97,6 +103,7 @@ func RunTaskQueue() error {
),
)

// Start the task queue workers
workers := river.NewWorkers()
queues, err := usecases.QueuesFromOrgs(ctx, repositories.OrganizationRepository, repositories.ExecutorGetter)
if err != nil {
Expand All @@ -114,6 +121,8 @@ func RunTaskQueue() error {
}

uc := usecases.NewUsecases(repositories,
usecases.WithIngestionBucketUrl(workerConfig.ingestionBucketUrl),
usecases.WithFailedWebhooksRetryPageSize(workerConfig.failedWebhooksRetryPageSize),
usecases.WithLicense(license),
)
adminUc := jobs.GenerateUsecaseWithCredForMarbleAdmin(ctx, uc)
Expand All @@ -125,9 +134,16 @@ func RunTaskQueue() error {
return err
}

// Asynchronously keep the task queue workers up to date with the orgs in the database
taskQueueWorker := uc.NewTaskQueueWorker(riverClient)
go taskQueueWorker.RefreshQueuesFromOrgIds(ctx)

// Start the cron jobs using the old entrypoint.
// This will progressively be replaced by the new task queue system.
// We do not wait for it, the state of the job is handled by the task queue workers.
go jobs.RunScheduler(ctx, uc)

// Teardown sequence
sigintOrTerm := make(chan os.Signal, 1)
signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)

Expand Down
1 change: 1 addition & 0 deletions jobs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func errToReturnCode(err error) int {
return 0
}

// Deprecated and to be moved into the river task scheduler
func RunScheduler(ctx context.Context, usecases usecases.Usecases) {
taskr := tasker.New(tasker.Option{
Verbose: true,
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
}

if *shouldRunScheduleScenarios {
// TODOl: eventually, remove this entrypoint completely
// TODO: eventually, remove this entrypoint completely
logger.Info("The entrypoint \"scheduler\" is deprecated, its functionality has been merged into the \"scheduled-executer\" entrypoint")
}

Expand All @@ -70,7 +70,9 @@ func main() {
}

if *shouldRunScheduler {
err := cmd.RunJobScheduler()
// TODO: deprecated in favor of the task queue worker, which now runs the cron jobs. Will be removed eventually.
logger.Info("The entrypoint \"cron-scheduler\" is deprecated, its functionality has been merged into the \"worker\" entrypoint")
err := cmd.RunTaskQueue()
if err != nil {
log.Fatal(err)
}
Expand Down

0 comments on commit 6d87108

Please sign in to comment.