Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.

Syntactic indexing: add database schema and dbworker #60055

Merged
merged 48 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
107a75b
Move database code out of service entrypoint
keynmol Feb 1, 2024
1e8cdcd
Worker scaffold
keynmol Feb 2, 2024
8c0cc75
Name database connections
keynmol Feb 2, 2024
147a8be
Add auto-incrementing sequence to the table
keynmol Feb 5, 2024
0a87259
Add more services to the configuration
keynmol Feb 5, 2024
69618c1
Merge branch 'main' into syntactic-worker-database
keynmol Feb 5, 2024
b991ac0
Correct broken merge and incorrect path to scip-syntax CLI
keynmol Feb 5, 2024
4e17d9b
Add basic tests for the dbworker interface
keynmol Feb 6, 2024
293ed01
Merge branch 'main' into syntactic-worker-database
keynmol Feb 6, 2024
2fbb05a
Remove spurious check ins
keynmol Feb 6, 2024
2616056
Clean up down migration
keynmol Feb 6, 2024
530f30e
Clean up migration file
keynmol Feb 6, 2024
43bb346
Run Gazelle
keynmol Feb 6, 2024
6768573
Remove things I don't understand
keynmol Feb 6, 2024
140ac04
Revert unrelated changes
keynmol Feb 6, 2024
30d35d6
Restore code I don't fully understand
keynmol Feb 6, 2024
26ead64
Amend comment to provide more detail
keynmol Feb 6, 2024
94af832
Add requires network to tests
keynmol Feb 6, 2024
9ef22fa
Regenerate squashed migration and description files
keynmol Feb 6, 2024
edde8a8
Make migration idempotent
keynmol Feb 6, 2024
20961f4
Merge branch 'main' into syntactic-worker-database
keynmol Feb 6, 2024
a5a496a
Fix migration idempotency
keynmol Feb 6, 2024
af0874e
Use explicit env variables in config loading
keynmol Feb 6, 2024
0fa10fb
Worker naming
keynmol Feb 6, 2024
fb73eb5
Remove outfile column and field
keynmol Feb 6, 2024
8219c88
Fail fast assertions
keynmol Feb 6, 2024
45aea83
compile-time interface conformance for syntactic index record
keynmol Feb 6, 2024
b9bad2a
Surface error from store initialisation
keynmol Feb 6, 2024
72f95e3
Regenerate migration summary files
keynmol Feb 6, 2024
05a7426
Use an enum for recordState
keynmol Feb 6, 2024
eb85a1f
Rename indexing worker config
keynmol Feb 6, 2024
d484a71
Fix tests
keynmol Feb 6, 2024
43cb6a0
Rename table and view names for consistency
keynmol Feb 7, 2024
35d9fe6
gofmt
keynmol Feb 7, 2024
d5eefae
Merge branch 'main' into syntactic-worker-database
keynmol Feb 7, 2024
5c4d2e2
Clean up error handling in tests
keynmol Feb 8, 2024
3fa7ea6
Add indexes to the table
keynmol Feb 8, 2024
48d2295
Simplify call to SetProviders
keynmol Feb 8, 2024
295bc95
Use Completed constant
keynmol Feb 8, 2024
8c81264
Make migration idempotent
keynmol Feb 8, 2024
bac2a1f
Run gazelle
keynmol Feb 8, 2024
512b133
Add a test for a case where there are no records to dequeue
keynmol Feb 8, 2024
daab1f3
Rearrange fields for clarity
keynmol Feb 8, 2024
8b315b9
Add test for deleted repository
keynmol Feb 8, 2024
82eb0f0
Make timestamp more readable
keynmol Feb 8, 2024
81b7de4
Use bigserial for primary key
keynmol Feb 8, 2024
928315a
Write squashed schema
keynmol Feb 8, 2024
cf1a70f
Merge branch 'main' into syntactic-worker-database
keynmol Feb 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion cmd/syntactic-code-intel-worker/shared/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
load("//dev:go_defs.bzl", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "shared",
srcs = [
"config.go",
"indexing_worker.go",
"service.go",
"shared.go",
"store.go",
],
importpath = "github.com/sourcegraph/sourcegraph/cmd/syntactic-code-intel-worker/shared",
visibility = ["//visibility:public"],
deps = [
"//internal/codeintel/shared/lsifuploadstore",
"//internal/authz",
"//internal/authz/providers",
"//internal/conf",
"//internal/conf/conftypes",
"//internal/database",
"//internal/database/basestore",
"//internal/database/connections/live",
"//internal/database/dbutil",
"//internal/debugserver",
"//internal/encryption/keyring",
"//internal/env",
"//internal/goroutine",
"//internal/httpserver",
"//internal/observation",
"//internal/service",
"//internal/workerutil",
"//internal/workerutil/dbworker",
"//internal/workerutil/dbworker/store",
"//lib/errors",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_sourcegraph_log//:log",
],
)

