diff --git a/cmd/syntactic-code-intel-worker/shared/BUILD.bazel b/cmd/syntactic-code-intel-worker/shared/BUILD.bazel index 24e2f2d4aa43..e5b1a72370cf 100644 --- a/cmd/syntactic-code-intel-worker/shared/BUILD.bazel +++ b/cmd/syntactic-code-intel-worker/shared/BUILD.bazel @@ -1,16 +1,24 @@ +load("//dev:go_defs.bzl", "go_test") load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "shared", srcs = [ "config.go", + "indexing_worker.go", "service.go", "shared.go", + "store.go", ], importpath = "github.com/sourcegraph/sourcegraph/cmd/syntactic-code-intel-worker/shared", visibility = ["//visibility:public"], deps = [ - "//internal/codeintel/shared/lsifuploadstore", + "//internal/authz", + "//internal/conf", + "//internal/conf/conftypes", + "//internal/database/basestore", + "//internal/database/connections/live", + "//internal/database/dbutil", "//internal/debugserver", "//internal/encryption/keyring", "//internal/env", @@ -18,7 +26,28 @@ go_library( "//internal/httpserver", "//internal/observation", "//internal/service", + "//internal/workerutil", + "//internal/workerutil/dbworker", + "//internal/workerutil/dbworker/store", "//lib/errors", + "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_sourcegraph_log//:log", ], ) + +go_test( + name = "shared_test", + srcs = [ + "store_helpers_test.go", + "store_test.go", + ], + embed = [":shared"], + tags = ["requires-network"], + deps = [ + "//internal/database", + "//internal/database/dbtest", + "//internal/observation", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_stretchr_testify//require", + ], +) diff --git a/cmd/syntactic-code-intel-worker/shared/config.go b/cmd/syntactic-code-intel-worker/shared/config.go index eee4ddcc16cc..f568591323d2 100644 --- a/cmd/syntactic-code-intel-worker/shared/config.go +++ b/cmd/syntactic-code-intel-worker/shared/config.go @@ -5,36 +5,39 @@ import ( "strconv" "time" - "github.com/sourcegraph/sourcegraph/internal/codeintel/shared/lsifuploadstore" "github.com/sourcegraph/sourcegraph/internal/env" "github.com/sourcegraph/sourcegraph/lib/errors" ) +type IndexingWorkerConfig struct { + env.BaseConfig + PollInterval time.Duration + Concurrency int + MaximumRuntimePerJob time.Duration + CliPath string +} + type Config struct { env.BaseConfig - WorkerPollInterval time.Duration - WorkerConcurrency int - WorkerBudget int64 - MaximumRuntimePerJob time.Duration - SCIPUploadStoreConfig *lsifuploadstore.Config - CliPath string - ListenAddress string + IndexingWorkerConfig *IndexingWorkerConfig + + ListenAddress string } const DefaultPort = 3188 -func (c *Config) Load() { - c.SCIPUploadStoreConfig = &lsifuploadstore.Config{} - c.SCIPUploadStoreConfig.Load() - - c.WorkerPollInterval = c.GetInterval("SYNTACTIC_CODE_INTEL_WORKER_POLL_INTERVAL", "1s", "Interval between queries to the repository queue") - c.WorkerConcurrency = c.GetInt("SYNTACTIC_CODE_INTEL_WORKER_CONCURRENCY", "1", "The maximum number of repositories that can be processed concurrently.") - c.WorkerBudget = int64(c.GetInt("SYNTACTIC_CODE_INTEL_WORKER_BUDGET", "0", "The amount of compressed input data (in bytes) a worker can process concurrently. Zero acts as an infinite budget.")) - c.MaximumRuntimePerJob = c.GetInterval("SYNTACTIC_CODE_INTEL_WORKER_MAXIMUM_RUNTIME_PER_JOB", "25m", "The maximum time a single repository indexing job can take") +func (c *IndexingWorkerConfig) Load() { + c.PollInterval = c.GetInterval("SYNTACTIC_CODE_INTEL_INDEXING_POLL_INTERVAL", "1s", "Interval between queries to the repository queue") + c.Concurrency = c.GetInt("SYNTACTIC_CODE_INTEL_INDEXING_CONCURRENCY", "1", "The maximum number of repositories that can be processed concurrently.") + c.MaximumRuntimePerJob = c.GetInterval("SYNTACTIC_CODE_INTEL_INDEXING_MAXIMUM_RUNTIME_PER_JOB", "5m", "The maximum time a single repository indexing job can take") c.CliPath = c.Get("SCIP_SYNTAX_PATH", "scip-syntax", "TODO: fill in description") +} +func (c *Config) Load() { + c.IndexingWorkerConfig = &IndexingWorkerConfig{} + c.IndexingWorkerConfig.Load() c.ListenAddress = c.GetOptional("SYNTACTIC_CODE_INTEL_WORKER_ADDR", "The address under which the syntactic codeintel worker API listens. Can include a port.") // Fall back to a reasonable default. if c.ListenAddress == "" { @@ -50,6 +53,6 @@ func (c *Config) Load() { func (c *Config) Validate() error { var errs error errs = errors.Append(errs, c.BaseConfig.Validate()) - errs = errors.Append(errs, c.SCIPUploadStoreConfig.Validate()) + errs = errors.Append(errs, c.IndexingWorkerConfig.Validate()) return errs } diff --git a/cmd/syntactic-code-intel-worker/shared/indexing_worker.go b/cmd/syntactic-code-intel-worker/shared/indexing_worker.go new file mode 100644 index 000000000000..721aa8d6326d --- /dev/null +++ b/cmd/syntactic-code-intel-worker/shared/indexing_worker.go @@ -0,0 +1,39 @@ +package shared + +import ( + "context" + "github.com/sourcegraph/log" + "time" + + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" +) + +func NewIndexingWorker(ctx context.Context, observationCtx *observation.Context, workerStore dbworkerstore.Store[*SyntacticIndexingJob], config IndexingWorkerConfig) *workerutil.Worker[*SyntacticIndexingJob] { + + name := "syntactic_code_intel_indexing_worker" + + return dbworker.NewWorker[*SyntacticIndexingJob](ctx, workerStore, &indexingHandler{}, workerutil.WorkerOptions{ + Name: name, + Interval: config.PollInterval, + HeartbeatInterval: 10 * time.Second, + Metrics: workerutil.NewMetrics(observationCtx, name), + NumHandlers: config.Concurrency, + MaximumRuntimePerJob: config.MaximumRuntimePerJob, + }) + +} + +type indexingHandler struct{} + +var _ workerutil.Handler[*SyntacticIndexingJob] = &indexingHandler{} + +func (i indexingHandler) Handle(ctx context.Context, logger log.Logger, record *SyntacticIndexingJob) error { + logger.Info("Stub indexing worker handling record", + log.Int("id", record.ID), + log.String("repository name", record.RepositoryName), + log.String("commit", record.Commit)) + return nil +} diff --git a/cmd/syntactic-code-intel-worker/shared/shared.go b/cmd/syntactic-code-intel-worker/shared/shared.go index f7b5d88c0486..be2293b5293a 100644 --- a/cmd/syntactic-code-intel-worker/shared/shared.go +++ b/cmd/syntactic-code-intel-worker/shared/shared.go @@ -23,9 +23,18 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic } logger.Info("Syntactic code intel worker running", - log.String("path to scip-syntax CLI", config.CliPath), + log.String("path to scip-syntax CLI", config.IndexingWorkerConfig.CliPath), log.String("API address", config.ListenAddress)) + db := mustInitializeDB(observationCtx, "syntactic-code-intel-indexer") + + workerStore, err := NewStore(observationCtx, db) + if err != nil { + return errors.Wrap(err, "initializing worker store") + } + + indexingWorker := NewIndexingWorker(ctx, observationCtx, workerStore, *config.IndexingWorkerConfig) + // Initialize health server server := httpserver.NewFromAddr(config.ListenAddress, &http.Server{ ReadTimeout: 75 * time.Second, @@ -34,7 +43,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic }) // Go! - goroutine.MonitorBackgroundRoutines(ctx, server) + goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker) return nil } diff --git a/cmd/syntactic-code-intel-worker/shared/store.go b/cmd/syntactic-code-intel-worker/shared/store.go new file mode 100644 index 000000000000..999efaf0bdae --- /dev/null +++ b/cmd/syntactic-code-intel-worker/shared/store.go @@ -0,0 +1,159 @@ +package shared + +import ( + "database/sql" + "strconv" + "time" + + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/log" + "github.com/sourcegraph/sourcegraph/internal/authz" + "github.com/sourcegraph/sourcegraph/internal/conf" + "github.com/sourcegraph/sourcegraph/internal/conf/conftypes" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live" + "github.com/sourcegraph/sourcegraph/internal/database/dbutil" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/workerutil" + dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store" +) + +type recordState string + +const ( + Queued recordState = "queued" + Errored recordState = "errored" + Processing recordState = "processing" + Completed recordState = "completed" +) + +// Unless marked otherwise, the columns in this +// record have a special meaning assigned to them by +// the queries dbworker performs. You can read more +// about the different fields and what they do here: +// https://sourcegraph.com/docs/dev/background-information/workers#database-backed-stores +type SyntacticIndexingJob struct { + ID int `json:"id"` + State recordState `json:"state"` + QueuedAt time.Time `json:"queuedAt"` + StartedAt *time.Time `json:"startedAt"` + FinishedAt *time.Time `json:"finishedAt"` + ProcessAfter *time.Time `json:"processAfter"` + NumResets int `json:"numResets"` + NumFailures int `json:"numFailures"` + FailureMessage *string `json:"failureMessage"` + ShouldReindex bool `json:"shouldReindex"` + + // The fields below are not part of the standard dbworker fields + + // Which commit to index + Commit string `json:"commit"` + // Which repository id to index + RepositoryID int `json:"repositoryId"` + // Name of repository being indexed + RepositoryName string `json:"repositoryName"` + // Which user scheduled this job + EnqueuerUserID int32 `json:"enqueuerUserID"` +} + +var _ workerutil.Record = SyntacticIndexingJob{} + +func (i SyntacticIndexingJob) RecordID() int { + return i.ID +} + +func (i SyntacticIndexingJob) RecordUID() string { + return strconv.Itoa(i.ID) +} + +func ScanSyntacticIndexRecord(s dbutil.Scanner) (*SyntacticIndexingJob, error) { + var job SyntacticIndexingJob + if err := scanSyntacticIndexRecord(&job, s); err != nil { + return nil, err + } + return &job, nil +} + +func scanSyntacticIndexRecord(job *SyntacticIndexingJob, s dbutil.Scanner) error { + + // Make sure this is in sync with columnExpressions below... + if err := s.Scan( + &job.ID, + &job.Commit, + &job.QueuedAt, + &job.State, + &job.FailureMessage, + &job.StartedAt, + &job.FinishedAt, + &job.ProcessAfter, + &job.NumResets, + &job.NumFailures, + &job.RepositoryID, + &job.RepositoryName, + &job.ShouldReindex, + &job.EnqueuerUserID, + ); err != nil { + return err + } + + return nil +} + +func NewStore(observationCtx *observation.Context, db *sql.DB) (dbworkerstore.Store[*SyntacticIndexingJob], error) { + + // Make sure this is in sync with the columns of the + // syntactic_scip_indexing_jobs_with_repository_name view + var columnExpressions = []*sqlf.Query{ + sqlf.Sprintf("u.id"), + sqlf.Sprintf("u.commit"), + sqlf.Sprintf("u.queued_at"), + sqlf.Sprintf("u.state"), + sqlf.Sprintf("u.failure_message"), + sqlf.Sprintf("u.started_at"), + sqlf.Sprintf("u.finished_at"), + sqlf.Sprintf("u.process_after"), + sqlf.Sprintf("u.num_resets"), + sqlf.Sprintf("u.num_failures"), + sqlf.Sprintf("u.repository_id"), + sqlf.Sprintf("u.repository_name"), + sqlf.Sprintf("u.should_reindex"), + sqlf.Sprintf("u.enqueuer_user_id"), + } + + storeOptions := dbworkerstore.Options[*SyntacticIndexingJob]{ + Name: "syntactic_scip_indexing_jobs_store", + TableName: "syntactic_scip_indexing_jobs", + ViewName: "syntactic_scip_indexing_jobs_with_repository_name u", + // Using enqueuer_user_id prioritises manually scheduled indexing + OrderByExpression: sqlf.Sprintf("(u.enqueuer_user_id > 0) DESC, u.queued_at, u.id"), + ColumnExpressions: columnExpressions, + Scan: dbworkerstore.BuildWorkerScan(ScanSyntacticIndexRecord), + } + + handle := basestore.NewHandleWithDB(observationCtx.Logger, db, sql.TxOptions{}) + return dbworkerstore.New(observationCtx, handle, storeOptions), nil +} + +func mustInitializeDB(observationCtx *observation.Context, name string) *sql.DB { + // This is an internal service, so we rely on the + // frontend to do authz checks for user requests. + // Authz checks are enforced by the DB layer + // + // This call to SetProviders is here so that calls to GetProviders don't block. + // Relevant PR: https://github.com/sourcegraph/sourcegraph/pull/15755 + // Relevant issue: https://github.com/sourcegraph/sourcegraph/issues/15962 + + authz.SetProviders(true, []authz.Provider{}) + + dsn := conf.GetServiceConnectionValueAndRestartOnChange(func(serviceConnections conftypes.ServiceConnections) string { + return serviceConnections.PostgresDSN + }) + + sqlDB, err := connections.EnsureNewFrontendDB(observationCtx, dsn, name) + + if err != nil { + log.Scoped("init db ("+name+")").Fatal("Failed to connect to frontend database", log.Error(err)) + } + + return sqlDB +} diff --git a/cmd/syntactic-code-intel-worker/shared/store_helpers_test.go b/cmd/syntactic-code-intel-worker/shared/store_helpers_test.go new file mode 100644 index 000000000000..64fbe06787d1 --- /dev/null +++ b/cmd/syntactic-code-intel-worker/shared/store_helpers_test.go @@ -0,0 +1,100 @@ +package shared + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/stretchr/testify/require" +) + +func insertIndexRecords(t testing.TB, db database.DB, records ...SyntacticIndexingJob) { + for _, index := range records { + if index.Commit == "" { + index.Commit = makeCommit(index.ID) + } + if index.State == "" { + index.State = Completed + } + if index.RepositoryID == 0 { + index.RepositoryID = 50 + } + // Ensure we have a repo for the inner join in select queries + insertRepo(t, db, index.RepositoryID, index.RepositoryName) + + query := sqlf.Sprintf(` + INSERT INTO syntactic_scip_indexing_jobs ( + id, + commit, + queued_at, + state, + failure_message, + started_at, + finished_at, + process_after, + num_resets, + num_failures, + repository_id, + should_reindex, + enqueuer_user_id + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + `, + index.ID, + index.Commit, + index.QueuedAt, + index.State, + index.FailureMessage, + index.StartedAt, + index.FinishedAt, + index.ProcessAfter, + index.NumResets, + index.NumFailures, + index.RepositoryID, + index.ShouldReindex, + index.EnqueuerUserID, + ) + + _, err := db.ExecContext(context.Background(), query.Query(sqlf.PostgresBindVar), query.Args()...) + require.NoError(t, err, "unexpected error while inserting index") + } +} + +func insertRepo(t testing.TB, db database.DB, id int, name string) { + if name == "" { + name = fmt.Sprintf("n-%d", id) + } + + deletedAt := sqlf.Sprintf("NULL") + if strings.HasPrefix(name, "DELETED-") { + deletedAt = sqlf.Sprintf("%s", "2024-02-08 15:06:50.973329+00") + } + insertRepoQuery := sqlf.Sprintf( + `INSERT INTO repo (id, name, deleted_at, private) VALUES (%s, %s, %s, %s) ON CONFLICT (id) DO NOTHING`, + id, + name, + deletedAt, + false, + ) + _, err := db.ExecContext(context.Background(), insertRepoQuery.Query(sqlf.PostgresBindVar), insertRepoQuery.Args()...) + require.NoError(t, err, "unexpected error while upserting repository") + + status := "cloned" + if strings.HasPrefix(name, "DELETED-") { + status = "not_cloned" + } + updateGitserverRepoQuery := sqlf.Sprintf( + `UPDATE gitserver_repos SET clone_status = %s WHERE repo_id = %s`, + status, + id, + ) + + _, err = db.ExecContext(context.Background(), updateGitserverRepoQuery.Query(sqlf.PostgresBindVar), updateGitserverRepoQuery.Args()...) + require.NoError(t, err, "unexpected error while upserting gitserver repository") +} + +func makeCommit(i int) string { + return fmt.Sprintf("%040d", i) +} diff --git a/cmd/syntactic-code-intel-worker/shared/store_test.go b/cmd/syntactic-code-intel-worker/shared/store_test.go new file mode 100644 index 000000000000..fa7eb11c567f --- /dev/null +++ b/cmd/syntactic-code-intel-worker/shared/store_test.go @@ -0,0 +1,103 @@ +package shared + +import ( + "context" + "testing" + "time" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/stretchr/testify/require" +) + +func TestIndexingWorkerStore(t *testing.T) { + /* + The purpose of this test is to verify that the DB schema + we're using for the syntactic code intel work matches + the requirements of dbworker interface, + and that we can dequeue records through this interface. + + The schema is sensitive to column names and types, and to the fact + that we are using a Postgres view to query repository name alongside + indexing records, + so it's important that we use the real Postgres in this test to prevent + schema/implementation drift. + */ + observationContext := &observation.TestContext + sqlDB := dbtest.NewDB(t) + db := database.NewDB(observationContext.Logger, sqlDB) + + store, err := NewStore(observationContext, sqlDB) + require.NoError(t, err, "unexpected error creating dbworker stores") + + ctx := context.Background() + + initCount, _ := store.QueuedCount(ctx, true) + + require.Equal(t, 0, initCount) + + insertIndexRecords(t, db, + // Even though this record is the oldest in the queue, + // it is associated with a deleted repository. + // The view that we use for dequeuing should not return this + // record at all, and the first one should still be the record with ID=1 + SyntacticIndexingJob{ + ID: 500, + Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead3333", + RepositoryID: 4, + RepositoryName: "DELETED-org/repo", + State: Queued, + QueuedAt: time.Now().Add(time.Second * -100), + }, + SyntacticIndexingJob{ + ID: 1, + Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead1111", + RepositoryID: 1, + RepositoryName: "tangy/tacos", + State: Queued, + QueuedAt: time.Now().Add(time.Second * -5), + }, + SyntacticIndexingJob{ + ID: 2, + Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead2222", + RepositoryID: 2, + RepositoryName: "salty/empanadas", + State: Queued, + QueuedAt: time.Now().Add(time.Second * -2), + }, + SyntacticIndexingJob{ + ID: 3, + Commit: "deadbeefdeadbeefdeadbeefdeadbeefdead3333", + RepositoryID: 3, + RepositoryName: "juicy/mangoes", + State: Processing, + QueuedAt: time.Now().Add(time.Second * -1), + }, + ) + + afterCount, _ := store.QueuedCount(ctx, true) + + require.Equal(t, 3, afterCount) + + record1, hasRecord, err := store.Dequeue(ctx, "worker1", nil) + + require.NoError(t, err) + require.True(t, hasRecord) + require.Equal(t, 1, record1.ID) + require.Equal(t, "tangy/tacos", record1.RepositoryName) + require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeefdead1111", record1.Commit) + + record2, hasRecord, err := store.Dequeue(ctx, "worker2", nil) + + require.NoError(t, err) + require.True(t, hasRecord) + require.Equal(t, 2, record2.ID) + require.Equal(t, "salty/empanadas", record2.RepositoryName) + require.Equal(t, "deadbeefdeadbeefdeadbeefdeadbeefdead2222", record2.Commit) + + _, hasRecord, err = store.Dequeue(ctx, "worker2", nil) + require.NoError(t, err) + require.False(t, hasRecord) + +} diff --git a/dev/scip-syntax-install.sh b/dev/scip-syntax-install.sh index 0798ff2a38f4..49fdfc0da363 100755 --- a/dev/scip-syntax-install.sh +++ b/dev/scip-syntax-install.sh @@ -26,7 +26,7 @@ trap ctrl_c INT function build_scip_syntax { cd docker-images/syntax-highlighter/crates/scip-syntax cargo build --bin scip-syntax --target-dir target - cp ./target/release/scip-syntax "$TARGET" + cp ./target/debug/scip-syntax "$TARGET" } build_scip_syntax diff --git a/internal/database/schema.json b/internal/database/schema.json index 6eec4b544ba6..b4ef2c48526e 100644 --- a/internal/database/schema.json +++ b/internal/database/schema.json @@ -1234,6 +1234,15 @@ "Increment": 1, "CycleOption": "NO" }, + { + "Name": "syntactic_scip_indexing_jobs_id_seq", + "TypeName": "bigint", + "StartValue": 1, + "MinimumValue": 1, + "MaximumValue": 9223372036854775807, + "Increment": 1, + "CycleOption": "NO" + }, { "Name": "teams_id_seq", "TypeName": "integer", @@ -26410,6 +26419,308 @@ ], "Triggers": [] }, + { + "Name": "syntactic_scip_indexing_jobs", + "Comment": "Stores metadata about a code intel syntactic index job.", + "Columns": [ + { + "Name": "cancel", + "Index": 16, + "TypeName": "boolean", + "IsNullable": false, + "Default": "false", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "commit", + "Index": 2, + "TypeName": "text", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "A 40-char revhash. Note that this commit may not be resolvable in the future." + }, + { + "Name": "commit_last_checked_at", + "Index": 13, + "TypeName": "timestamp with time zone", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "enqueuer_user_id", + "Index": 18, + "TypeName": "integer", + "IsNullable": false, + "Default": "0", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest" + }, + { + "Name": "execution_logs", + "Index": 12, + "TypeName": "json[]", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution." + }, + { + "Name": "failure_message", + "Index": 5, + "TypeName": "text", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "finished_at", + "Index": 7, + "TypeName": "timestamp with time zone", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "id", + "Index": 1, + "TypeName": "bigint", + "IsNullable": false, + "Default": "nextval('syntactic_scip_indexing_jobs_id_seq'::regclass)", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "last_heartbeat_at", + "Index": 15, + "TypeName": "timestamp with time zone", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "num_failures", + "Index": 11, + "TypeName": "integer", + "IsNullable": false, + "Default": "0", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "num_resets", + "Index": 10, + "TypeName": "integer", + "IsNullable": false, + "Default": "0", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "process_after", + "Index": 9, + "TypeName": "timestamp with time zone", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "queued_at", + "Index": 3, + "TypeName": "timestamp with time zone", + "IsNullable": false, + "Default": "now()", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "repository_id", + "Index": 8, + "TypeName": "integer", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "should_reindex", + "Index": 17, + "TypeName": "boolean", + "IsNullable": false, + "Default": "false", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "started_at", + "Index": 6, + "TypeName": "timestamp with time zone", + "IsNullable": true, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "state", + "Index": 4, + "TypeName": "text", + "IsNullable": false, + "Default": "'queued'::text", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, + { + "Name": "worker_hostname", + "Index": 14, + "TypeName": "text", + "IsNullable": false, + "Default": "''::text", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + } + ], + "Indexes": [ + { + "Name": "syntactic_scip_indexing_jobs_pkey", + "IsPrimaryKey": true, + "IsUnique": true, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE UNIQUE INDEX syntactic_scip_indexing_jobs_pkey ON syntactic_scip_indexing_jobs USING btree (id)", + "ConstraintType": "p", + "ConstraintDefinition": "PRIMARY KEY (id)" + }, + { + "Name": "syntactic_scip_indexing_jobs_dequeue_order_idx", + "IsPrimaryKey": false, + "IsUnique": false, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_dequeue_order_idx ON syntactic_scip_indexing_jobs USING btree ((enqueuer_user_id \u003e 0) DESC, queued_at DESC, id) WHERE state = 'queued'::text OR state = 'errored'::text", + "ConstraintType": "", + "ConstraintDefinition": "" + }, + { + "Name": "syntactic_scip_indexing_jobs_queued_at_id", + "IsPrimaryKey": false, + "IsUnique": false, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_queued_at_id ON syntactic_scip_indexing_jobs USING btree (queued_at DESC, id)", + "ConstraintType": "", + "ConstraintDefinition": "" + }, + { + "Name": "syntactic_scip_indexing_jobs_repository_id_commit", + "IsPrimaryKey": false, + "IsUnique": false, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_repository_id_commit ON syntactic_scip_indexing_jobs USING btree (repository_id, commit)", + "ConstraintType": "", + "ConstraintDefinition": "" + }, + { + "Name": "syntactic_scip_indexing_jobs_state", + "IsPrimaryKey": false, + "IsUnique": false, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE INDEX syntactic_scip_indexing_jobs_state ON syntactic_scip_indexing_jobs USING btree (state)", + "ConstraintType": "", + "ConstraintDefinition": "" + } + ], + "Constraints": [ + { + "Name": "syntactic_scip_indexing_jobs_commit_valid_chars", + "ConstraintType": "c", + "RefTableName": "", + "IsDeferrable": false, + "ConstraintDefinition": "CHECK (commit ~ '^[a-f0-9]{40}$'::text)" + } + ], + "Triggers": [] + }, { "Name": "team_members", "Comment": "", @@ -29610,6 +29921,10 @@ "Name": "site_config", "Definition": " SELECT global_state.site_id,\n global_state.initialized\n FROM global_state;" }, + { + "Name": "syntactic_scip_indexing_jobs_with_repository_name", + "Definition": " SELECT u.id,\n u.commit,\n u.queued_at,\n u.state,\n u.failure_message,\n u.started_at,\n u.finished_at,\n u.repository_id,\n u.process_after,\n u.num_resets,\n u.num_failures,\n u.execution_logs,\n u.should_reindex,\n u.enqueuer_user_id,\n r.name AS repository_name\n FROM (syntactic_scip_indexing_jobs u\n JOIN repo r ON ((r.id = u.repository_id)))\n WHERE (r.deleted_at IS NULL);" + }, { "Name": "tracking_changeset_specs_and_changesets", "Definition": " SELECT changeset_specs.id AS changeset_spec_id,\n COALESCE(changesets.id, (0)::bigint) AS changeset_id,\n changeset_specs.repo_id,\n changeset_specs.batch_spec_id,\n repo.name AS repo_name,\n COALESCE((changesets.metadata -\u003e\u003e 'Title'::text), (changesets.metadata -\u003e\u003e 'title'::text)) AS changeset_name,\n changesets.external_state,\n changesets.publication_state,\n changesets.reconciler_state,\n changesets.computed_state\n FROM ((changeset_specs\n LEFT JOIN changesets ON (((changesets.repo_id = changeset_specs.repo_id) AND (changesets.external_id = changeset_specs.external_id))))\n JOIN repo ON ((changeset_specs.repo_id = repo.id)))\n WHERE ((changeset_specs.external_id IS NOT NULL) AND (repo.deleted_at IS NULL));" diff --git a/internal/database/schema.md b/internal/database/schema.md index 71c4c3039bf0..3ff1da9aef54 100644 --- a/internal/database/schema.md +++ b/internal/database/schema.md @@ -4027,6 +4027,47 @@ Foreign-key constraints: ``` +# Table "public.syntactic_scip_indexing_jobs" +``` + Column | Type | Collation | Nullable | Default +------------------------+--------------------------+-----------+----------+---------------------------------------------------------- + id | bigint | | not null | nextval('syntactic_scip_indexing_jobs_id_seq'::regclass) + commit | text | | not null | + queued_at | timestamp with time zone | | not null | now() + state | text | | not null | 'queued'::text + failure_message | text | | | + started_at | timestamp with time zone | | | + finished_at | timestamp with time zone | | | + repository_id | integer | | not null | + process_after | timestamp with time zone | | | + num_resets | integer | | not null | 0 + num_failures | integer | | not null | 0 + execution_logs | json[] | | | + commit_last_checked_at | timestamp with time zone | | | + worker_hostname | text | | not null | ''::text + last_heartbeat_at | timestamp with time zone | | | + cancel | boolean | | not null | false + should_reindex | boolean | | not null | false + enqueuer_user_id | integer | | not null | 0 +Indexes: + "syntactic_scip_indexing_jobs_pkey" PRIMARY KEY, btree (id) + "syntactic_scip_indexing_jobs_dequeue_order_idx" btree ((enqueuer_user_id > 0) DESC, queued_at DESC, id) WHERE state = 'queued'::text OR state = 'errored'::text + "syntactic_scip_indexing_jobs_queued_at_id" btree (queued_at DESC, id) + "syntactic_scip_indexing_jobs_repository_id_commit" btree (repository_id, commit) + "syntactic_scip_indexing_jobs_state" btree (state) +Check constraints: + "syntactic_scip_indexing_jobs_commit_valid_chars" CHECK (commit ~ '^[a-f0-9]{40}$'::text) + +``` + +Stores metadata about a code intel syntactic index job. + +**commit**: A 40-char revhash. Note that this commit may not be resolvable in the future. + +**enqueuer_user_id**: ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest + +**execution_logs**: An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution. + # Table "public.team_members" ``` Column | Type | Collation | Nullable | Default @@ -4985,6 +5026,31 @@ Foreign-key constraints: FROM global_state; ``` +# View "public.syntactic_scip_indexing_jobs_with_repository_name" + +## View query: + +```sql + SELECT u.id, + u.commit, + u.queued_at, + u.state, + u.failure_message, + u.started_at, + u.finished_at, + u.repository_id, + u.process_after, + u.num_resets, + u.num_failures, + u.execution_logs, + u.should_reindex, + u.enqueuer_user_id, + r.name AS repository_name + FROM (syntactic_scip_indexing_jobs u + JOIN repo r ON ((r.id = u.repository_id))) + WHERE (r.deleted_at IS NULL); +``` + # View "public.tracking_changeset_specs_and_changesets" ## View query: diff --git a/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/down.sql b/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/down.sql new file mode 100644 index 000000000000..44d888a57ecf --- /dev/null +++ b/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/down.sql @@ -0,0 +1,4 @@ +-- Undo the changes made in the up migration + +DROP VIEW IF EXISTS syntactic_scip_indexing_jobs_with_repository_name; +DROP TABLE IF EXISTS syntactic_scip_indexing_jobs; diff --git a/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/metadata.yaml b/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/metadata.yaml new file mode 100644 index 000000000000..f30cc0aa0270 --- /dev/null +++ b/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/metadata.yaml @@ -0,0 +1,2 @@ +name: Add syntactic_scip_indexes table +parents: [1702500918] diff --git a/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/up.sql b/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/up.sql new file mode 100644 index 000000000000..544967f586dc --- /dev/null +++ b/migrations/frontend/1706710430_add_syntactic_scip_indexes_table/up.sql @@ -0,0 +1,62 @@ + +CREATE TABLE IF NOT EXISTS syntactic_scip_indexing_jobs ( + id bigserial NOT NULL PRIMARY KEY, + commit text NOT NULL, + queued_at timestamp with time zone DEFAULT now() NOT NULL, + state text DEFAULT 'queued'::text NOT NULL, + failure_message text, + started_at timestamp with time zone, + finished_at timestamp with time zone, + repository_id integer NOT NULL, + process_after timestamp with time zone, + num_resets integer DEFAULT 0 NOT NULL, + num_failures integer DEFAULT 0 NOT NULL, + execution_logs json[], + commit_last_checked_at timestamp with time zone, + worker_hostname text DEFAULT ''::text NOT NULL, + last_heartbeat_at timestamp with time zone, + cancel boolean DEFAULT false NOT NULL, + should_reindex boolean DEFAULT false NOT NULL, + enqueuer_user_id integer DEFAULT 0 NOT NULL, + CONSTRAINT syntactic_scip_indexing_jobs_commit_valid_chars CHECK ((commit ~ '^[a-f0-9]{40}$'::text)) +); + + +CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_dequeue_order_idx + ON syntactic_scip_indexing_jobs + USING btree (((enqueuer_user_id > 0)) DESC, queued_at DESC, id) + WHERE ((state = 'queued'::text) OR (state = 'errored'::text)); + +CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_queued_at_id ON syntactic_scip_indexing_jobs USING btree (queued_at DESC, id); + +CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_repository_id_commit ON syntactic_scip_indexing_jobs USING btree (repository_id, commit); + +CREATE INDEX IF NOT EXISTS syntactic_scip_indexing_jobs_state ON syntactic_scip_indexing_jobs USING btree (state); + +COMMENT ON TABLE syntactic_scip_indexing_jobs IS 'Stores metadata about a code intel syntactic index job.'; + +COMMENT ON COLUMN syntactic_scip_indexing_jobs.commit IS 'A 40-char revhash. Note that this commit may not be resolvable in the future.'; + +COMMENT ON COLUMN syntactic_scip_indexing_jobs.execution_logs IS 'An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution.'; + +COMMENT ON COLUMN syntactic_scip_indexing_jobs.enqueuer_user_id IS 'ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest'; + +CREATE OR REPLACE VIEW syntactic_scip_indexing_jobs_with_repository_name AS + SELECT u.id, + u.commit, + u.queued_at, + u.state, + u.failure_message, + u.started_at, + u.finished_at, + u.repository_id, + u.process_after, + u.num_resets, + u.num_failures, + u.execution_logs, + u.should_reindex, + u.enqueuer_user_id, + r.name AS repository_name + FROM (syntactic_scip_indexing_jobs u + JOIN repo r ON ((r.id = u.repository_id))) + WHERE (r.deleted_at IS NULL); diff --git a/migrations/frontend/squashed.sql b/migrations/frontend/squashed.sql index 350d2aad85cc..e90d685b8b2e 100644 --- a/migrations/frontend/squashed.sql +++ b/migrations/frontend/squashed.sql @@ -4646,6 +4646,65 @@ CREATE SEQUENCE survey_responses_id_seq ALTER SEQUENCE survey_responses_id_seq OWNED BY survey_responses.id; +CREATE TABLE syntactic_scip_indexing_jobs ( + id bigint NOT NULL, + commit text NOT NULL, + queued_at timestamp with time zone DEFAULT now() NOT NULL, + state text DEFAULT 'queued'::text NOT NULL, + failure_message text, + started_at timestamp with time zone, + finished_at timestamp with time zone, + repository_id integer NOT NULL, + process_after timestamp with time zone, + num_resets integer DEFAULT 0 NOT NULL, + num_failures integer DEFAULT 0 NOT NULL, + execution_logs json[], + commit_last_checked_at timestamp with time zone, + worker_hostname text DEFAULT ''::text NOT NULL, + last_heartbeat_at timestamp with time zone, + cancel boolean DEFAULT false NOT NULL, + should_reindex boolean DEFAULT false NOT NULL, + enqueuer_user_id integer DEFAULT 0 NOT NULL, + CONSTRAINT syntactic_scip_indexing_jobs_commit_valid_chars CHECK ((commit ~ '^[a-f0-9]{40}$'::text)) +); + +COMMENT ON TABLE syntactic_scip_indexing_jobs IS 'Stores metadata about a code intel syntactic index job.'; + +COMMENT ON COLUMN syntactic_scip_indexing_jobs.commit IS 'A 40-char revhash. Note that this commit may not be resolvable in the future.'; + +COMMENT ON COLUMN syntactic_scip_indexing_jobs.execution_logs IS 'An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution.'; + +COMMENT ON COLUMN syntactic_scip_indexing_jobs.enqueuer_user_id IS 'ID of the user who scheduled this index. Records with a non-NULL user ID are prioritised over the rest'; + +CREATE SEQUENCE syntactic_scip_indexing_jobs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + +ALTER SEQUENCE syntactic_scip_indexing_jobs_id_seq OWNED BY syntactic_scip_indexing_jobs.id; + +CREATE VIEW syntactic_scip_indexing_jobs_with_repository_name AS + SELECT u.id, + u.commit, + u.queued_at, + u.state, + u.failure_message, + u.started_at, + u.finished_at, + u.repository_id, + u.process_after, + u.num_resets, + u.num_failures, + u.execution_logs, + u.should_reindex, + u.enqueuer_user_id, + r.name AS repository_name + FROM (syntactic_scip_indexing_jobs u + JOIN repo r ON ((r.id = u.repository_id))) + WHERE (r.deleted_at IS NULL); + CREATE TABLE team_members ( team_id integer NOT NULL, user_id integer NOT NULL, @@ -5251,6 +5310,8 @@ ALTER TABLE ONLY settings ALTER COLUMN id SET DEFAULT nextval('settings_id_seq': ALTER TABLE ONLY survey_responses ALTER COLUMN id SET DEFAULT nextval('survey_responses_id_seq'::regclass); +ALTER TABLE ONLY syntactic_scip_indexing_jobs ALTER COLUMN id SET DEFAULT nextval('syntactic_scip_indexing_jobs_id_seq'::regclass); + ALTER TABLE ONLY teams ALTER COLUMN id SET DEFAULT nextval('teams_id_seq'::regclass); ALTER TABLE ONLY temporary_settings ALTER COLUMN id SET DEFAULT nextval('temporary_settings_id_seq'::regclass); @@ -5759,6 +5820,9 @@ ALTER TABLE ONLY settings ALTER TABLE ONLY survey_responses ADD CONSTRAINT survey_responses_pkey PRIMARY KEY (id); +ALTER TABLE ONLY syntactic_scip_indexing_jobs + ADD CONSTRAINT syntactic_scip_indexing_jobs_pkey PRIMARY KEY (id); + ALTER TABLE ONLY team_members ADD CONSTRAINT team_members_team_id_user_id_key PRIMARY KEY (team_id, user_id); @@ -6302,6 +6366,14 @@ CREATE UNIQUE INDEX sub_repo_permissions_repo_id_user_id_version_uindex ON sub_r CREATE INDEX sub_repo_perms_user_id ON sub_repo_permissions USING btree (user_id); +CREATE INDEX syntactic_scip_indexing_jobs_dequeue_order_idx ON syntactic_scip_indexing_jobs USING btree (((enqueuer_user_id > 0)) DESC, queued_at DESC, id) WHERE ((state = 'queued'::text) OR (state = 'errored'::text)); + +CREATE INDEX syntactic_scip_indexing_jobs_queued_at_id ON syntactic_scip_indexing_jobs USING btree (queued_at DESC, id); + +CREATE INDEX syntactic_scip_indexing_jobs_repository_id_commit ON syntactic_scip_indexing_jobs USING btree (repository_id, commit); + +CREATE INDEX syntactic_scip_indexing_jobs_state ON syntactic_scip_indexing_jobs USING btree (state); + CREATE UNIQUE INDEX teams_name ON teams USING btree (name); CREATE UNIQUE INDEX unique_resource_permission ON namespace_permissions USING btree (namespace, resource_id, user_id); diff --git a/sg.config.yaml b/sg.config.yaml index 684afd703988..383a63988e18 100644 --- a/sg.config.yaml +++ b/sg.config.yaml @@ -1246,8 +1246,12 @@ commandsets: - git commands: - frontend + - web - worker - blobstore + - repo-updater + - gitserver-0 + - gitserver-1 - syntactic-code-intel-worker-0 - syntactic-code-intel-worker-1