diff --git a/cmd/worker/internal/codeintel/autoindexing_scheduler.go b/cmd/worker/internal/codeintel/autoindexing_scheduler.go index 5e6153595416..d8e410965fb7 100644 --- a/cmd/worker/internal/codeintel/autoindexing_scheduler.go +++ b/cmd/worker/internal/codeintel/autoindexing_scheduler.go @@ -52,7 +52,8 @@ func (j *autoindexingScheduler) Routines(_ context.Context, observationCtx *obse services.UploadsService, services.PoliciesService, matcher, - services.AutoIndexingService, + services.PreciseRepoSchedulingService, + *services.AutoIndexingService, db.Repos(), ), nil } diff --git a/internal/codeintel/BUILD.bazel b/internal/codeintel/BUILD.bazel index 0891153ce886..6f5e99331243 100644 --- a/internal/codeintel/BUILD.bazel +++ b/internal/codeintel/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//internal/codeintel/dependencies", "//internal/codeintel/policies", "//internal/codeintel/ranking", + "//internal/codeintel/reposcheduler", "//internal/codeintel/sentinel", "//internal/codeintel/shared", "//internal/codeintel/uploads", diff --git a/internal/codeintel/autoindexing/BUILD.bazel b/internal/codeintel/autoindexing/BUILD.bazel index ea8d34f9f29e..0a1adc348498 100644 --- a/internal/codeintel/autoindexing/BUILD.bazel +++ b/internal/codeintel/autoindexing/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//internal/codeintel/autoindexing/internal/store", "//internal/codeintel/autoindexing/shared", "//internal/codeintel/dependencies", + "//internal/codeintel/reposcheduler", "//internal/codeintel/uploads/shared", "//internal/database", "//internal/gitserver", diff --git a/internal/codeintel/autoindexing/init.go b/internal/codeintel/autoindexing/init.go index e0a8b574cab4..d4e00f7532a4 100644 --- a/internal/codeintel/autoindexing/init.go +++ b/internal/codeintel/autoindexing/init.go @@ -7,6 +7,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/background/summary" "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/inference" autoindexingstore "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/store" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/gitserver" "github.com/sourcegraph/sourcegraph/internal/goroutine" @@ -49,14 +50,15 @@ func NewIndexSchedulers( uploadSvc UploadService, policiesSvc PoliciesService, policyMatcher PolicyMatcher, - autoindexingSvc *Service, + repoSchedulingSvc reposcheduler.RepositorySchedulingService, + autoindexingSvc Service, repoStore database.RepoStore, ) []goroutine.BackgroundRoutine { return background.NewIndexSchedulers( scopedContext("scheduler", observationCtx), policiesSvc, policyMatcher, - autoindexingSvc, + repoSchedulingSvc, autoindexingSvc.indexEnqueuer, repoStore, autoindexingSvc.store, diff --git a/internal/codeintel/autoindexing/internal/background/BUILD.bazel b/internal/codeintel/autoindexing/internal/background/BUILD.bazel index 7f8a61201f3d..a5a78a85da31 100644 --- a/internal/codeintel/autoindexing/internal/background/BUILD.bazel +++ b/internal/codeintel/autoindexing/internal/background/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//internal/codeintel/autoindexing/internal/background/summary", "//internal/codeintel/autoindexing/internal/jobselector", "//internal/codeintel/autoindexing/internal/store", + "//internal/codeintel/reposcheduler", "//internal/database", "//internal/goroutine", "//internal/observation", diff --git a/internal/codeintel/autoindexing/internal/background/dependencies/mocks_test.go b/internal/codeintel/autoindexing/internal/background/dependencies/mocks_test.go index fcba4bc0a5f1..e1fbe0ada993 100644 --- a/internal/codeintel/autoindexing/internal/background/dependencies/mocks_test.go +++ b/internal/codeintel/autoindexing/internal/background/dependencies/mocks_test.go @@ -1932,10 +1932,6 @@ type MockStore struct { // GetQueuedRepoRevFunc is an instance of a mock function object // controlling the behavior of the method GetQueuedRepoRev. GetQueuedRepoRevFunc *StoreGetQueuedRepoRevFunc - // GetRepositoriesForIndexScanFunc is an instance of a mock function - // object controlling the behavior of the method - // GetRepositoriesForIndexScan. - GetRepositoriesForIndexScanFunc *StoreGetRepositoriesForIndexScanFunc // InsertDependencyIndexingJobFunc is an instance of a mock function // object controlling the behavior of the method // InsertDependencyIndexingJob. @@ -2012,11 +2008,6 @@ func NewMockStore() *MockStore { return }, }, - GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{ - defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) (r0 []int, r1 error) { - return - }, - }, InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{ defaultHook: func(context.Context, int, string, time.Time) (r0 int, r1 error) { return @@ -2119,11 +2110,6 @@ func NewStrictMockStore() *MockStore { panic("unexpected invocation of MockStore.GetQueuedRepoRev") }, }, - GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{ - defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - panic("unexpected invocation of MockStore.GetRepositoriesForIndexScan") - }, - }, InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{ defaultHook: func(context.Context, int, string, time.Time) (int, error) { panic("unexpected invocation of MockStore.InsertDependencyIndexingJob") @@ -2218,9 +2204,6 @@ func NewMockStoreFrom(i store.Store) *MockStore { GetQueuedRepoRevFunc: &StoreGetQueuedRepoRevFunc{ defaultHook: i.GetQueuedRepoRev, }, - GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{ - defaultHook: i.GetRepositoriesForIndexScan, - }, InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{ defaultHook: i.InsertDependencyIndexingJob, }, @@ -2709,129 +2692,6 @@ func (c StoreGetQueuedRepoRevFuncCall) Results() []interface{} { return []interface{}{c.Result0, c.Result1} } -// StoreGetRepositoriesForIndexScanFunc describes the behavior when the -// GetRepositoriesForIndexScan method of the parent MockStore instance is -// invoked. -type StoreGetRepositoriesForIndexScanFunc struct { - defaultHook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) - hooks []func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) - history []StoreGetRepositoriesForIndexScanFuncCall - mutex sync.Mutex -} - -// GetRepositoriesForIndexScan delegates to the next hook function in the -// queue and stores the parameter and result values of this invocation. -func (m *MockStore) GetRepositoriesForIndexScan(v0 context.Context, v1 time.Duration, v2 bool, v3 *int, v4 int, v5 time.Time) ([]int, error) { - r0, r1 := m.GetRepositoriesForIndexScanFunc.nextHook()(v0, v1, v2, v3, v4, v5) - m.GetRepositoriesForIndexScanFunc.appendCall(StoreGetRepositoriesForIndexScanFuncCall{v0, v1, v2, v3, v4, v5, r0, r1}) - return r0, r1 -} - -// SetDefaultHook sets function that is called when the -// GetRepositoriesForIndexScan method of the parent MockStore instance is -// invoked and the hook queue is empty. -func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) { - f.defaultHook = hook -} - -// PushHook adds a function to the end of hook queue. Each invocation of the -// GetRepositoriesForIndexScan method of the parent MockStore instance -// invokes the hook at the front of the queue and discards it. After the -// queue is empty, the default hook function is invoked for any future -// action. -func (f *StoreGetRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) { - f.mutex.Lock() - f.hooks = append(f.hooks, hook) - f.mutex.Unlock() -} - -// SetDefaultReturn calls SetDefaultHook with a function that returns the -// given values. -func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) { - f.SetDefaultHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - return r0, r1 - }) -} - -// PushReturn calls PushHook with a function that returns the given values. -func (f *StoreGetRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) { - f.PushHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - return r0, r1 - }) -} - -func (f *StoreGetRepositoriesForIndexScanFunc) nextHook() func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if len(f.hooks) == 0 { - return f.defaultHook - } - - hook := f.hooks[0] - f.hooks = f.hooks[1:] - return hook -} - -func (f *StoreGetRepositoriesForIndexScanFunc) appendCall(r0 StoreGetRepositoriesForIndexScanFuncCall) { - f.mutex.Lock() - f.history = append(f.history, r0) - f.mutex.Unlock() -} - -// History returns a sequence of StoreGetRepositoriesForIndexScanFuncCall -// objects describing the invocations of this function. -func (f *StoreGetRepositoriesForIndexScanFunc) History() []StoreGetRepositoriesForIndexScanFuncCall { - f.mutex.Lock() - history := make([]StoreGetRepositoriesForIndexScanFuncCall, len(f.history)) - copy(history, f.history) - f.mutex.Unlock() - - return history -} - -// StoreGetRepositoriesForIndexScanFuncCall is an object that describes an -// invocation of method GetRepositoriesForIndexScan on an instance of -// MockStore. -type StoreGetRepositoriesForIndexScanFuncCall struct { - // Arg0 is the value of the 1st argument passed to this method - // invocation. - Arg0 context.Context - // Arg1 is the value of the 2nd argument passed to this method - // invocation. - Arg1 time.Duration - // Arg2 is the value of the 3rd argument passed to this method - // invocation. - Arg2 bool - // Arg3 is the value of the 4th argument passed to this method - // invocation. - Arg3 *int - // Arg4 is the value of the 5th argument passed to this method - // invocation. - Arg4 int - // Arg5 is the value of the 6th argument passed to this method - // invocation. - Arg5 time.Time - // Result0 is the value of the 1st result returned from this method - // invocation. - Result0 []int - // Result1 is the value of the 2nd result returned from this method - // invocation. - Result1 error -} - -// Args returns an interface slice containing the arguments of this -// invocation. -func (c StoreGetRepositoriesForIndexScanFuncCall) Args() []interface{} { - return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3, c.Arg4, c.Arg5} -} - -// Results returns an interface slice containing the results of this -// invocation. -func (c StoreGetRepositoriesForIndexScanFuncCall) Results() []interface{} { - return []interface{}{c.Result0, c.Result1} -} - // StoreInsertDependencyIndexingJobFunc describes the behavior when the // InsertDependencyIndexingJob method of the parent MockStore instance is // invoked. diff --git a/internal/codeintel/autoindexing/internal/background/init.go b/internal/codeintel/autoindexing/internal/background/init.go index 9b8df059dd6c..a20a1a6e7ef0 100644 --- a/internal/codeintel/autoindexing/internal/background/init.go +++ b/internal/codeintel/autoindexing/internal/background/init.go @@ -6,6 +6,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/background/summary" "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/jobselector" "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/store" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/goroutine" "github.com/sourcegraph/sourcegraph/internal/observation" @@ -23,7 +24,7 @@ func NewIndexSchedulers( observationCtx *observation.Context, policiesSvc scheduler.PoliciesService, policyMatcher scheduler.PolicyMatcher, - autoindexingSvc scheduler.AutoIndexingService, + repoSchedulingSvc reposcheduler.RepositorySchedulingService, indexEnqueuer scheduler.IndexEnqueuer, repoStore database.RepoStore, store store.Store, @@ -32,7 +33,7 @@ func NewIndexSchedulers( return []goroutine.BackgroundRoutine{ scheduler.NewScheduler( observationCtx, - autoindexingSvc, + repoSchedulingSvc, policiesSvc, policyMatcher, indexEnqueuer, diff --git a/internal/codeintel/autoindexing/internal/background/scheduler/BUILD.bazel b/internal/codeintel/autoindexing/internal/background/scheduler/BUILD.bazel index 9459437f4675..a4fc633dedba 100644 --- a/internal/codeintel/autoindexing/internal/background/scheduler/BUILD.bazel +++ b/internal/codeintel/autoindexing/internal/background/scheduler/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//internal/codeintel/dependencies", "//internal/codeintel/policies", "//internal/codeintel/policies/shared", + "//internal/codeintel/reposcheduler", "//internal/codeintel/uploads/shared", "//internal/conf", "//internal/database", diff --git a/internal/codeintel/autoindexing/internal/background/scheduler/iface.go b/internal/codeintel/autoindexing/internal/background/scheduler/iface.go index 5c16a7c1e5e6..1df2bd380078 100644 --- a/internal/codeintel/autoindexing/internal/background/scheduler/iface.go +++ b/internal/codeintel/autoindexing/internal/background/scheduler/iface.go @@ -23,7 +23,3 @@ type IndexEnqueuer interface { QueueIndexes(ctx context.Context, repositoryID int, rev, configuration string, force, bypassLimit bool) (_ []uploadsshared.Index, err error) QueueIndexesForPackage(ctx context.Context, pkg dependencies.MinimialVersionedPackageRepo) (err error) } - -type AutoIndexingService interface { - GetRepositoriesForIndexScan(ctx context.Context, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) (_ []int, err error) -} diff --git a/internal/codeintel/autoindexing/internal/background/scheduler/job_scheduler.go b/internal/codeintel/autoindexing/internal/background/scheduler/job_scheduler.go index 6657181bee06..a2e73d13661f 100644 --- a/internal/codeintel/autoindexing/internal/background/scheduler/job_scheduler.go +++ b/internal/codeintel/autoindexing/internal/background/scheduler/job_scheduler.go @@ -12,6 +12,7 @@ import ( "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/inference" "github.com/sourcegraph/sourcegraph/internal/codeintel/autoindexing/internal/store" policiesshared "github.com/sourcegraph/sourcegraph/internal/codeintel/policies/shared" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" "github.com/sourcegraph/sourcegraph/internal/conf" "github.com/sourcegraph/sourcegraph/internal/database" "github.com/sourcegraph/sourcegraph/internal/gitserver/gitdomain" @@ -23,18 +24,18 @@ import ( ) type indexSchedulerJob struct { - autoindexingSvc AutoIndexingService - policiesSvc PoliciesService - policyMatcher PolicyMatcher - indexEnqueuer IndexEnqueuer - repoStore database.RepoStore + repoSchedulingSvc reposcheduler.RepositorySchedulingService + policiesSvc PoliciesService + policyMatcher PolicyMatcher + indexEnqueuer IndexEnqueuer + repoStore database.RepoStore } var m = new(metrics.SingletonREDMetrics) func NewScheduler( observationCtx *observation.Context, - autoindexingSvc AutoIndexingService, + repoSchedulingSvc reposcheduler.RepositorySchedulingService, policiesSvc PoliciesService, policyMatcher PolicyMatcher, indexEnqueuer IndexEnqueuer, @@ -42,11 +43,11 @@ func NewScheduler( config *Config, ) goroutine.BackgroundRoutine { job := indexSchedulerJob{ - autoindexingSvc: autoindexingSvc, - policiesSvc: policiesSvc, - policyMatcher: policyMatcher, - indexEnqueuer: indexEnqueuer, - repoStore: repoStore, + repoSchedulingSvc: repoSchedulingSvc, + policiesSvc: policiesSvc, + policyMatcher: policyMatcher, + indexEnqueuer: indexEnqueuer, + repoStore: repoStore, } redMetrics := m.Get(func() *metrics.REDMetrics { @@ -103,12 +104,13 @@ func (b indexSchedulerJob) handleScheduler( // set should contain repositories that have yet to be updated, or that have been updated least recently. // This allows us to update every repository reliably, even if it takes a long time to process through // the backlog. - repositories, err := b.autoindexingSvc.GetRepositoriesForIndexScan( + repositories, err := b.repoSchedulingSvc.GetRepositoriesForIndexScan( ctx, - repositoryProcessDelay, - conf.CodeIntelAutoIndexingAllowGlobalPolicies(), - repositoryMatchLimit, - repositoryBatchSize, + reposcheduler.NewBatchOptions( + repositoryProcessDelay, + conf.CodeIntelAutoIndexingAllowGlobalPolicies(), + repositoryMatchLimit, + repositoryBatchSize), time.Now(), ) if err != nil { @@ -128,13 +130,13 @@ func (b indexSchedulerJob) handleScheduler( errMu sync.Mutex ) - for _, repositoryID := range repositories { + for _, repository := range repositories { if err := sema.Acquire(ctx, 1); err != nil { return err } go func() { defer sema.Release(1) - if repositoryErr := b.handleRepository(ctx, repositoryID, policyBatchSize, now); repositoryErr != nil { + if repositoryErr := b.handleRepository(ctx, repository.ID, policyBatchSize, now); repositoryErr != nil { if !errors.As(err, &inference.LimitError{}) { errMu.Lock() errs = errors.Append(errs, repositoryErr) diff --git a/internal/codeintel/autoindexing/internal/store/BUILD.bazel b/internal/codeintel/autoindexing/internal/store/BUILD.bazel index 68f77b87517a..14f63ed4a3b3 100644 --- a/internal/codeintel/autoindexing/internal/store/BUILD.bazel +++ b/internal/codeintel/autoindexing/internal/store/BUILD.bazel @@ -45,8 +45,8 @@ go_test( "coverage_test.go", "dependencies_test.go", "enqueuer_test.go", - "scheduler_test.go", "store_helpers_test.go", + "store_test.go", ], embed = [":store"], tags = [ @@ -61,7 +61,6 @@ go_test( "//internal/database/dbtest", "//internal/executor", "//internal/observation", - "//internal/timeutil", "@com_github_google_go_cmp//cmp", "@com_github_keegancsmith_sqlf//:sqlf", "@com_github_lib_pq//:pq", diff --git a/internal/codeintel/autoindexing/internal/store/scheduler_test.go b/internal/codeintel/autoindexing/internal/store/scheduler_test.go deleted file mode 100644 index f5295db0342d..000000000000 --- a/internal/codeintel/autoindexing/internal/store/scheduler_test.go +++ /dev/null @@ -1,260 +0,0 @@ -package store - -import ( - "context" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/keegancsmith/sqlf" - "github.com/sourcegraph/log/logtest" - - "github.com/sourcegraph/sourcegraph/internal/database" - "github.com/sourcegraph/sourcegraph/internal/database/dbtest" - "github.com/sourcegraph/sourcegraph/internal/observation" - "github.com/sourcegraph/sourcegraph/internal/timeutil" -) - -func TestSelectRepositoriesForIndexScan(t *testing.T) { - logger := logtest.Scoped(t) - db := database.NewDB(logger, dbtest.NewDB(t)) - store := testStoreWithoutConfigurationPolicies(t, db) - - now := timeutil.Now() - insertRepo(t, db, 50, "r0") - insertRepo(t, db, 51, "r1") - insertRepo(t, db, 52, "r2") - insertRepo(t, db, 53, "r3") - updateGitserverUpdatedAt(t, db, now) - - query := ` - INSERT INTO lsif_configuration_policies ( - id, - repository_id, - name, - type, - pattern, - repository_patterns, - retention_enabled, - retention_duration_hours, - retain_intermediate_commits, - indexing_enabled, - index_commit_max_age_hours, - index_intermediate_commits - ) VALUES - (101, 50, 'policy 1', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, 0, false), - (102, 51, 'policy 2', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, 0, false), - (103, 52, 'policy 3', 'GIT_TREE', 'ef/', null, true, 0, false, true, 0, false), - (104, 53, 'policy 4', 'GIT_TREE', 'gh/', null, true, 0, false, true, 0, false), - (105, 54, 'policy 5', 'GIT_TREE', 'gh/', null, true, 0, false, false, 0, false) - ` - if _, err := db.ExecContext(context.Background(), query); err != nil { - t.Fatalf("unexpected error while inserting configuration policies: %s", err) - } - - // Can return null last_index_scan - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 2, now); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 20 minutes later, first two repositories are still on cooldown - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 30 minutes later, all repositories are still on cooldown - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int(nil), repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 90 minutes later, all repositories are visible - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // Make new invisible repository - insertRepo(t, db, 54, "r4") - - // 95 minutes later, new repository is not yet visible - if repositoryIDs, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*95)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int(nil), repositoryIDs); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - query = `UPDATE lsif_configuration_policies SET indexing_enabled = true WHERE id = 105` - if _, err := db.ExecContext(context.Background(), query); err != nil { - t.Fatalf("unexpected error while inserting configuration policies: %s", err) - } - - // 100 minutes later, only new repository is visible - if repositoryIDs, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*100)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{54}, repositoryIDs); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 110 minutes later, nothing is ready to go (too close to last index scan) - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*110)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int(nil), repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // Update repo 50 (GIT_COMMIT/HEAD policy), and 51 (GIT_TREE policy) - gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s WHERE repo_id IN (50, 52)`, now.Add(time.Minute*105)) - if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil { - t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err) - } - - // 110 minutes later, updated repositories are ready for re-indexing - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*110)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{50}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } -} - -func TestSelectRepositoriesForIndexScanWithGlobalPolicy(t *testing.T) { - logger := logtest.Scoped(t) - db := database.NewDB(logger, dbtest.NewDB(t)) - store := testStoreWithoutConfigurationPolicies(t, db) - - now := timeutil.Now() - insertRepo(t, db, 50, "r0") - insertRepo(t, db, 51, "r1") - insertRepo(t, db, 52, "r2") - insertRepo(t, db, 53, "r3") - updateGitserverUpdatedAt(t, db, now) - - query := ` - INSERT INTO lsif_configuration_policies ( - id, - repository_id, - name, - type, - pattern, - repository_patterns, - retention_enabled, - retention_duration_hours, - retain_intermediate_commits, - indexing_enabled, - index_commit_max_age_hours, - index_intermediate_commits - ) VALUES - (101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, 0, false) - ` - if _, err := db.ExecContext(context.Background(), query); err != nil { - t.Fatalf("unexpected error while inserting configuration policies: %s", err) - } - - // Returns nothing when disabled - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, false, nil, 100, now); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int(nil), repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // Returns at most configured limit - limit := 2 - - // Can return null last_index_scan - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, &limit, 100, now); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{50, 51}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 20 minutes later, first two repositories are still on cooldown - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*20)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{52, 53}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 30 minutes later, all repositories are still on cooldown - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*30)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int(nil), repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } - - // 90 minutes later, all repositories are visible - if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), time.Hour, true, nil, 100, now.Add(time.Minute*90)); err != nil { - t.Fatalf("unexpected error fetching repositories for index scan: %s", err) - } else if diff := cmp.Diff([]int{50, 51, 52, 53}, repositories); diff != "" { - t.Fatalf("unexpected repository list (-want +got):\n%s", diff) - } -} - -func TestMarkRepoRevsAsProcessed(t *testing.T) { - ctx := context.Background() - logger := logtest.Scoped(t) - db := database.NewDB(logger, dbtest.NewDB(t)) - store := New(&observation.TestContext, db) - - expected := []RepoRev{ - {1, 50, "HEAD"}, - {2, 50, "HEAD~1"}, - {3, 50, "HEAD~2"}, - {4, 51, "HEAD"}, - {5, 51, "HEAD~1"}, - {6, 51, "HEAD~2"}, - {7, 52, "HEAD"}, - {8, 52, "HEAD~1"}, - {9, 52, "HEAD~2"}, - } - for _, repoRev := range expected { - if err := store.QueueRepoRev(ctx, repoRev.RepositoryID, repoRev.Rev); err != nil { - t.Fatalf("unexpected error: %s", err) - } - } - - // entire set - repoRevs, err := store.GetQueuedRepoRev(ctx, 50) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - if diff := cmp.Diff(expected, repoRevs); diff != "" { - t.Errorf("unexpected repo revs (-want +got):\n%s", diff) - } - - // mark first elements as complete; re-request remaining - if err := store.MarkRepoRevsAsProcessed(ctx, []int{1, 2, 3, 4, 5}); err != nil { - t.Fatalf("unexpected error: %s", err) - } - repoRevs, err = store.GetQueuedRepoRev(ctx, 50) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - if diff := cmp.Diff(expected[5:], repoRevs); diff != "" { - t.Errorf("unexpected repo revs (-want +got):\n%s", diff) - } -} - -// -// - -// removes default configuration policies -func testStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) Store { - if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil { - t.Fatalf("unexpected error while inserting configuration policies: %s", err) - } - - return New(&observation.TestContext, db) -} - -func updateGitserverUpdatedAt(t *testing.T, db database.DB, now time.Time) { - gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s`, now.Add(-time.Hour*24)) - if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil { - t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err) - } -} diff --git a/internal/codeintel/autoindexing/internal/store/store.go b/internal/codeintel/autoindexing/internal/store/store.go index 789da8966c8d..4659c7d6640d 100644 --- a/internal/codeintel/autoindexing/internal/store/store.go +++ b/internal/codeintel/autoindexing/internal/store/store.go @@ -34,7 +34,6 @@ type Store interface { TruncateConfigurationSummary(ctx context.Context, numRecordsToRetain int) error // Scheduler - GetRepositoriesForIndexScan(ctx context.Context, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) ([]int, error) GetQueuedRepoRev(ctx context.Context, batchSize int) ([]RepoRev, error) MarkRepoRevsAsProcessed(ctx context.Context, ids []int) error diff --git a/internal/codeintel/autoindexing/internal/store/store_test.go b/internal/codeintel/autoindexing/internal/store/store_test.go new file mode 100644 index 000000000000..f78f26a4b7f6 --- /dev/null +++ b/internal/codeintel/autoindexing/internal/store/store_test.go @@ -0,0 +1,76 @@ +package store + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/log/logtest" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +func TestMarkRepoRevsAsProcessed(t *testing.T) { + ctx := context.Background() + logger := logtest.Scoped(t) + db := database.NewDB(logger, dbtest.NewDB(t)) + store := New(&observation.TestContext, db) + + expected := []RepoRev{ + {1, 50, "HEAD"}, + {2, 50, "HEAD~1"}, + {3, 50, "HEAD~2"}, + {4, 51, "HEAD"}, + {5, 51, "HEAD~1"}, + {6, 51, "HEAD~2"}, + {7, 52, "HEAD"}, + {8, 52, "HEAD~1"}, + {9, 52, "HEAD~2"}, + } + for _, repoRev := range expected { + if err := store.QueueRepoRev(ctx, repoRev.RepositoryID, repoRev.Rev); err != nil { + t.Fatalf("unexpected error: %s", err) + } + } + + // entire set + repoRevs, err := store.GetQueuedRepoRev(ctx, 50) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if diff := cmp.Diff(expected, repoRevs); diff != "" { + t.Errorf("unexpected repo revs (-want +got):\n%s", diff) + } + + // mark first elements as complete; re-request remaining + if err := store.MarkRepoRevsAsProcessed(ctx, []int{1, 2, 3, 4, 5}); err != nil { + t.Fatalf("unexpected error: %s", err) + } + repoRevs, err = store.GetQueuedRepoRev(ctx, 50) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if diff := cmp.Diff(expected[5:], repoRevs); diff != "" { + t.Errorf("unexpected repo revs (-want +got):\n%s", diff) + } +} + +// removes default configuration policies +func testStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) Store { + if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + return New(&observation.TestContext, db) +} + +func updateGitserverUpdatedAt(t *testing.T, db database.DB, now time.Time) { + gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s`, now.Add(-time.Hour*24)) + if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil { + t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err) + } +} diff --git a/internal/codeintel/autoindexing/mocks_test.go b/internal/codeintel/autoindexing/mocks_test.go index 630eba3ce47e..cec1aef447ac 100644 --- a/internal/codeintel/autoindexing/mocks_test.go +++ b/internal/codeintel/autoindexing/mocks_test.go @@ -37,10 +37,6 @@ type MockStore struct { // GetQueuedRepoRevFunc is an instance of a mock function object // controlling the behavior of the method GetQueuedRepoRev. GetQueuedRepoRevFunc *StoreGetQueuedRepoRevFunc - // GetRepositoriesForIndexScanFunc is an instance of a mock function - // object controlling the behavior of the method - // GetRepositoriesForIndexScan. - GetRepositoriesForIndexScanFunc *StoreGetRepositoriesForIndexScanFunc // InsertDependencyIndexingJobFunc is an instance of a mock function // object controlling the behavior of the method // InsertDependencyIndexingJob. @@ -117,11 +113,6 @@ func NewMockStore() *MockStore { return }, }, - GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{ - defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) (r0 []int, r1 error) { - return - }, - }, InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{ defaultHook: func(context.Context, int, string, time.Time) (r0 int, r1 error) { return @@ -224,11 +215,6 @@ func NewStrictMockStore() *MockStore { panic("unexpected invocation of MockStore.GetQueuedRepoRev") }, }, - GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{ - defaultHook: func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - panic("unexpected invocation of MockStore.GetRepositoriesForIndexScan") - }, - }, InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{ defaultHook: func(context.Context, int, string, time.Time) (int, error) { panic("unexpected invocation of MockStore.InsertDependencyIndexingJob") @@ -323,9 +309,6 @@ func NewMockStoreFrom(i store.Store) *MockStore { GetQueuedRepoRevFunc: &StoreGetQueuedRepoRevFunc{ defaultHook: i.GetQueuedRepoRev, }, - GetRepositoriesForIndexScanFunc: &StoreGetRepositoriesForIndexScanFunc{ - defaultHook: i.GetRepositoriesForIndexScan, - }, InsertDependencyIndexingJobFunc: &StoreInsertDependencyIndexingJobFunc{ defaultHook: i.InsertDependencyIndexingJob, }, @@ -814,129 +797,6 @@ func (c StoreGetQueuedRepoRevFuncCall) Results() []interface{} { return []interface{}{c.Result0, c.Result1} } -// StoreGetRepositoriesForIndexScanFunc describes the behavior when the -// GetRepositoriesForIndexScan method of the parent MockStore instance is -// invoked. -type StoreGetRepositoriesForIndexScanFunc struct { - defaultHook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) - hooks []func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) - history []StoreGetRepositoriesForIndexScanFuncCall - mutex sync.Mutex -} - -// GetRepositoriesForIndexScan delegates to the next hook function in the -// queue and stores the parameter and result values of this invocation. -func (m *MockStore) GetRepositoriesForIndexScan(v0 context.Context, v1 time.Duration, v2 bool, v3 *int, v4 int, v5 time.Time) ([]int, error) { - r0, r1 := m.GetRepositoriesForIndexScanFunc.nextHook()(v0, v1, v2, v3, v4, v5) - m.GetRepositoriesForIndexScanFunc.appendCall(StoreGetRepositoriesForIndexScanFuncCall{v0, v1, v2, v3, v4, v5, r0, r1}) - return r0, r1 -} - -// SetDefaultHook sets function that is called when the -// GetRepositoriesForIndexScan method of the parent MockStore instance is -// invoked and the hook queue is empty. -func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) { - f.defaultHook = hook -} - -// PushHook adds a function to the end of hook queue. Each invocation of the -// GetRepositoriesForIndexScan method of the parent MockStore instance -// invokes the hook at the front of the queue and discards it. After the -// queue is empty, the default hook function is invoked for any future -// action. -func (f *StoreGetRepositoriesForIndexScanFunc) PushHook(hook func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error)) { - f.mutex.Lock() - f.hooks = append(f.hooks, hook) - f.mutex.Unlock() -} - -// SetDefaultReturn calls SetDefaultHook with a function that returns the -// given values. -func (f *StoreGetRepositoriesForIndexScanFunc) SetDefaultReturn(r0 []int, r1 error) { - f.SetDefaultHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - return r0, r1 - }) -} - -// PushReturn calls PushHook with a function that returns the given values. -func (f *StoreGetRepositoriesForIndexScanFunc) PushReturn(r0 []int, r1 error) { - f.PushHook(func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - return r0, r1 - }) -} - -func (f *StoreGetRepositoriesForIndexScanFunc) nextHook() func(context.Context, time.Duration, bool, *int, int, time.Time) ([]int, error) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if len(f.hooks) == 0 { - return f.defaultHook - } - - hook := f.hooks[0] - f.hooks = f.hooks[1:] - return hook -} - -func (f *StoreGetRepositoriesForIndexScanFunc) appendCall(r0 StoreGetRepositoriesForIndexScanFuncCall) { - f.mutex.Lock() - f.history = append(f.history, r0) - f.mutex.Unlock() -} - -// History returns a sequence of StoreGetRepositoriesForIndexScanFuncCall -// objects describing the invocations of this function. -func (f *StoreGetRepositoriesForIndexScanFunc) History() []StoreGetRepositoriesForIndexScanFuncCall { - f.mutex.Lock() - history := make([]StoreGetRepositoriesForIndexScanFuncCall, len(f.history)) - copy(history, f.history) - f.mutex.Unlock() - - return history -} - -// StoreGetRepositoriesForIndexScanFuncCall is an object that describes an -// invocation of method GetRepositoriesForIndexScan on an instance of -// MockStore. -type StoreGetRepositoriesForIndexScanFuncCall struct { - // Arg0 is the value of the 1st argument passed to this method - // invocation. - Arg0 context.Context - // Arg1 is the value of the 2nd argument passed to this method - // invocation. - Arg1 time.Duration - // Arg2 is the value of the 3rd argument passed to this method - // invocation. - Arg2 bool - // Arg3 is the value of the 4th argument passed to this method - // invocation. - Arg3 *int - // Arg4 is the value of the 5th argument passed to this method - // invocation. - Arg4 int - // Arg5 is the value of the 6th argument passed to this method - // invocation. - Arg5 time.Time - // Result0 is the value of the 1st result returned from this method - // invocation. - Result0 []int - // Result1 is the value of the 2nd result returned from this method - // invocation. - Result1 error -} - -// Args returns an interface slice containing the arguments of this -// invocation. -func (c StoreGetRepositoriesForIndexScanFuncCall) Args() []interface{} { - return []interface{}{c.Arg0, c.Arg1, c.Arg2, c.Arg3, c.Arg4, c.Arg5} -} - -// Results returns an interface slice containing the results of this -// invocation. -func (c StoreGetRepositoriesForIndexScanFuncCall) Results() []interface{} { - return []interface{}{c.Result0, c.Result1} -} - // StoreInsertDependencyIndexingJobFunc describes the behavior when the // InsertDependencyIndexingJob method of the parent MockStore instance is // invoked. diff --git a/internal/codeintel/autoindexing/service.go b/internal/codeintel/autoindexing/service.go index d1e694f51a35..8802c1b8f159 100644 --- a/internal/codeintel/autoindexing/service.go +++ b/internal/codeintel/autoindexing/service.go @@ -145,10 +145,6 @@ func IsLimitError(err error) bool { return errors.As(err, &inference.LimitError{}) } -func (s *Service) GetRepositoriesForIndexScan(ctx context.Context, processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int, now time.Time) ([]int, error) { - return s.store.GetRepositoriesForIndexScan(ctx, processDelay, allowGlobalPolicies, repositoryMatchLimit, limit, now) -} - func (s *Service) RepositoryIDsWithConfiguration(ctx context.Context, offset, limit int) ([]uploadsshared.RepositoryWithAvailableIndexers, int, error) { return s.store.RepositoryIDsWithConfiguration(ctx, offset, limit) } diff --git a/internal/codeintel/reposcheduler/BUILD.bazel b/internal/codeintel/reposcheduler/BUILD.bazel new file mode 100644 index 000000000000..391132215a91 --- /dev/null +++ b/internal/codeintel/reposcheduler/BUILD.bazel @@ -0,0 +1,40 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("//dev:go_defs.bzl", "go_test") + +go_library( + name = "reposcheduler", + srcs = [ + "init.go", + "options.go", + "store.go", + "store_observability.go", + ], + importpath = "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler", + visibility = ["//:__subpackages__"], + deps = [ + "//internal/database", + "//internal/database/basestore", + "//internal/database/dbutil", + "//internal/metrics", + "//internal/observation", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_sourcegraph_log//:log", + "@io_opentelemetry_go_otel//attribute", + ], +) + +go_test( + name = "reposcheduler_test", + srcs = ["store_test.go"], + embed = [":reposcheduler"], + tags = ["requires-network"], + deps = [ + "//internal/database", + "//internal/database/dbtest", + "//internal/observation", + "//internal/timeutil", + "@com_github_google_go_cmp//cmp", + "@com_github_keegancsmith_sqlf//:sqlf", + "@com_github_sourcegraph_log//logtest", + ], +) diff --git a/internal/codeintel/reposcheduler/init.go b/internal/codeintel/reposcheduler/init.go new file mode 100644 index 000000000000..1d19fe933007 --- /dev/null +++ b/internal/codeintel/reposcheduler/init.go @@ -0,0 +1,30 @@ +package reposcheduler + +import ( + "context" + "time" +) + +type RepositoryToIndex struct { + ID int +} + +type RepositorySchedulingService interface { + GetRepositoriesForIndexScan(ctx context.Context, _ RepositoryBatchOptions, now time.Time) (_ []RepositoryToIndex, err error) +} + +type service struct { + store RepositorySchedulingStore +} + +var _ RepositorySchedulingService = &service{} + +func NewService(store RepositorySchedulingStore) RepositorySchedulingService { + return &service{ + store: store, + } +} + +func (s *service) GetRepositoriesForIndexScan(ctx context.Context, batchOptions RepositoryBatchOptions, now time.Time) ([]RepositoryToIndex, error) { + return s.store.GetRepositoriesForIndexScan(ctx, batchOptions, now) +} diff --git a/internal/codeintel/reposcheduler/options.go b/internal/codeintel/reposcheduler/options.go new file mode 100644 index 000000000000..ab25d749e688 --- /dev/null +++ b/internal/codeintel/reposcheduler/options.go @@ -0,0 +1,33 @@ +package reposcheduler + +import "time" + +type RepositoryBatchOptions struct { + // Time to wait before making repository visible again + // after previous scan. Note that this delay can be superseded if repository + // has been changed in the meantime + ProcessDelay time.Duration + + // If allowGlobalPolicies is false, then configuration policies that do not specify a repository name + // or patterns will be ignored. + // When true, such policies apply over all repositories known to the instance. + AllowGlobalPolicies bool + + // This optional limit controls how many repositores will be considered by matching + // via global policy. As global policy matches large sets of repositories, + // this limit allows reducing the number of repositories that will be considered + // for scanning. + GlobalPolicyRepositoriesMatchLimit *int + + // The maximum number repositories that will be returned in a batch + Limit int +} + +func NewBatchOptions(processDelay time.Duration, allowGlobalPolicies bool, repositoryMatchLimit *int, limit int) RepositoryBatchOptions { + return RepositoryBatchOptions{ + ProcessDelay: processDelay, + AllowGlobalPolicies: allowGlobalPolicies, + GlobalPolicyRepositoriesMatchLimit: repositoryMatchLimit, + Limit: limit, + } +} diff --git a/internal/codeintel/reposcheduler/store.go b/internal/codeintel/reposcheduler/store.go new file mode 100644 index 000000000000..c81c4ed2bd76 --- /dev/null +++ b/internal/codeintel/reposcheduler/store.go @@ -0,0 +1,289 @@ +package reposcheduler + +import ( + "context" + "fmt" + "time" + + "github.com/keegancsmith/sqlf" + logger "github.com/sourcegraph/log" + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/basestore" + "github.com/sourcegraph/sourcegraph/internal/database/dbutil" + "github.com/sourcegraph/sourcegraph/internal/observation" + "go.opentelemetry.io/otel/attribute" +) + +type RepoRev struct { + ID int + RepositoryID int + Rev string +} + +type RepositorySchedulingStore interface { + WithTransaction(ctx context.Context, f func(tx RepositorySchedulingStore) error) error + GetRepositoriesForIndexScan(ctx context.Context, batchOptions RepositoryBatchOptions, now time.Time) ([]RepositoryToIndex, error) +} + +type store struct { + db *basestore.Store + logger logger.Logger + operations *operations + dbLayout dbLayout +} + +var _ RepositorySchedulingStore = &store{} + +type storeType int8 + +const ( + preciseStore storeType = 1 + syntacticStore storeType = 2 +) + +func NewPreciseStore(observationCtx *observation.Context, db database.DB) RepositorySchedulingStore { + return &store{ + db: basestore.NewWithHandle(db.Handle()), + logger: observationCtx.Logger.Scoped("reposcheduler.syntactic_store"), + operations: newOperations(observationCtx, preciseStore), + dbLayout: dbLayout{ + policyEnablementFieldName: "indexing_enabled", + lastScanTableName: "lsif_last_index_scan", + }, + } +} + +func NewSyntacticStore(observationCtx *observation.Context, db database.DB) RepositorySchedulingStore { + return &store{ + db: basestore.NewWithHandle(db.Handle()), + logger: observationCtx.Logger.Scoped("reposcheduler.precise_store"), + operations: newOperations(observationCtx, syntacticStore), + dbLayout: dbLayout{ + policyEnablementFieldName: "syntactic_indexing_enabled", + lastScanTableName: "syntactic_scip_last_index_scan", + }, + } +} + +func (s *store) WithTransaction(ctx context.Context, f func(s RepositorySchedulingStore) error) error { + return s.withTransaction(ctx, func(s *store) error { return f(s) }) +} + +func (s *store) withTransaction(ctx context.Context, f func(s *store) error) error { + return basestore.InTransaction(ctx, s, f) +} + +func (s *store) Transact(ctx context.Context) (*store, error) { + tx, err := s.db.Transact(ctx) + if err != nil { + return nil, err + } + + return &store{ + logger: s.logger, + db: tx, + operations: s.operations, + }, nil +} + +func (s *store) Done(err error) error { + return s.db.Done(err) +} + +type dbLayout struct { + policyEnablementFieldName string + lastScanTableName string +} + +// GetRepositoriesForIndexScan returns a set of repository identifiers that should be considered +// for indexing jobs. Repositories that were returned previously from this call within the given +// process delay are not returned. +func (store *store) GetRepositoriesForIndexScan( + ctx context.Context, + batchOptions RepositoryBatchOptions, + now time.Time, +) (_ []RepositoryToIndex, err error) { + var globalPolicyRepositoryMatchLimitValue int + if batchOptions.GlobalPolicyRepositoriesMatchLimit != nil { + globalPolicyRepositoryMatchLimitValue = *batchOptions.GlobalPolicyRepositoriesMatchLimit + } + + ctx, _, endObservation := store.operations.getRepositoriesForIndexScan.With(ctx, &err, + observation.Args{Attrs: []attribute.KeyValue{ + attribute.Bool("allowGlobalPolicies", batchOptions.AllowGlobalPolicies), + attribute.Int("globalPolicyRepositoryMatchLimit", globalPolicyRepositoryMatchLimitValue), + attribute.Int("limit", batchOptions.Limit), + }}) + defer endObservation(1, observation.Args{}) + + queries := make([]*sqlf.Query, 0, 3) + if batchOptions.AllowGlobalPolicies { + queries = append(queries, sqlf.Sprintf( + getRepositoriesForIndexScanGlobalRepositoriesQuery, + optionalLimit(batchOptions.GlobalPolicyRepositoriesMatchLimit), + )) + } + queries = append(queries, sqlf.Sprintf(getRepositoriesForIndexScanRepositoriesWithPolicyQuery(store.dbLayout))) + queries = append(queries, sqlf.Sprintf(getRepositoriesForIndexScanRepositoriesWithPolicyViaPatternQuery(store.dbLayout))) + + for i, query := range queries { + queries[i] = sqlf.Sprintf("(%s)", query) + } + + query := getRepositoriesForIndexScanQuery(store.dbLayout) + + finalQuery := sqlf.Sprintf( + query, + sqlf.Join(queries, " UNION ALL "), + now, + int(batchOptions.ProcessDelay/time.Second), + batchOptions.Limit, + now, + now, + ) + + repositoryIds, err := basestore.ScanInts(store.db.Query(ctx, finalQuery)) + + if err != nil { + return nil, err + } + + repos := make([]RepositoryToIndex, len(repositoryIds)) + for i, repoId := range repositoryIds { + repos[i] = RepositoryToIndex{ID: repoId} + } + + return repos, nil + +} + +func getRepositoriesForIndexScanQuery(layout dbLayout) string { + return fmt.Sprintf(` +WITH +-- This CTE will contain a single row if there is at least one global policy, and will return an empty +-- result set otherwise. If any global policy is for HEAD, the value for the column is_head_policy will +-- be true. +global_policy_descriptor AS MATERIALIZED ( + SELECT (p.type = 'GIT_COMMIT' AND p.pattern = 'HEAD') AS is_head_policy + FROM lsif_configuration_policies p + WHERE + p.%s AND + p.repository_id IS NULL AND + p.repository_patterns IS NULL + ORDER BY is_head_policy DESC + LIMIT 1 +), +repositories_matching_policy AS ( + %%s +), +repositories AS ( + SELECT rmp.id + FROM repositories_matching_policy rmp + LEFT JOIN %s lrs ON lrs.repository_id = rmp.id + WHERE + -- Records that have not been checked within the global reindex threshold are also eligible for + -- indexing. Note that condition here is true for a record that has never been indexed. + + (%%s - lrs.last_index_scan_at > (%%s * '1 second'::interval)) IS DISTINCT FROM FALSE OR + + -- Records that have received an update since their last scan are also eligible for re-indexing. + -- Note that last_changed is NULL unless the repository is attached to a policy for HEAD. + + (rmp.last_changed > lrs.last_index_scan_at) + ORDER BY + lrs.last_index_scan_at NULLS FIRST, + rmp.id -- tie breaker + LIMIT %%s +) +INSERT INTO %s (repository_id, last_index_scan_at) +SELECT DISTINCT r.id, %%s::timestamp FROM repositories r +ON CONFLICT (repository_id) DO UPDATE +SET last_index_scan_at = %%s +RETURNING repository_id +`, layout.policyEnablementFieldName, + layout.lastScanTableName, + layout.lastScanTableName, + ) +} + +const getRepositoriesForIndexScanGlobalRepositoriesQuery = ` +SELECT + r.id, + CASE + -- Return non-NULL last_changed only for policies that are attached to a HEAD commit. + -- We don't want to superfluously return the same repos because they had an update, but + -- we only (for example) index a branch that doesn't have many active commits. + WHEN gpd.is_head_policy THEN gr.last_changed + ELSE NULL + END AS last_changed +FROM repo r +JOIN gitserver_repos gr ON gr.repo_id = r.id +JOIN global_policy_descriptor gpd ON TRUE +WHERE + r.deleted_at IS NULL AND + r.blocked IS NULL AND + gr.clone_status = 'cloned' +ORDER BY stars DESC NULLS LAST, id +%s +` + +func getRepositoriesForIndexScanRepositoriesWithPolicyQuery(layout dbLayout) string { + + return fmt.Sprintf(` +SELECT + r.id, + CASE + -- Return non-NULL last_changed only for policies that are attached to a HEAD commit. + -- We don't want to superfluously return the same repos because they had an update, but + -- we only (for example) index a branch that doesn't have many active commits. + WHEN p.type = 'GIT_COMMIT' AND p.pattern = 'HEAD' THEN gr.last_changed + ELSE NULL + END AS last_changed +FROM repo r +JOIN gitserver_repos gr ON gr.repo_id = r.id +JOIN lsif_configuration_policies p ON p.repository_id = r.id +WHERE + r.deleted_at IS NULL AND + r.blocked IS NULL AND + p.%s AND + gr.clone_status = 'cloned' +`, layout.policyEnablementFieldName) +} + +func getRepositoriesForIndexScanRepositoriesWithPolicyViaPatternQuery(layout dbLayout) string { + return fmt.Sprintf(` +SELECT + r.id, + CASE + -- Return non-NULL last_changed only for policies that are attached to a HEAD commit. + -- We don't want to superfluously return the same repos because they had an update, but + -- we only (for example) index a branch that doesn't have many active commits. + WHEN p.type = 'GIT_COMMIT' AND p.pattern = 'HEAD' THEN gr.last_changed + ELSE NULL + END AS last_changed +FROM repo r +JOIN gitserver_repos gr ON gr.repo_id = r.id +JOIN lsif_configuration_policies_repository_pattern_lookup rpl ON rpl.repo_id = r.id +JOIN lsif_configuration_policies p ON p.id = rpl.policy_id +WHERE + r.deleted_at IS NULL AND + r.blocked IS NULL AND + p.%s AND + gr.clone_status = 'cloned' +`, layout.policyEnablementFieldName) +} + +func scanRepoRev(s dbutil.Scanner) (rr RepoRev, err error) { + err = s.Scan(&rr.ID, &rr.RepositoryID, &rr.Rev) + return rr, err +} + +var scanRepoRevs = basestore.NewSliceScanner(scanRepoRev) + +func optionalLimit(limit *int) *sqlf.Query { + if limit != nil { + return sqlf.Sprintf("LIMIT %d", *limit) + } + + return sqlf.Sprintf("") +} diff --git a/internal/codeintel/reposcheduler/store_observability.go b/internal/codeintel/reposcheduler/store_observability.go new file mode 100644 index 000000000000..f74f2b63f09a --- /dev/null +++ b/internal/codeintel/reposcheduler/store_observability.go @@ -0,0 +1,58 @@ +package reposcheduler + +import ( + "fmt" + + "github.com/sourcegraph/sourcegraph/internal/metrics" + "github.com/sourcegraph/sourcegraph/internal/observation" +) + +type operations struct { + getRepositoriesForIndexScan *observation.Operation + getQueuedRepoRev *observation.Operation + markRepoRevsAsProcessed *observation.Operation + queueRepoRev *observation.Operation + isQueued *observation.Operation +} + +var ( + m = new(metrics.SingletonREDMetrics) +) + +func newOperations(observationCtx *observation.Context, storeType storeType) *operations { + var metricPrefix string + var operationNamespace string + + if storeType == preciseStore { + metricPrefix = "codeintel_precise_reposcheduler_store" + operationNamespace = "precise_reposcheduler" + } else { + metricPrefix = "codeintel_syntactic_reposcheduler_store" + operationNamespace = "syntactic_reposcheduler" + } + + m := m.Get(func() *metrics.REDMetrics { + return metrics.NewREDMetrics( + observationCtx.Registerer, + metricPrefix, + metrics.WithLabels("op"), + metrics.WithCountHelp("Total number of method invocations."), + ) + }) + + op := func(name string) *observation.Operation { + return observationCtx.Operation(observation.Op{ + Name: fmt.Sprintf("codeintel.%s.store.%s", operationNamespace, name), + MetricLabelValues: []string{name}, + Metrics: m, + }) + } + + return &operations{ + getRepositoriesForIndexScan: op("GetRepositoriesForIndexScan"), + getQueuedRepoRev: op("GetQueuedRepoRev"), + markRepoRevsAsProcessed: op("MarkRepoRevsAsProcessed"), + queueRepoRev: op("QueueRepoRev"), + isQueued: op("IsQueued"), + } +} diff --git a/internal/codeintel/reposcheduler/store_test.go b/internal/codeintel/reposcheduler/store_test.go new file mode 100644 index 000000000000..78238cfdc773 --- /dev/null +++ b/internal/codeintel/reposcheduler/store_test.go @@ -0,0 +1,303 @@ +package reposcheduler + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/keegancsmith/sqlf" + "github.com/sourcegraph/log/logtest" + + "github.com/sourcegraph/sourcegraph/internal/database" + "github.com/sourcegraph/sourcegraph/internal/database/dbtest" + "github.com/sourcegraph/sourcegraph/internal/observation" + "github.com/sourcegraph/sourcegraph/internal/timeutil" +) + +// Helper function to setup a configuration policies table +// for either precise or syntactic indexing exclusively +func databaseSetupQuery(enabledColumn string) string { + var disabledColumn string + if enabledColumn == "indexing_enabled" { + disabledColumn = "syntactic_indexing_enabled" + } else { + + disabledColumn = "indexing_enabled" + } + + base := ` + INSERT INTO lsif_configuration_policies ( + id, + repository_id, + name, + type, + pattern, + repository_patterns, + retention_enabled, + retention_duration_hours, + retain_intermediate_commits, + %s, -- enabled column + %s, -- disabled column + index_commit_max_age_hours, + index_intermediate_commits + ) VALUES + -- enabled | | disabled + -- v v + (101, 50, 'policy 1', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, false, 0, false), + (102, 51, 'policy 2', 'GIT_COMMIT', 'HEAD', null, true, 0, false, true, false, 0, false), + (103, 52, 'policy 3', 'GIT_TREE', 'ef/', null, true, 0, false, true, false, 0, false), + (104, 53, 'policy 4', 'GIT_TREE', 'gh/', null, true, 0, false, true, false, 0, false), + (105, 54, 'policy 5', 'GIT_TREE', 'gh/', null, true, 0, false, false, false, 0, false) + +` + + return fmt.Sprintf(base, enabledColumn, disabledColumn) +} + +func TestSelectRepositoriesForSyntacticIndexing(t *testing.T) { + logger := logtest.Scoped(t) + db := database.NewDB(logger, dbtest.NewDB(t)) + preciseStore := testPreciseStoreWithoutConfigurationPolicies(t, db) + syntacticStore := testSyntacticStoreWithoutConfigurationPolicies(t, db) + + insertRepo(t, db, 50, "r0") + insertRepo(t, db, 51, "r1") + insertRepo(t, db, 52, "r2") + insertRepo(t, db, 53, "r3") + + var tests = []struct { + name string + store RepositorySchedulingStore + enabledColumn string + }{ + {"precise store", preciseStore, "indexing_enabled"}, + {"syntactic store", syntacticStore, "syntactic_indexing_enabled"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + if _, err := db.ExecContext(context.Background(), "TRUNCATE lsif_configuration_policies"); err != nil { + t.Fatalf("unexpected error while truncating configuration policies: %s", err) + } + setupQuery := databaseSetupQuery(tt.enabledColumn) + if _, err := db.ExecContext(context.Background(), setupQuery); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + now := timeutil.Now() + updateGitserverUpdatedAt(t, db, now) + + // The following tests simulate the passage of time (i.e. repeated scheduled invocations of the repo scheduling logic) + // T is time + + // N.B. We use 1 hour process delay in all those tests, + // which means that IF the repository id was returned, it will be put on cooldown for an hour + + // T = 0: No records in last index scan table, so we return all repos permitted by the limit parameter + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 2), now, []int{50, 51}) + + // T + 20 minutes: first two repositories are still on cooldown + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*20), []int{52, 53}) + + // T + 30 minutes: all repositories are on cooldown + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*30), []int(nil)) + + // T + 90 minutes: all repositories are visible again + // Repos 50, 51 are visible because they were scheduled at (T + 0) - which is more than 1 hour ago + // Repos 52, 53 are visible because they were scheduled at (T + 20) - which is more than 1 hour ago + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*90), []int{50, 51, 52, 53}) + + // Make a new repository, not yet covered by the configuration policies + insertRepo(t, db, 54, "r4") + + // T + 95: no repositories are visible + // Repos 50,51,52,53 are invisible because they were scheduled at (T + 90), which is less than 1 hour ago + // Repo 54 is invisible because it doesn't have `indexing_enabled=true` in the configuration policies table + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*95), []int(nil)) + + // Explicitly enable indexing for repo 54 + query := fmt.Sprintf(`UPDATE lsif_configuration_policies SET %s = true WHERE id = 105`, tt.enabledColumn) + if _, err := db.ExecContext(context.Background(), query); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + // T + 100: only repository 54 is visible + // Repos 50-53 are still on cooldown as they were scheduled at (T + 90), less than 1 hour ago + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*100), []int{54}) + + // T + 110: no repositories are visible + // Repos 50,51,52,53 are invisible because they were scheduled at (T + 90), which is less than 1 hour ago + // Repo 54 is invisible because it was scheduled at (T + 100), which is less than 1 hour ago + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*110), []int(nil)) + + // Update repo 50 (GIT_COMMIT/HEAD policy), and 51 (GIT_TREE policy) + gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s WHERE repo_id IN (50, 52)`, now.Add(time.Minute*105)) + if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil { + t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err) + } + + // T + 110: only repository 50 is visible + // Repos 51-54 are invisible because they were scheduled less than 1 hour ago + // Repo 50 is visible despite it being scheduled less than 1 hour ago - it was recently updated, so that takes precedence + assertRepoList(t, tt.store, NewBatchOptions(time.Hour, true, nil, 100), now.Add(time.Minute*110), []int{50}) + }) + } +} + +/* + +This test verifies that repository scheduling works when there's only a single global policy +*/ + +func TestSelectRepositoriesForIndexScanWithGlobalPolicy(t *testing.T) { + logger := logtest.Scoped(t) + db := database.NewDB(logger, dbtest.NewDB(t)) + preciseStore := testPreciseStoreWithoutConfigurationPolicies(t, db) + syntacticStore := testSyntacticStoreWithoutConfigurationPolicies(t, db) + + now := timeutil.Now() + insertRepo(t, db, 50, "r0") + insertRepo(t, db, 51, "r1") + insertRepo(t, db, 52, "r2") + insertRepo(t, db, 53, "r3") + updateGitserverUpdatedAt(t, db, now) + + query := ` + INSERT INTO lsif_configuration_policies ( + id, + repository_id, + name, + type, + pattern, + repository_patterns, + retention_enabled, + retention_duration_hours, + retain_intermediate_commits, + indexing_enabled, + syntactic_indexing_enabled, + index_commit_max_age_hours, + index_intermediate_commits + ) VALUES + -- indexing | | syntactic indexing + -- v v + (101, NULL, 'policy 1', 'GIT_TREE', 'ab/', null, true, 0, false, true, true, 0, false) + ` + if _, err := db.ExecContext(context.Background(), query); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + var tests = []struct { + name string + store RepositorySchedulingStore + }{ + {"precise store", preciseStore}, + {"syntactic store", syntacticStore}, + } + + // Returns at most configured limit + limit := 2 + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // The following tests simulate the passage of time (i.e. repeated scheduled invocations of the repo scheduling logic) + // T is time + + // N.B. We use 1 hour process delay in all those tests, + // which means that IF the repository id was returned, it will be put on cooldown for an hour + + // Returns nothing when disabled + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, false, nil, 100), now, []int(nil)) + + // T = 0: No records in last index scan table, so we return all repos permitted by the limit parameter + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, &limit, 100), now, []int{50, 51}) + + // T + 20 minutes: first two repositories are still on cooldown + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*20), []int{52, 53}) + + // T + 30 minutes: all repositories are on cooldown + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*30), []int(nil)) + + // T + 90 minutes: all repositories are visible again + // Repos 50, 51 are visible because they were scheduled at (T + 0) - which is more than 1 hour ago + // Repos 52, 53 are visible because they were scheduled at (T + 20) - which is more than 1 hour ago + assertRepoList(t, tt.store, NewBatchOptions(1*time.Hour, true, nil, 100), now.Add(time.Minute*90), []int{50, 51, 52, 53}) + + }) + } + +} + +// removes default configuration policies +func testPreciseStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) RepositorySchedulingStore { + if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + return NewPreciseStore(&observation.TestContext, db) +} + +func testSyntacticStoreWithoutConfigurationPolicies(t *testing.T, db database.DB) RepositorySchedulingStore { + if _, err := db.ExecContext(context.Background(), `TRUNCATE lsif_configuration_policies`); err != nil { + t.Fatalf("unexpected error while inserting configuration policies: %s", err) + } + + return NewSyntacticStore(&observation.TestContext, db) +} + +func assertRepoList(t *testing.T, store RepositorySchedulingStore, batchOptions RepositoryBatchOptions, now time.Time, want []int) { + t.Helper() + wantedRepos := make([]RepositoryToIndex, len(want)) + for i, repoId := range want { + wantedRepos[i] = RepositoryToIndex{ID: repoId} + } + if repositories, err := store.GetRepositoriesForIndexScan(context.Background(), batchOptions, now); err != nil { + t.Fatalf("unexpected error fetching repositories for index scan: %s", err) + } else if diff := cmp.Diff(wantedRepos, repositories); diff != "" { + t.Fatalf("unexpected repository list (-want +got):\n%s", diff) + } +} + +func updateGitserverUpdatedAt(t *testing.T, db database.DB, now time.Time) { + gitserverReposQuery := sqlf.Sprintf(`UPDATE gitserver_repos SET last_changed = %s`, now.Add(-time.Hour*24)) + if _, err := db.ExecContext(context.Background(), gitserverReposQuery.Query(sqlf.PostgresBindVar), gitserverReposQuery.Args()...); err != nil { + t.Fatalf("unexpected error while upodating gitserver_repos last updated time: %s", err) + } +} + +func insertRepo(t testing.TB, db database.DB, id int, name string) { + if name == "" { + name = fmt.Sprintf("n-%d", id) + } + + deletedAt := sqlf.Sprintf("NULL") + if strings.HasPrefix(name, "DELETED-") { + deletedAt = sqlf.Sprintf("%s", time.Unix(1587396557, 0).UTC()) + } + insertRepoQuery := sqlf.Sprintf( + `INSERT INTO repo (id, name, deleted_at, private) VALUES (%s, %s, %s, %s) ON CONFLICT (id) DO NOTHING`, + id, + name, + deletedAt, + false, + ) + if _, err := db.ExecContext(context.Background(), insertRepoQuery.Query(sqlf.PostgresBindVar), insertRepoQuery.Args()...); err != nil { + t.Fatalf("unexpected error while upserting repository: %s", err) + } + + status := "cloned" + if strings.HasPrefix(name, "DELETED-") { + status = "not_cloned" + } + updateGitserverRepoQuery := sqlf.Sprintf( + `UPDATE gitserver_repos SET clone_status = %s WHERE repo_id = %s`, + status, + id, + ) + if _, err := db.ExecContext(context.Background(), updateGitserverRepoQuery.Query(sqlf.PostgresBindVar), updateGitserverRepoQuery.Args()...); err != nil { + t.Fatalf("unexpected error while upserting gitserver repository: %s", err) + } +} diff --git a/internal/codeintel/services.go b/internal/codeintel/services.go index 77a0772588ea..922f4a8e92a7 100644 --- a/internal/codeintel/services.go +++ b/internal/codeintel/services.go @@ -8,6 +8,7 @@ import ( ossdependencies "github.com/sourcegraph/sourcegraph/internal/codeintel/dependencies" "github.com/sourcegraph/sourcegraph/internal/codeintel/policies" "github.com/sourcegraph/sourcegraph/internal/codeintel/ranking" + "github.com/sourcegraph/sourcegraph/internal/codeintel/reposcheduler" "github.com/sourcegraph/sourcegraph/internal/codeintel/sentinel" codeintelshared "github.com/sourcegraph/sourcegraph/internal/codeintel/shared" "github.com/sourcegraph/sourcegraph/internal/codeintel/uploads" @@ -17,15 +18,16 @@ import ( ) type Services struct { - AutoIndexingService *autoindexing.Service - CodenavService *codenav.Service - DependenciesService *ossdependencies.Service - PoliciesService *policies.Service - RankingService *ranking.Service - UploadsService *uploads.Service - SentinelService *sentinel.Service - ContextService *context.Service - GitserverClient gitserver.Client + AutoIndexingService *autoindexing.Service + PreciseRepoSchedulingService reposcheduler.RepositorySchedulingService + CodenavService *codenav.Service + DependenciesService *ossdependencies.Service + PoliciesService *policies.Service + RankingService *ranking.Service + UploadsService *uploads.Service + SentinelService *sentinel.Service + ContextService *context.Service + GitserverClient gitserver.Client } type ServiceDependencies struct { @@ -46,16 +48,18 @@ func NewServices(deps ServiceDependencies) (Services, error) { rankingSvc := ranking.NewService(deps.ObservationCtx, db, codeIntelDB) sentinelService := sentinel.NewService(deps.ObservationCtx, db) contextService := context.NewService(deps.ObservationCtx, db) + reposchedulingService := reposcheduler.NewService(reposcheduler.NewPreciseStore(deps.ObservationCtx, db)) return Services{ - AutoIndexingService: autoIndexingSvc, - CodenavService: codenavSvc, - DependenciesService: dependenciesSvc, - PoliciesService: policiesSvc, - RankingService: rankingSvc, - UploadsService: uploadsSvc, - SentinelService: sentinelService, - ContextService: contextService, - GitserverClient: gitserverClient, + AutoIndexingService: autoIndexingSvc, + PreciseRepoSchedulingService: reposchedulingService, + CodenavService: codenavSvc, + DependenciesService: dependenciesSvc, + PoliciesService: policiesSvc, + RankingService: rankingSvc, + UploadsService: uploadsSvc, + SentinelService: sentinelService, + ContextService: contextService, + GitserverClient: gitserverClient, }, nil } diff --git a/internal/database/migration/shared/data/stitched-migration-graph.json b/internal/database/migration/shared/data/stitched-migration-graph.json index 91c21105343a..997fea14741e 100644 --- a/internal/database/migration/shared/data/stitched-migration-graph.json +++ b/internal/database/migration/shared/data/stitched-migration-graph.json @@ -11154,6 +11154,19 @@ "IndexName": "access_tokens_lookup_double_hash" } }, + { + "ID": 1709738515, + "Name": "Add syntactic indexing support to policies", + "UpQuery": "alter table lsif_configuration_policies add column if not exists syntactic_indexing_enabled bool not null default false;\n\nCREATE TABLE IF NOT EXISTS syntactic_scip_last_index_scan(\n repository_id int NOT NULL,\n last_index_scan_at timestamp with time zone NOT NULL,\n PRIMARY KEY(repository_id)\n);\n\nCOMMENT ON TABLE syntactic_scip_last_index_scan IS 'Tracks the last time repository was checked for syntactic indexing job scheduling.';\nCOMMENT ON COLUMN syntactic_scip_last_index_scan.last_index_scan_at IS 'The last time uploads of this repository were considered for syntactic indexing job scheduling.';", + "DownQuery": "alter table lsif_configuration_policies drop column if exists syntactic_indexing_enabled;\ndrop table if exists syntactic_scip_last_index_scan;", + "Privileged": false, + "NonIdempotent": false, + "Parents": [ + 1708596613 + ], + "IsCreateIndexConcurrently": false, + "IndexMetadata": null + }, { "ID": 1711003437, "Name": "remove feature flag foreign key", @@ -11430,6 +11443,7 @@ "v5.3.0": { "RootID": 1648051770, "LeafIDs": [ + 1709738515, 1711003437, 1711538234 ], diff --git a/internal/database/schema.json b/internal/database/schema.json index dc4f53e0dacd..c69c1cf420f7 100644 --- a/internal/database/schema.json +++ b/internal/database/schema.json @@ -15552,6 +15552,19 @@ "GenerationExpression": "", "Comment": "Whether or not this configuration policy affects data retention rules." }, + { + "Name": "syntactic_indexing_enabled", + "Index": 17, + "TypeName": "boolean", + "IsNullable": false, + "Default": "false", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + }, { "Name": "type", "Index": 4, @@ -26586,6 +26599,52 @@ ], "Triggers": [] }, + { + "Name": "syntactic_scip_last_index_scan", + "Comment": "Tracks the last time repository was checked for syntactic indexing job scheduling.", + "Columns": [ + { + "Name": "last_index_scan_at", + "Index": 2, + "TypeName": "timestamp with time zone", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "The last time uploads of this repository were considered for syntactic indexing job scheduling." + }, + { + "Name": "repository_id", + "Index": 1, + "TypeName": "integer", + "IsNullable": false, + "Default": "", + "CharacterMaximumLength": 0, + "IsIdentity": false, + "IdentityGeneration": "", + "IsGenerated": "NEVER", + "GenerationExpression": "", + "Comment": "" + } + ], + "Indexes": [ + { + "Name": "syntactic_scip_last_index_scan_pkey", + "IsPrimaryKey": true, + "IsUnique": true, + "IsExclusion": false, + "IsDeferrable": false, + "IndexDefinition": "CREATE UNIQUE INDEX syntactic_scip_last_index_scan_pkey ON syntactic_scip_last_index_scan USING btree (repository_id)", + "ConstraintType": "p", + "ConstraintDefinition": "PRIMARY KEY (repository_id)" + } + ], + "Constraints": null, + "Triggers": [] + }, { "Name": "team_members", "Comment": "", diff --git a/internal/database/schema.md b/internal/database/schema.md index 23c29ff83844..0130e9a656f4 100644 --- a/internal/database/schema.md +++ b/internal/database/schema.md @@ -2120,6 +2120,7 @@ Stores data points for a code insight that do not need to be queried directly, b repository_patterns | text[] | | | last_resolved_at | timestamp with time zone | | | embeddings_enabled | boolean | | not null | false + syntactic_indexing_enabled | boolean | | not null | false Indexes: "lsif_configuration_policies_pkey" PRIMARY KEY, btree (id) "lsif_configuration_policies_repository_id" btree (repository_id) @@ -4050,6 +4051,21 @@ Stores metadata about a code intel syntactic index job. **execution_logs**: An array of [log entries](https://sourcegraph.com/github.com/sourcegraph/sourcegraph@3.23/-/blob/internal/workerutil/store.go#L48:6) (encoded as JSON) from the most recent execution. +# Table "public.syntactic_scip_last_index_scan" +``` + Column | Type | Collation | Nullable | Default +--------------------+--------------------------+-----------+----------+--------- + repository_id | integer | | not null | + last_index_scan_at | timestamp with time zone | | not null | +Indexes: + "syntactic_scip_last_index_scan_pkey" PRIMARY KEY, btree (repository_id) + +``` + +Tracks the last time repository was checked for syntactic indexing job scheduling. + +**last_index_scan_at**: The last time uploads of this repository were considered for syntactic indexing job scheduling. + # Table "public.team_members" ``` Column | Type | Collation | Nullable | Default diff --git a/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/down.sql b/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/down.sql new file mode 100644 index 000000000000..83caece702f0 --- /dev/null +++ b/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/down.sql @@ -0,0 +1,2 @@ +alter table lsif_configuration_policies drop column if exists syntactic_indexing_enabled; +drop table if exists syntactic_scip_last_index_scan; diff --git a/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/metadata.yaml b/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/metadata.yaml new file mode 100644 index 000000000000..9a85044b6058 --- /dev/null +++ b/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/metadata.yaml @@ -0,0 +1,2 @@ +name: Add syntactic indexing support to policies +parents: [1708596613] diff --git a/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/up.sql b/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/up.sql new file mode 100644 index 000000000000..913f9a9b3215 --- /dev/null +++ b/migrations/frontend/1709738515_add_syntactic_indexing_support_to_policies/up.sql @@ -0,0 +1,10 @@ +alter table lsif_configuration_policies add column if not exists syntactic_indexing_enabled bool not null default false; + +CREATE TABLE IF NOT EXISTS syntactic_scip_last_index_scan( + repository_id int NOT NULL, + last_index_scan_at timestamp with time zone NOT NULL, + PRIMARY KEY(repository_id) +); + +COMMENT ON TABLE syntactic_scip_last_index_scan IS 'Tracks the last time repository was checked for syntactic indexing job scheduling.'; +COMMENT ON COLUMN syntactic_scip_last_index_scan.last_index_scan_at IS 'The last time uploads of this repository were considered for syntactic indexing job scheduling.'; diff --git a/migrations/frontend/squashed.sql b/migrations/frontend/squashed.sql index 3c7e870f49ec..1e97bedacb87 100644 --- a/migrations/frontend/squashed.sql +++ b/migrations/frontend/squashed.sql @@ -1722,7 +1722,8 @@ CREATE TABLE lsif_configuration_policies ( protected boolean DEFAULT false NOT NULL, repository_patterns text[], last_resolved_at timestamp with time zone, - embeddings_enabled boolean DEFAULT false NOT NULL + embeddings_enabled boolean DEFAULT false NOT NULL, + syntactic_indexing_enabled boolean DEFAULT false NOT NULL ); COMMENT ON COLUMN lsif_configuration_policies.repository_id IS 'The identifier of the repository to which this configuration policy applies. If absent, this policy is applied globally.'; @@ -4685,6 +4686,15 @@ CREATE VIEW syntactic_scip_indexing_jobs_with_repository_name AS JOIN repo r ON ((r.id = u.repository_id))) WHERE (r.deleted_at IS NULL); +CREATE TABLE syntactic_scip_last_index_scan ( + repository_id integer NOT NULL, + last_index_scan_at timestamp with time zone NOT NULL +); + +COMMENT ON TABLE syntactic_scip_last_index_scan IS 'Tracks the last time repository was checked for syntactic indexing job scheduling.'; + +COMMENT ON COLUMN syntactic_scip_last_index_scan.last_index_scan_at IS 'The last time uploads of this repository were considered for syntactic indexing job scheduling.'; + CREATE TABLE team_members ( team_id integer NOT NULL, user_id integer NOT NULL, @@ -5803,6 +5813,9 @@ ALTER TABLE ONLY survey_responses ALTER TABLE ONLY syntactic_scip_indexing_jobs ADD CONSTRAINT syntactic_scip_indexing_jobs_pkey PRIMARY KEY (id); +ALTER TABLE ONLY syntactic_scip_last_index_scan + ADD CONSTRAINT syntactic_scip_last_index_scan_pkey PRIMARY KEY (repository_id); + ALTER TABLE ONLY team_members ADD CONSTRAINT team_members_team_id_user_id_key PRIMARY KEY (team_id, user_id); @@ -7043,9 +7056,9 @@ ALTER TABLE ONLY webhooks ALTER TABLE ONLY zoekt_repos ADD CONSTRAINT zoekt_repos_repo_id_fkey FOREIGN KEY (repo_id) REFERENCES repo(id) ON DELETE CASCADE; -INSERT INTO lsif_configuration_policies VALUES (1, NULL, 'Default tip-of-branch retention policy', 'GIT_TREE', '*', true, 2016, false, false, 0, false, true, NULL, NULL, false); -INSERT INTO lsif_configuration_policies VALUES (2, NULL, 'Default tag retention policy', 'GIT_TAG', '*', true, 8064, false, false, 0, false, true, NULL, NULL, false); -INSERT INTO lsif_configuration_policies VALUES (3, NULL, 'Default commit retention policy', 'GIT_TREE', '*', true, 168, true, false, 0, false, true, NULL, NULL, false); +INSERT INTO lsif_configuration_policies VALUES (1, NULL, 'Default tip-of-branch retention policy', 'GIT_TREE', '*', true, 2016, false, false, 0, false, true, NULL, NULL, false, false); +INSERT INTO lsif_configuration_policies VALUES (2, NULL, 'Default tag retention policy', 'GIT_TAG', '*', true, 8064, false, false, 0, false, true, NULL, NULL, false, false); +INSERT INTO lsif_configuration_policies VALUES (3, NULL, 'Default commit retention policy', 'GIT_TREE', '*', true, 168, true, false, 0, false, true, NULL, NULL, false, false); SELECT pg_catalog.setval('lsif_configuration_policies_id_seq', 3, true);