Skip to content

Commit

Permalink
"hello world" of task queue with river
Browse files Browse the repository at this point in the history
  • Loading branch information
Pascal-Delange committed Oct 18, 2024
1 parent 9997314 commit 05d5f38
Show file tree
Hide file tree
Showing 11 changed files with 335 additions and 3 deletions.
12 changes: 12 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@
"--cron-scheduler"
],
"console": "integratedTerminal"
},
{
"name": "Launch task queue workers (.env)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/.",
"envFile": "${workspaceFolder}/.env",
"args": [
"--worker"
],
"console": "integratedTerminal"
}
]
}
13 changes: 13 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/checkmarble/marble-backend/repositories"
"github.com/checkmarble/marble-backend/usecases"
"github.com/checkmarble/marble-backend/utils"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"

"github.com/cockroachdb/errors"
"github.com/getsentry/sentry-go"
Expand Down Expand Up @@ -109,6 +111,16 @@ func RunServer() error {
utils.LogAndReportSentryError(ctx, err)
}

workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &models.SortWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{})
if err != nil {
utils.LogAndReportSentryError(ctx, err)
return err
}

repositories := repositories.NewRepositories(
pool,
gcpConfig.GoogleApplicationCredentials,
Expand All @@ -125,6 +137,7 @@ func RunServer() error {
usecases.WithIngestionBucketUrl(serverConfig.ingestionBucketUrl),
usecases.WithCaseManagerBucketUrl(serverConfig.caseManagerBucket),
usecases.WithLicense(license),
usecases.WithRiverClient(riverClient),
)

////////////////////////////////////////////////////////////
Expand Down
181 changes: 181 additions & 0 deletions cmd/task_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/checkmarble/marble-backend/infra"
"github.com/checkmarble/marble-backend/models"
"github.com/checkmarble/marble-backend/utils"
"github.com/cockroachdb/errors"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"

"github.com/getsentry/sentry-go"
)

func RunTaskQueue() error {
// This is where we read the environment variables and set up the configuration for the application.
gcpConfig := infra.GcpConfig{
EnableTracing: utils.GetEnv("ENABLE_GCP_TRACING", false),
ProjectId: utils.GetEnv("GOOGLE_CLOUD_PROJECT", ""),
// GoogleApplicationCredentials: utils.GetEnv("GOOGLE_APPLICATION_CREDENTIALS", ""),
}
pgConfig := infra.PgConfig{
Database: "marble",
DbConnectWithSocket: utils.GetEnv("PG_CONNECT_WITH_SOCKET", false),
Hostname: utils.GetRequiredEnv[string]("PG_HOSTNAME"),
Password: utils.GetRequiredEnv[string]("PG_PASSWORD"),
Port: utils.GetEnv("PG_PORT", "5432"),
User: utils.GetRequiredEnv[string]("PG_USER"),
}

// licenseConfig := models.LicenseConfiguration{
// LicenseKey: utils.GetEnv("LICENSE_KEY", ""),
// KillIfReadLicenseError: utils.GetEnv("KILL_IF_READ_LICENSE_ERROR", false),
// }
serverConfig := struct {
env string
loggingFormat string
sentryDsn string
}{
env: utils.GetEnv("ENV", "development"),
loggingFormat: utils.GetEnv("LOGGING_FORMAT", "text"),
sentryDsn: utils.GetEnv("SENTRY_DSN", ""),
}

logger := utils.NewLogger(serverConfig.loggingFormat)
ctx := utils.StoreLoggerInContext(context.Background(), logger)
// license := infra.VerifyLicense(licenseConfig)

infra.SetupSentry(serverConfig.sentryDsn, serverConfig.env)
defer sentry.Flush(3 * time.Second)

tracingConfig := infra.TelemetryConfiguration{
ApplicationName: "marble",
Enabled: gcpConfig.EnableTracing,
ProjectID: gcpConfig.ProjectId,
}
telemetryRessources, err := infra.InitTelemetry(tracingConfig)
if err != nil {
utils.LogAndReportSentryError(ctx, err)
}

pool, err := infra.NewPostgresConnectionPool(ctx, pgConfig.GetConnectionString(), telemetryRessources.TracerProvider)
if err != nil {
utils.LogAndReportSentryError(ctx, err)
}

workers := river.NewWorkers()
river.AddWorker(workers, &models.SortWorker{})

riverClient, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
Workers: workers,
Queues: map[string]river.QueueConfig{
river.QueueDefault: {
MaxWorkers: 3,
},
},
})
if err != nil {
utils.LogAndReportSentryError(ctx, err)
return err
}