go_test(
name = "shared_test",
srcs = [
"store_helpers_test.go",
"store_test.go",
],
embed = [":shared"],
tags = ["requires-network"],
deps = [
"//internal/database",
"//internal/database/dbtest",
"//internal/observation",
"@com_github_keegancsmith_sqlf//:sqlf",
"@com_github_stretchr_testify//assert",
],
)
39 changes: 23 additions & 16 deletions cmd/syntactic-code-intel-worker/shared/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,43 @@ import (
"strconv"
"time"

"github.com/sourcegraph/sourcegraph/internal/codeintel/shared/lsifuploadstore"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/lib/errors"
)

// IndexingWorkerConfig holds the settings of the syntactic indexing worker.
// Its fields are populated from environment variables by its Load method.
type IndexingWorkerConfig struct {
env.BaseConfig
// PollInterval is the interval between queries to the repository queue.
PollInterval time.Duration
// Concurrency is the maximum number of repositories that can be processed concurrently.
Concurrency int
// MaximumRuntimePerJob is the maximum time a single repository indexing job can take.
MaximumRuntimePerJob time.Duration
// CliPath is read from SCIP_SYNTAX_PATH; it is logged at startup as the
// path to the scip-syntax CLI.
CliPath string
}

type Config struct {
env.BaseConfig

WorkerPollInterval time.Duration
WorkerConcurrency int
WorkerBudget int64
MaximumRuntimePerJob time.Duration
SCIPUploadStoreConfig *lsifuploadstore.Config
CliPath string
ListenAddress string
IndexingWorker *IndexingWorkerConfig

ListenAddress string
}

const DefaultPort = 3188

