From 05d5f38045b7e407dfc63ad79ea66e3e0febee48 Mon Sep 17 00:00:00 2001 From: Pascal Delange Date: Thu, 10 Oct 2024 16:25:16 +0200 Subject: [PATCH] "hello world" of task queue with river --- .vscode/launch.json | 12 +++ cmd/server.go | 13 +++ cmd/task_queue.go | 181 ++++++++++++++++++++++++++++++++ go.mod | 9 +- go.sum | 23 +++- main.go | 9 ++ models/river_job.go | 28 +++++ repositories/migrations.go | 40 +++++++ usecases/case_usecase.go | 11 ++ usecases/usecases.go | 11 ++ usecases/usecases_with_creds.go | 1 + 11 files changed, 335 insertions(+), 3 deletions(-) create mode 100644 cmd/task_queue.go create mode 100644 models/river_job.go diff --git a/.vscode/launch.json b/.vscode/launch.json index 96295646a..9693f4174 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -64,6 +64,18 @@ "--cron-scheduler" ], "console": "integratedTerminal" + }, + { + "name": "Launch task queue workers (.env)", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/.", + "envFile": "${workspaceFolder}/.env", + "args": [ + "--worker" + ], + "console": "integratedTerminal" } ] } \ No newline at end of file diff --git a/cmd/server.go b/cmd/server.go index fe88724f0..3e7554a50 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -14,6 +14,8 @@ import ( "github.com/checkmarble/marble-backend/repositories" "github.com/checkmarble/marble-backend/usecases" "github.com/checkmarble/marble-backend/utils" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/cockroachdb/errors" "github.com/getsentry/sentry-go" @@ -109,6 +111,16 @@ func RunServer() error { utils.LogAndReportSentryError(ctx, err) } + workers := river.NewWorkers() + // AddWorker panics if the worker is already registered or invalid: + river.AddWorker(workers, &models.SortWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{}) + if err != nil { + utils.LogAndReportSentryError(ctx, err) + return err + } + repositories := repositories.NewRepositories( pool, gcpConfig.GoogleApplicationCredentials, @@ -125,6 +137,7 @@ func RunServer() error { usecases.WithIngestionBucketUrl(serverConfig.ingestionBucketUrl), usecases.WithCaseManagerBucketUrl(serverConfig.caseManagerBucket), usecases.WithLicense(license), + usecases.WithRiverClient(riverClient), ) //////////////////////////////////////////////////////////// diff --git a/cmd/task_queue.go b/cmd/task_queue.go new file mode 100644 index 000000000..5d0889533 --- /dev/null +++ b/cmd/task_queue.go @@ -0,0 +1,181 @@ +package cmd + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/checkmarble/marble-backend/infra" + "github.com/checkmarble/marble-backend/models" + "github.com/checkmarble/marble-backend/utils" + "github.com/cockroachdb/errors" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + + "github.com/getsentry/sentry-go" +) + +func RunTaskQueue() error { + // This is where we read the environment variables and set up the configuration for the application. + gcpConfig := infra.GcpConfig{ + EnableTracing: utils.GetEnv("ENABLE_GCP_TRACING", false), + ProjectId: utils.GetEnv("GOOGLE_CLOUD_PROJECT", ""), + // GoogleApplicationCredentials: utils.GetEnv("GOOGLE_APPLICATION_CREDENTIALS", ""), + } + pgConfig := infra.PgConfig{ + Database: "marble", + DbConnectWithSocket: utils.GetEnv("PG_CONNECT_WITH_SOCKET", false), + Hostname: utils.GetRequiredEnv[string]("PG_HOSTNAME"), + Password: utils.GetRequiredEnv[string]("PG_PASSWORD"), + Port: utils.GetEnv("PG_PORT", "5432"), + User: utils.GetRequiredEnv[string]("PG_USER"), + } + + // licenseConfig := models.LicenseConfiguration{ + // LicenseKey: utils.GetEnv("LICENSE_KEY", ""), + // KillIfReadLicenseError: utils.GetEnv("KILL_IF_READ_LICENSE_ERROR", false), + // } + serverConfig := struct { + env string + loggingFormat string + sentryDsn string + }{ + env: utils.GetEnv("ENV", "development"), + loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"), + sentryDsn: utils.GetEnv("SENTRY_DSN", ""), + } + + logger := utils.NewLogger(serverConfig.loggingFormat) + ctx := utils.StoreLoggerInContext(context.Background(), logger) + // license := infra.VerifyLicense(licenseConfig) + + infra.SetupSentry(serverConfig.sentryDsn, serverConfig.env) + defer sentry.Flush(3 * time.Second) + + tracingConfig := infra.TelemetryConfiguration{ + ApplicationName: "marble", + Enabled: gcpConfig.EnableTracing, + ProjectID: gcpConfig.ProjectId, + } + telemetryRessources, err := infra.InitTelemetry(tracingConfig) + if err != nil { + utils.LogAndReportSentryError(ctx, err) + } + + pool, err := infra.NewPostgresConnectionPool(ctx, pgConfig.GetConnectionString(), telemetryRessources.TracerProvider) + if err != nil { + utils.LogAndReportSentryError(ctx, err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &models.SortWorker{}) + + riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ + Workers: workers, + Queues: map[string]river.QueueConfig{ + river.QueueDefault: { + MaxWorkers: 3, + }, + }, + }) + if err != nil { + utils.LogAndReportSentryError(ctx, err) + return err + } + + // repositories := repositories.NewRepositories( + // pool, + // gcpConfig.GoogleApplicationCredentials, + // ) + + // uc := usecases.NewUsecases(repositories, + // usecases.WithLicense(license), + // usecases.WithRiverClient(riverClient), + // ) + + if err := riverClient.Start(ctx); err != nil { + utils.LogAndReportSentryError(ctx, err) + return err + } + + sigintOrTerm := make(chan os.Signal, 1) + signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM) + + // This is meant to be a realistic-looking stop goroutine that might go in a + // real program. It waits for SIGINT/SIGTERM and when received, tries to stop + // gracefully by allowing a chance for jobs to finish. But if that isn't + // working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and + // it'll issue a hard stop that cancels the context of all active jobs. In + // case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure + // completely and exits uncleanly. + go func() { + <-sigintOrTerm + fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n") + + softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer softStopCtxCancel() + + go func() { + select { + case <-sigintOrTerm: + fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n") + softStopCtxCancel() + case <-softStopCtx.Done(): + fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n") + } + }() + + err := riverClient.Stop(softStopCtx) + if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + panic(err) + } + if err == nil { + fmt.Printf("Soft stop succeeded\n") + return + } + + hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second) + defer hardStopCtxCancel() + + // As long as all jobs respect context cancellation, StopAndCancel will + // always work. However, in the case of a bug where a job blocks despite + // being cancelled, it may be necessary to either ignore River's stop + // result (what's shown here) or have a supervisor kill the process. + err = riverClient.StopAndCancel(hardStopCtx) + if err != nil && errors.Is(err, context.DeadlineExceeded) { + fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n") + } else if err != nil { + panic(err) + } + + // hard stop succeeded + }() + + // Make sure our job starts being worked before doing anything else. + // <-jobStarted + + // Cheat a little by sending a SIGTERM manually for the purpose of this + // example (normally this will be sent by user or supervisory process). The + // first SIGTERM tries a soft stop in which jobs are given a chance to + // finish up. + // sigintOrTerm <- syscall.SIGTERM + + // // The soft stop will never work in this example because our job only + // // respects context cancellation, but wait a short amount of time to give it + // // a chance. After it elapses, send another SIGTERM to initiate a hard stop. + select { + case <-riverClient.Stopped(): + // Will never be reached in this example because our job will only ever + // finish on context cancellation. + fmt.Printf("Soft stop succeeded\n") + + case <-time.After(4 * time.Second): + sigintOrTerm <- syscall.SIGTERM + <-riverClient.Stopped() + } + + return nil +} diff --git a/go.mod b/go.mod index 53c96f4fc..2934bbfe2 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,8 @@ require ( github.com/ory/dockertest/v3 v3.10.0 github.com/pkg/errors v0.9.1 github.com/pressly/goose/v3 v3.20.0 + github.com/riverqueue/river v0.13.0 + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0 github.com/segmentio/analytics-go/v3 v3.3.0 github.com/stretchr/testify v1.9.0 github.com/tidwall/gjson v1.18.0 @@ -41,7 +43,7 @@ require ( golang.org/x/net v0.28.0 golang.org/x/oauth2 v0.22.0 golang.org/x/sync v0.8.0 - golang.org/x/text v0.18.0 + golang.org/x/text v0.19.0 golang.org/x/time v0.6.0 google.golang.org/api v0.191.0 ) @@ -168,6 +170,9 @@ require ( github.com/perimeterx/marshmallow v1.1.5 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/riverqueue/river/riverdriver v0.13.0 // indirect + github.com/riverqueue/river/rivershared v0.13.0 // indirect + github.com/riverqueue/river/rivertype v0.13.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sanity-io/litter v1.5.5 // indirect github.com/segmentio/backo-go v1.1.0 // indirect @@ -177,6 +182,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect @@ -191,6 +197,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/crypto v0.27.0 // indirect diff --git a/go.sum b/go.sum index 3e931ce06..3b923f32f 100644 --- a/go.sum +++ b/go.sum @@ -412,6 +412,20 @@ github.com/pressly/goose/v3 v3.20.0/go.mod h1:BRfF2GcG4FTG12QfdBVy3q1yveaf4ckL9v github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/riverqueue/river v0.13.0 h1:BvEJfXAnHJ7HwraoPZWiD271t2jDVvX1SPCtvLzojiA= +github.com/riverqueue/river v0.13.0/go.mod h1:SOG+j28RQpKDsTA8AlfxjFdYpoPm+MSOio+Ev4ljN2U= +github.com/riverqueue/river/riverdriver v0.13.0 h1:UVzMtNfp3R+Ehr/yaRqgF58YOFEWGVqIAamCeK7RMkA= +github.com/riverqueue/river/riverdriver v0.13.0/go.mod h1:pxmx6qmGl+dNCrfa+xuktg8zrrZO3AEqlUFlFWOy8U4= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.13.0 h1:xiiwQVFUoPv/7PQIsEIerpw2ux1lZ14oZScgiB4JHdE= +github.com/riverqueue/river/riverdriver/riverdatabasesql v0.13.0/go.mod h1:f7TWWD965tE6v96qi1Y40IP2shsAai0qJBHbqT7yFLM= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0 h1:wjLgea/eI5rIMh0+TCjS+/+dsULIst3Wu8bZQo2DHno= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0/go.mod h1:Vzt3E33kNks2vN9lTgLJL8VFrbcAWDbwzyZLo02FlBk= +github.com/riverqueue/river/rivershared v0.13.0 h1:AqRP54GgtwoLIvV5eoZmOGOCZXL8Ce5Zm8s60R8NKOA= +github.com/riverqueue/river/rivershared v0.13.0/go.mod h1:vzvawQpDy2Z1U5chkvh1NykzWNkRhc9RLcURsJRhlbE= +github.com/riverqueue/river/rivertype v0.13.0 h1:PkT3h9tP0ZV3h0EGy2MiwEhgZqpRMN4fXfj27UKc9Q0= +github.com/riverqueue/river/rivertype v0.13.0/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -451,6 +465,7 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tailscale/depaware v0.0.0-20210622194025-720c4b409502/go.mod h1:p9lPsd+cx33L3H9nNoecRRxPssFKUwwI50I3pZ0yT+8= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -458,6 +473,8 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= @@ -505,6 +522,8 @@ go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kT go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= gocloud.dev v0.39.0 h1:EYABYGhAalPUaMrbSKOr5lejxoxvXj99nE8XFtsDgds= @@ -618,8 +637,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/main.go b/main.go index 841e080b1..853d5d9a6 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ func main() { shouldRunDataIngestion := flag.Bool("data-ingestion", false, "Run data ingestion") shouldRunSendPendingWebhookEvents := flag.Bool("send-pending-webhook-events", false, "Send pending webhook events") shouldRunScheduler := flag.Bool("cron-scheduler", false, "Run scheduler for cron jobs") + shouldRunWorker := flag.Bool("worker", false, "Run workers on the task queues") flag.Parse() logger := utils.NewLogger("text") logger.Info("Flags", @@ -26,6 +27,7 @@ func main() { slog.Bool("shouldRunDataIngestion", *shouldRunDataIngestion), slog.Bool("shouldRunScheduler", *shouldRunScheduler), slog.Bool("shouldRunSendPendingWebhookEvents", *shouldRunSendPendingWebhookEvents), + slog.Bool("shouldRunWorker", *shouldRunWorker), ) if *shouldRunMigrations { @@ -73,4 +75,11 @@ func main() { log.Fatal(err) } } + + if *shouldRunWorker { + err := cmd.RunTaskQueue() + if err != nil { + log.Fatal(err) + } + } } diff --git a/models/river_job.go b/models/river_job.go new file mode 100644 index 000000000..1befa00ef --- /dev/null +++ b/models/river_job.go @@ -0,0 +1,28 @@ +package models + +import ( + "context" + "fmt" + "sort" + + "github.com/riverqueue/river" +) + +type SortArgs struct { + // Strings is a slice of strings to sort. + Strings []string `json:"strings"` +} + +func (SortArgs) Kind() string { return "sort" } + +type SortWorker struct { + // An embedded WorkerDefaults sets up default methods to fulfill the rest of + // the Worker interface: + river.WorkerDefaults[SortArgs] +} + +func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error { + sort.Strings(job.Args.Strings) + fmt.Printf("Sorted strings: %+v\n", job.Args.Strings) + return nil +} diff --git a/repositories/migrations.go b/repositories/migrations.go index b45ba953c..9d8666433 100644 --- a/repositories/migrations.go +++ b/repositories/migrations.go @@ -12,6 +12,10 @@ import ( "github.com/checkmarble/marble-backend/utils" "github.com/cockroachdb/errors" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" + + "github.com/jackc/pgx/v5/pgxpool" _ "github.com/jackc/pgx/v5/stdlib" "github.com/pressly/goose/v3" ) @@ -55,6 +59,21 @@ func (m *Migrater) Run(ctx context.Context) error { return errors.Wrap(err, "migrateAnalyticsViews error") } + pgxPool, err := m.openDbPgx(ctx) + if err != nil { + return errors.Wrap(err, "unable to open db in Migrater") + } + migrator, err := rivermigrate.New(riverpgxv5.New(pgxPool), nil) + if err != nil { + return errors.Wrap(err, "unable to create migrator") + } + + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) + if err != nil { + return errors.Wrap(err, "unable to run migrations") + } + fmt.Println(res.Versions) + return nil } @@ -74,6 +93,27 @@ func (m *Migrater) openDb(ctx context.Context) error { return nil } +func (m *Migrater) openDbPgx(ctx context.Context) (*pgxpool.Pool, error) { + connectionString := m.pgConfig.GetConnectionString() + cfg, err := pgxpool.ParseConfig(connectionString) + if err != nil { + return nil, fmt.Errorf("create connection pool: %w", err) + } + + pool, err := pgxpool.NewWithConfig(context.Background(), cfg) + if err != nil { + return nil, fmt.Errorf("unable to create connection pool: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err = pool.Ping(ctx); err != nil { + return nil, fmt.Errorf("unable to ping database: %w", err) + } + + return pool, nil +} + func (m *Migrater) runMarbleDbMigrations(ctx context.Context) error { logger := utils.LoggerFromContext(ctx) logger.InfoContext(ctx, "Migrations starting to setup marble DB") diff --git a/usecases/case_usecase.go b/usecases/case_usecase.go index d162458de..9a90ee6df 100644 --- a/usecases/case_usecase.go +++ b/usecases/case_usecase.go @@ -19,6 +19,8 @@ import ( "github.com/checkmarble/marble-backend/utils" "github.com/cockroachdb/errors" "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" ) type CaseUseCaseRepository interface { @@ -69,6 +71,7 @@ type CaseUseCase struct { transactionFactory executor_factory.TransactionFactory executorFactory executor_factory.ExecutorFactory webhookEventsUsecase webhookEventsUsecase + riverClient *river.Client[pgx.Tx] } func (usecase *CaseUseCase) ListCases( @@ -94,6 +97,14 @@ func (usecase *CaseUseCase) ListCases( ctx, usecase.transactionFactory, func(tx repositories.Transaction) ([]models.CaseWithRank, error) { + res, err := usecase.riverClient.InsertTx(ctx, tx.RawTx(), models.SortArgs{ + Strings: []string{"a", "b", "c"}, + }, nil) + if err != nil { + return []models.CaseWithRank{}, err + } + fmt.Println(res.Job.ID) + availableInboxIds, err := usecase.getAvailableInboxIds(ctx, tx, organizationId) if err != nil { return []models.CaseWithRank{}, err diff --git a/usecases/usecases.go b/usecases/usecases.go index 7f8bb35fb..1ff7bf334 100644 --- a/usecases/usecases.go +++ b/usecases/usecases.go @@ -11,6 +11,8 @@ import ( "github.com/checkmarble/marble-backend/usecases/scenarios" "github.com/checkmarble/marble-backend/usecases/scheduled_execution" "github.com/checkmarble/marble-backend/usecases/security" + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" ) type Usecases struct { @@ -20,6 +22,7 @@ type Usecases struct { caseManagerBucketUrl string failedWebhooksRetryPageSize int license models.LicenseValidation + riverClient *river.Client[pgx.Tx] } type Option func(*options) @@ -54,12 +57,19 @@ func WithBatchIngestionMaxSize(size int) Option { } } +func WithRiverClient(client *river.Client[pgx.Tx]) Option { + return func(o *options) { + o.riverClient = client + } +} + type options struct { batchIngestionMaxSize int ingestionBucketUrl string caseManagerBucketUrl string failedWebhooksRetryPageSize int license models.LicenseValidation + riverClient *river.Client[pgx.Tx] } func newUsecasesWithOptions(repositories repositories.Repositories, o *options) Usecases { @@ -73,6 +83,7 @@ func newUsecasesWithOptions(repositories repositories.Repositories, o *options) caseManagerBucketUrl: o.caseManagerBucketUrl, failedWebhooksRetryPageSize: o.failedWebhooksRetryPageSize, license: o.license, + riverClient: o.riverClient, } } diff --git a/usecases/usecases_with_creds.go b/usecases/usecases_with_creds.go index 3145fe9eb..8c92822f3 100644 --- a/usecases/usecases_with_creds.go +++ b/usecases/usecases_with_creds.go @@ -262,6 +262,7 @@ func (usecases *UsecasesWithCreds) NewCaseUseCase() *CaseUseCase { caseManagerBucketUrl: usecases.caseManagerBucketUrl, blobRepository: usecases.Repositories.BlobRepository, webhookEventsUsecase: usecases.NewWebhookEventsUsecase(), + riverClient: usecases.riverClient, } }