// repositories := repositories.NewRepositories(
// pool,
// gcpConfig.GoogleApplicationCredentials,
// )

// uc := usecases.NewUsecases(repositories,
// usecases.WithLicense(license),
// usecases.WithRiverClient(riverClient),
// )

if err := riverClient.Start(ctx); err != nil {
utils.LogAndReportSentryError(ctx, err)
return err
}

sigintOrTerm := make(chan os.Signal, 1)
signal.Notify(sigintOrTerm, syscall.SIGINT, syscall.SIGTERM)

// This is meant to be a realistic-looking stop goroutine that might go in a
// real program. It waits for SIGINT/SIGTERM and when received, tries to stop
// gracefully by allowing a chance for jobs to finish. But if that isn't
// working, a second SIGINT/SIGTERM will tell it to terminate with prejudice and
// it'll issue a hard stop that cancels the context of all active jobs. In
// case that doesn't work, a third SIGINT/SIGTERM ignores River's stop procedure
// completely and exits uncleanly.
go func() {
<-sigintOrTerm
fmt.Printf("Received SIGINT/SIGTERM; initiating soft stop (try to wait for jobs to finish)\n")

softStopCtx, softStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer softStopCtxCancel()

go func() {
select {
case <-sigintOrTerm:
fmt.Printf("Received SIGINT/SIGTERM again; initiating hard stop (cancel everything)\n")
softStopCtxCancel()
case <-softStopCtx.Done():
fmt.Printf("Soft stop timeout; initiating hard stop (cancel everything)\n")
}
}()

err := riverClient.Stop(softStopCtx)
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
panic(err)
}
if err == nil {
fmt.Printf("Soft stop succeeded\n")
return
}

hardStopCtx, hardStopCtxCancel := context.WithTimeout(ctx, 10*time.Second)
defer hardStopCtxCancel()

// As long as all jobs respect context cancellation, StopAndCancel will
// always work. However, in the case of a bug where a job blocks despite
// being cancelled, it may be necessary to either ignore River's stop
// result (what's shown here) or have a supervisor kill the process.
err = riverClient.StopAndCancel(hardStopCtx)
if err != nil && errors.Is(err, context.DeadlineExceeded) {
fmt.Printf("Hard stop timeout; ignoring stop procedure and exiting unsafely\n")
} else if err != nil {
panic(err)
}

// hard stop succeeded
}()

// Make sure our job starts being worked before doing anything else.
// <-jobStarted

// Cheat a little by sending a SIGTERM manually for the purpose of this
// example (normally this will be sent by user or supervisory process). The
// first SIGTERM tries a soft stop in which jobs are given a chance to
// finish up.
// sigintOrTerm <- syscall.SIGTERM

// // The soft stop will never work in this example because our job only
// // respects context cancellation, but wait a short amount of time to give it
// // a chance. After it elapses, send another SIGTERM to initiate a hard stop.
select {
case <-riverClient.Stopped():
// Will never be reached in this example because our job will only ever
// finish on context cancellation.
fmt.Printf("Soft stop succeeded\n")

case <-time.After(4 * time.Second):
sigintOrTerm <- syscall.SIGTERM
<-riverClient.Stopped()
}