func (c *Config) Load() {
c.SCIPUploadStoreConfig = &lsifuploadstore.Config{}
c.SCIPUploadStoreConfig.Load()
// scopedEnv returns name prefixed with the environment variable namespace
// shared by the settings of this service.
func scopedEnv(name string) string {
	const prefix = "SYNTACTIC_CODE_INTEL_"
	return prefix + name
}

c.WorkerPollInterval = c.GetInterval("SYNTACTIC_CODE_INTEL_WORKER_POLL_INTERVAL", "1s", "Interval between queries to the repository queue")
c.WorkerConcurrency = c.GetInt("SYNTACTIC_CODE_INTEL_WORKER_CONCURRENCY", "1", "The maximum number of repositories that can be processed concurrently.")
c.WorkerBudget = int64(c.GetInt("SYNTACTIC_CODE_INTEL_WORKER_BUDGET", "0", "The amount of compressed input data (in bytes) a worker can process concurrently. Zero acts as an infinite budget."))
c.MaximumRuntimePerJob = c.GetInterval("SYNTACTIC_CODE_INTEL_WORKER_MAXIMUM_RUNTIME_PER_JOB", "25m", "The maximum time a single repository indexing job can take")
// Load reads the indexing worker settings from the environment, falling back
// to the defaults given below when a variable is not set. The polling,
// concurrency and runtime settings are namespaced through scopedEnv.
func (c *IndexingWorkerConfig) Load() {
	c.PollInterval = c.GetInterval(scopedEnv("INDEXING_POLL_INTERVAL"), "1s", "Interval between queries to the repository queue")
	c.Concurrency = c.GetInt(scopedEnv("INDEXING_CONCURRENCY"), "1", "The maximum number of repositories that can be processed concurrently.")
	c.MaximumRuntimePerJob = c.GetInterval(scopedEnv("INDEXING_MAXIMUM_RUNTIME_PER_JOB"), "5m", "The maximum time a single repository indexing job can take")
	// NOTE(review): unlike the settings above, this variable is not namespaced
	// through scopedEnv. The name is left unchanged here so that environments
	// which already set SCIP_SYNTAX_PATH keep working.
	c.CliPath = c.Get("SCIP_SYNTAX_PATH", "scip-syntax", "Path to the scip-syntax CLI used by the syntactic indexing worker")
}

func (c *Config) Load() {
c.IndexingWorker = &IndexingWorkerConfig{}
c.IndexingWorker.Load()
c.ListenAddress = c.GetOptional("SYNTACTIC_CODE_INTEL_WORKER_ADDR", "The address under which the syntactic codeintel worker API listens. Can include a port.")
// Fall back to a reasonable default.
if c.ListenAddress == "" {
Expand All @@ -50,6 +57,6 @@ func (c *Config) Load() {
func (c *Config) Validate() error {
var errs error
errs = errors.Append(errs, c.BaseConfig.Validate())
errs = errors.Append(errs, c.SCIPUploadStoreConfig.Validate())
errs = errors.Append(errs, c.IndexingWorker.Validate())
return errs
}
39 changes: 39 additions & 0 deletions cmd/syntactic-code-intel-worker/shared/indexing_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package shared

import (
"context"
"github.com/sourcegraph/log"
"time"

"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)

// NewIndexingWorker constructs the syntactic indexing worker through
// dbworker.NewWorker, using workerStore as its record store and a fresh
// indexingHandler as its handler. The polling interval, number of handlers
// and per-job runtime limit are taken from config; the heartbeat interval is
// fixed at ten seconds.
func NewIndexingWorker(ctx context.Context, observationCtx *observation.Context, workerStore dbworkerstore.Store[*SyntacticIndexRecord], config IndexingWorkerConfig) *workerutil.Worker[*SyntacticIndexRecord] {
	// The same name identifies both the worker and its metrics.
	const workerName = "syntactic_code_intel_indexer"

	options := workerutil.WorkerOptions{
		Name:                 workerName,
		Interval:             config.PollInterval,
		HeartbeatInterval:    10 * time.Second,
		Metrics:              workerutil.NewMetrics(observationCtx, workerName),
		NumHandlers:          config.Concurrency,
		MaximumRuntimePerJob: config.MaximumRuntimePerJob,
	}

	return dbworker.NewWorker[*SyntacticIndexRecord](ctx, workerStore, &indexingHandler{}, options)
}

// indexingHandler is the stateless handler given to the indexing worker.
type indexingHandler struct{}

// Compile-time check that indexingHandler satisfies workerutil.Handler.
var _ workerutil.Handler[*SyntacticIndexRecord] = (*indexingHandler)(nil)

// Handle is currently a stub: it logs the ID, repository name and commit of
// the record it was given and reports success without doing any indexing.
func (h indexingHandler) Handle(ctx context.Context, logger log.Logger, record *SyntacticIndexRecord) error {
	logger.Info(
		"Stub indexing worker handling record",
		log.Int("id", record.ID),
		log.String("repository name", record.RepositoryName),
		log.String("commit", record.Commit),
	)

	return nil
}
8 changes: 6 additions & 2 deletions cmd/syntactic-code-intel-worker/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
}

logger.Info("Syntactic code intel worker running",
log.String("path to scip-syntax CLI", config.CliPath),
log.String("path to scip-syntax CLI", config.IndexingWorker.CliPath),
log.String("API address", config.ListenAddress))

db := mustInitializeDB(observationCtx, "syntactic-code-intel-indexer")
workerStore, _ := NewStore(observationCtx, db)
indexingWorker := NewIndexingWorker(ctx, observationCtx, workerStore, *config.IndexingWorker)

// Initialize health server
server := httpserver.NewFromAddr(config.ListenAddress, &http.Server{
ReadTimeout: 75 * time.Second,
Expand All @@ -34,7 +38,7 @@ func Main(ctx context.Context, observationCtx *observation.Context, ready servic
})

// Go!
goroutine.MonitorBackgroundRoutines(ctx, server)
goroutine.MonitorBackgroundRoutines(ctx, server, indexingWorker)

return nil
}
145 changes: 145 additions & 0 deletions cmd/syntactic-code-intel-worker/shared/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package shared

import (
"context"
"database/sql"
"strconv"
"time"

"github.com/keegancsmith/sqlf"
"github.com/sourcegraph/log"
"github.com/sourcegraph/sourcegraph/internal/authz"
"github.com/sourcegraph/sourcegraph/internal/authz/providers"
"github.com/sourcegraph/sourcegraph/internal/conf"
"github.com/sourcegraph/sourcegraph/internal/conf/conftypes"
"github.com/sourcegraph/sourcegraph/internal/database"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
connections "github.com/sourcegraph/sourcegraph/internal/database/connections/live"
"github.com/sourcegraph/sourcegraph/internal/database/dbutil"
"github.com/sourcegraph/sourcegraph/internal/observation"
dbworkerstore "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)

// SyntacticIndexRecord is one syntactic indexing job as read from the
// syntactic_scip_indexes_with_repository_name view. Its fields correspond,
// in order, to the columns selected by the worker store in this package.
//
// NOTE(review): the pointer-typed fields presumably map to nullable
// columns — confirm against the schema.
type SyntacticIndexRecord struct {
	ID             int        `json:"id"`
	Commit         string     `json:"commit"`
	QueuedAt       time.Time  `json:"queuedAt"`
	State          string     `json:"state"`
	FailureMessage *string    `json:"failureMessage"`
	StartedAt      *time.Time `json:"startedAt"`
	FinishedAt     *time.Time `json:"finishedAt"`
	ProcessAfter   *time.Time `json:"processAfter"`
	NumResets      int        `json:"numResets"`
	NumFailures    int        `json:"numFailures"`
	RepositoryID   int        `json:"repositoryId"`
	RepositoryName string     `json:"repositoryName"`
	Outfile        string     `json:"outfile"`
	ShouldReindex  bool       `json:"shouldReindex"`
	// EnqueuerUserID is used by the store's ordering: records with a value
	// greater than zero are dequeued first.
	EnqueuerUserID int32 `json:"enqueuerUserID"`
}

// RecordID returns the numeric identifier of the record.
func (r SyntacticIndexRecord) RecordID() int {
	return r.ID
}

// RecordUID returns the record identifier rendered as a base-10 string.
func (r SyntacticIndexRecord) RecordUID() string {
	return strconv.FormatInt(int64(r.ID), 10)
}

// ScanSyntacticIndexRecord reads one row from s into a newly allocated
// SyntacticIndexRecord. On a scan failure it returns a nil record together
// with the error.
func ScanSyntacticIndexRecord(s dbutil.Scanner) (*SyntacticIndexRecord, error) {
	record := new(SyntacticIndexRecord)
	if err := scanSyntacticIndexRecord(record, s); err != nil {
		return nil, err
	}

	return record, nil
}

// scanSyntacticIndexRecord fills job from the current row of s and returns
// whatever error the scan reports.
func scanSyntacticIndexRecord(job *SyntacticIndexRecord, s dbutil.Scanner) error {
	// The destinations must stay in the same order as the column
	// expressions that NewStore hands to the worker store.
	return s.Scan(
		&job.ID,
		&job.Commit,
		&job.QueuedAt,
		&job.State,
		&job.FailureMessage,
		&job.StartedAt,
		&job.FinishedAt,
		&job.ProcessAfter,
		&job.NumResets,
		&job.NumFailures,
		&job.RepositoryID,
		&job.RepositoryName,
		&job.Outfile,
		&job.ShouldReindex,
		&job.EnqueuerUserID,
	)
}

// NewStore builds the dbworker store for syntactic index records on top of
// db. Records live in the syntactic_scip_indexes table and are read through
// the syntactic_scip_indexes_with_repository_name view (aliased as u).
// The returned error is always nil in the current implementation.
func NewStore(observationCtx *observation.Context, db *sql.DB) (dbworkerstore.Store[*SyntacticIndexRecord], error) {
	options := dbworkerstore.Options[*SyntacticIndexRecord]{
		Name:      "syntactic_scip_index_store",
		TableName: "syntactic_scip_indexes",
		ViewName:  "syntactic_scip_indexes_with_repository_name u",
		// Ordering on enqueuer_user_id first gives priority to indexing
		// that was scheduled manually.
		OrderByExpression: sqlf.Sprintf("(u.enqueuer_user_id > 0) DESC, u.queued_at, u.id"),
		// Keep this list in sync with the columns of the
		// syntactic_scip_indexes_with_repository_name view and with the
		// destination order in scanSyntacticIndexRecord.
		ColumnExpressions: []*sqlf.Query{
			sqlf.Sprintf("u.id"),
			sqlf.Sprintf("u.commit"),
			sqlf.Sprintf("u.queued_at"),
			sqlf.Sprintf("u.state"),
			sqlf.Sprintf("u.failure_message"),
			sqlf.Sprintf("u.started_at"),
			sqlf.Sprintf("u.finished_at"),
			sqlf.Sprintf("u.process_after"),
			sqlf.Sprintf("u.num_resets"),
			sqlf.Sprintf("u.num_failures"),
			sqlf.Sprintf("u.repository_id"),
			sqlf.Sprintf("u.repository_name"),
			sqlf.Sprintf("u.outfile"),
			sqlf.Sprintf("u.should_reindex"),
			sqlf.Sprintf("u.enqueuer_user_id"),
		},
		Scan: dbworkerstore.BuildWorkerScan(ScanSyntacticIndexRecord),
	}

	dbHandle := basestore.NewHandleWithDB(observationCtx.Logger, db, sql.TxOptions{})

	return dbworkerstore.New(observationCtx, dbHandle, options), nil
}

// mustInitializeDB opens the frontend database under the given connection
// name and returns the raw *sql.DB. A connection failure is reported through
// a Fatal log entry. It also starts a background goroutine that periodically
// refreshes the authz providers from the site configuration.
func mustInitializeDB(observationCtx *observation.Context, name string) *sql.DB {
	readDSN := func(serviceConnections conftypes.ServiceConnections) string {
		return serviceConnections.PostgresDSN
	}
	postgresDSN := conf.GetServiceConnectionValueAndRestartOnChange(readDSN)

	rawDB, err := connections.EnsureNewFrontendDB(observationCtx, postgresDSN, name)
	if err != nil {
		log.Scoped("init db ("+name+")").Fatal("Failed to connect to frontend database", log.Error(err))
	}

	// This service is internal: authz checks for user requests are the
	// frontend's responsibility and are enforced by the DB layer.
	//
	// SetProviders is called here so that calls to GetProviders don't block.
	// Relevant PR: https://github.com/sourcegraph/sourcegraph/pull/15755
	// Relevant issue: https://github.com/sourcegraph/sourcegraph/issues/15962
	backgroundCtx := context.Background()
	wrappedDB := database.NewDB(observationCtx.Logger, rawDB)

	refreshProviders := func() {
		// The ticker is never stopped; the loop runs for as long as the
		// process does.
		ticker := time.NewTicker(providers.RefreshInterval())
		for range ticker.C {
			allowByDefault, current, _, _, _ := providers.ProvidersFromConfig(backgroundCtx, conf.Get(), wrappedDB)
			authz.SetProviders(allowByDefault, current)
		}
	}
	go refreshProviders()

	return rawDB
}
Loading