return nil
}
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ require (
github.com/ory/dockertest/v3 v3.10.0
github.com/pkg/errors v0.9.1
github.com/pressly/goose/v3 v3.20.0
github.com/riverqueue/river v0.13.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0
github.com/segmentio/analytics-go/v3 v3.3.0
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.18.0
Expand All @@ -41,7 +43,7 @@ require (
golang.org/x/net v0.28.0
golang.org/x/oauth2 v0.22.0
golang.org/x/sync v0.8.0
golang.org/x/text v0.18.0
golang.org/x/text v0.19.0
golang.org/x/time v0.6.0
google.golang.org/api v0.191.0
)
Expand Down Expand Up @@ -168,6 +170,9 @@ require (
github.com/perimeterx/marshmallow v1.1.5 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river/riverdriver v0.13.0 // indirect
github.com/riverqueue/river/rivershared v0.13.0 // indirect
github.com/riverqueue/river/rivertype v0.13.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
github.com/segmentio/backo-go v1.1.0 // indirect
Expand All @@ -177,6 +182,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
Expand All @@ -191,6 +197,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
Expand Down
23 changes: 21 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@ github.com/pressly/goose/v3 v3.20.0/go.mod h1:BRfF2GcG4FTG12QfdBVy3q1yveaf4ckL9v
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/riverqueue/river v0.13.0 h1:BvEJfXAnHJ7HwraoPZWiD271t2jDVvX1SPCtvLzojiA=
github.com/riverqueue/river v0.13.0/go.mod h1:SOG+j28RQpKDsTA8AlfxjFdYpoPm+MSOio+Ev4ljN2U=
github.com/riverqueue/river/riverdriver v0.13.0 h1:UVzMtNfp3R+Ehr/yaRqgF58YOFEWGVqIAamCeK7RMkA=
github.com/riverqueue/river/riverdriver v0.13.0/go.mod h1:pxmx6qmGl+dNCrfa+xuktg8zrrZO3AEqlUFlFWOy8U4=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.13.0 h1:xiiwQVFUoPv/7PQIsEIerpw2ux1lZ14oZScgiB4JHdE=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.13.0/go.mod h1:f7TWWD965tE6v96qi1Y40IP2shsAai0qJBHbqT7yFLM=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0 h1:wjLgea/eI5rIMh0+TCjS+/+dsULIst3Wu8bZQo2DHno=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.13.0/go.mod h1:Vzt3E33kNks2vN9lTgLJL8VFrbcAWDbwzyZLo02FlBk=
github.com/riverqueue/river/rivershared v0.13.0 h1:AqRP54GgtwoLIvV5eoZmOGOCZXL8Ce5Zm8s60R8NKOA=
github.com/riverqueue/river/rivershared v0.13.0/go.mod h1:vzvawQpDy2Z1U5chkvh1NykzWNkRhc9RLcURsJRhlbE=
github.com/riverqueue/river/rivertype v0.13.0 h1:PkT3h9tP0ZV3h0EGy2MiwEhgZqpRMN4fXfj27UKc9Q0=
github.com/riverqueue/river/rivertype v0.13.0/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down Expand Up @@ -451,13 +465,16 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tailscale/depaware v0.0.0-20210622194025-720c4b409502/go.mod h1:p9lPsd+cx33L3H9nNoecRRxPssFKUwwI50I3pZ0yT+8=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
Expand Down Expand Up @@ -505,6 +522,8 @@ go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kT
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
gocloud.dev v0.39.0 h1:EYABYGhAalPUaMrbSKOr5lejxoxvXj99nE8XFtsDgds=
Expand Down Expand Up @@ -618,8 +637,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func main() {
shouldRunDataIngestion := flag.Bool("data-ingestion", false, "Run data ingestion")
shouldRunSendPendingWebhookEvents := flag.Bool("send-pending-webhook-events", false, "Send pending webhook events")
shouldRunScheduler := flag.Bool("cron-scheduler", false, "Run scheduler for cron jobs")
shouldRunWorker := flag.Bool("worker", false, "Run workers on the task queues")
flag.Parse()
logger := utils.NewLogger("text")
logger.Info("Flags",
Expand All @@ -26,6 +27,7 @@ func main() {
slog.Bool("shouldRunDataIngestion", *shouldRunDataIngestion),
slog.Bool("shouldRunScheduler", *shouldRunScheduler),
slog.Bool("shouldRunSendPendingWebhookEvents", *shouldRunSendPendingWebhookEvents),
slog.Bool("shouldRunWorker", *shouldRunWorker),
)

if *shouldRunMigrations {
Expand Down Expand Up @@ -73,4 +75,11 @@ func main() {
log.Fatal(err)
}
}

if *shouldRunWorker {
err := cmd.RunTaskQueue()
if err != nil {
log.Fatal(err)
}
}
}
28 changes: 28 additions & 0 deletions models/river_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package models

import (
"context"
"fmt"
"sort"

"github.com/riverqueue/river"
)

type SortArgs struct {
// Strings is a slice of strings to sort.
Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
sort.Strings(job.Args.Strings)
fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
return nil
}
Loading

0 comments on commit 05d5f38

Please sign in to comment.