Skip to content

Commit 418ea6c

Browse files
Merge pull request #1678 from spring-financial-group/feat/status-breakup
feat: batched graphQL for keeper status queries
2 parents 2ff7b86 + 3860303 commit 418ea6c

File tree

6 files changed

+315
-39
lines changed

6 files changed

+315
-39
lines changed

pkg/keeper/keeper.go

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,40 +1889,8 @@ func restAPISearch(spc scmProviderClient, log *logrus.Entry, queries keeper.Quer
18891889
}
18901890

18911891
func bucketedGraphQLSearch(querier querier, query keeper.Query, log *logrus.Entry) ([]PullRequest, error) {
1892-
bucketedQueries := query.BucketedQueries(100)
1893-
var wg sync.WaitGroup
1894-
resultsChan := make(chan []PullRequest, len(bucketedQueries))
1895-
errsChan := make(chan error, len(bucketedQueries))
1896-
1897-
for idx, q := range bucketedQueries {
1898-
wg.Add(1)
1899-
go func(idx int, q string) {
1900-
defer wg.Done()
1901-
bucketResults, err := graphQLSearch(querier, log, q, time.Time{}, time.Now())
1902-
if err != nil {
1903-
errsChan <- fmt.Errorf("graphQLSearch failed for bucket %d with query %s: %w", idx, q, err)
1904-
return
1905-
}
1906-
resultsChan <- bucketResults
1907-
}(idx, q)
1908-
}
1909-
1910-
wg.Wait()
1911-
close(resultsChan)
1912-
close(errsChan)
1913-
1914-
var results []PullRequest
1915-
for r := range resultsChan {
1916-
results = append(results, r...)
1917-
}
1918-
var errs []error
1919-
for err := range errsChan {
1920-
errs = append(errs, err)
1921-
}
1922-
if len(errs) > 0 {
1923-
return results, fmt.Errorf("one or more bucketed GraphQL searches failed: %w", errorutil.NewAggregate(errs...))
1924-
}
1925-
return results, nil
1892+
bucketedQueries := query.BucketedQueries(repoBucketSize)
1893+
return executeBucketedQueries(querier, log, bucketedQueries, time.Time{}, time.Now())
19261894
}
19271895

19281896
func loadMissingLabels(spc scmProviderClient, pr *scm.PullRequest) error {

pkg/keeper/keeper_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net/http/httptest"
2626
"reflect"
2727
"strings"
28+
"sync"
2829
"testing"
2930
"text/template"
3031
"time"
@@ -512,6 +513,9 @@ type fgc struct {
512513
ignoreExpected bool
513514
combinedStatus map[string]map[string]commitStatus
514515
fakeClient *scm.Client
516+
517+
mu sync.Mutex
518+
queryLog []string
515519
}
516520

517521
func (f *fgc) ListPullRequestComments(owner, repo string, number int) ([]*scm.Comment, error) {
@@ -563,6 +567,11 @@ func (f *fgc) Query(ctx context.Context, q interface{}, vars map[string]interfac
563567
if !ok {
564568
return errors.New("unexpected query type")
565569
}
570+
if qs, ok := vars["query"]; ok {
571+
f.mu.Lock()
572+
f.queryLog = append(f.queryLog, string(qs.(githubql.String)))
573+
f.mu.Unlock()
574+
}
566575
for _, pr := range f.prs {
567576
sq.Search.Nodes = append(
568577
sq.Search.Nodes,

pkg/keeper/search.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@ package keeper
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

25+
"github.com/jenkins-x/lighthouse/pkg/errorutil"
2426
"github.com/jenkins-x/lighthouse/pkg/scmprovider"
2527
githubql "github.com/shurcooL/githubv4"
2628
"github.com/sirupsen/logrus"
2729
)
2830

31+
// repoBucketSize is the maximum number of repos to include in a single GraphQL search query.
32+
const repoBucketSize = 100
33+
2934
type querier func(ctx context.Context, result interface{}, vars map[string]interface{}) error
3035

3136
func datedQuery(q string, start, end time.Time) string {
@@ -82,6 +87,43 @@ func graphQLSearch(query querier, log *logrus.Entry, q string, start, end time.T
8287
return ret, nil
8388
}
8489

90+
// executeBucketedQueries runs bucketed queries through graphQLSearch in parallel
91+
func executeBucketedQueries(q querier, log *logrus.Entry, bucketQueries []string, start, end time.Time) ([]PullRequest, error) {
92+
var wg sync.WaitGroup
93+
resultsChan := make(chan []PullRequest, len(bucketQueries))
94+
errsChan := make(chan error, len(bucketQueries))
95+
96+
for idx, bq := range bucketQueries {
97+
wg.Add(1)
98+
go func(idx int, bq string) {
99+
defer wg.Done()
100+
bucketResults, err := graphQLSearch(q, log, bq, start, end)
101+
if err != nil {
102+
errsChan <- fmt.Errorf("graphQLSearch failed for bucket %d with query %s: %w", idx, bq, err)
103+
return
104+
}
105+
resultsChan <- bucketResults
106+
}(idx, bq)
107+
}
108+
109+
wg.Wait()
110+
close(resultsChan)
111+
close(errsChan)
112+
113+
var results []PullRequest
114+
for r := range resultsChan {
115+
results = append(results, r...)
116+
}
117+
var errs []error
118+
for err := range errsChan {
119+
errs = append(errs, err)
120+
}
121+
if len(errs) > 0 {
122+
return results, fmt.Errorf("one or more bucketed GraphQL searches failed: %w", errorutil.NewAggregate(errs...))
123+
}
124+
return results, nil
125+
}
126+
85127
// dateToken generates a GitHub graphQLSearch query token for the specified date range.
86128
// See: https://help.github.com/articles/understanding-the-search-syntax/#query-for-dates
87129
func dateToken(start, end time.Time) string {

pkg/keeper/search_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"errors"
2222
"reflect"
23+
"sort"
24+
"sync"
2325
"testing"
2426
"time"
2527

@@ -171,3 +173,139 @@ func TestSearch(t *testing.T) {
171173
})
172174
}
173175
}
176+
177+
func TestExecuteBucketedQueries(t *testing.T) {
178+
now := time.Now()
179+
earlier := now.Add(-5 * time.Hour)
180+
log := logrus.WithField("test", "TestExecuteBucketedQueries")
181+
182+
makePRs := func(numbers ...int) []PullRequest {
183+
var prs []PullRequest
184+
for _, n := range numbers {
185+
prs = append(prs, PullRequest{Number: githubql.Int(n)})
186+
}
187+
return prs
188+
}
189+
190+
sortPRs := func(prs []PullRequest) {
191+
sort.Slice(prs, func(i, j int) bool {
192+
return prs[i].Number < prs[j].Number
193+
})
194+
}
195+
196+
cases := []struct {
197+
name string
198+
bucketQ []string
199+
resultsByQ map[string][]PullRequest
200+
errsByQ map[string]error
201+
expectedPRs []PullRequest
202+
expectErr bool
203+
}{
204+
{
205+
name: "empty bucket list",
206+
bucketQ: nil,
207+
resultsByQ: map[string][]PullRequest{},
208+
errsByQ: map[string]error{},
209+
expectedPRs: nil,
210+
expectErr: false,
211+
},
212+
{
213+
name: "single bucket succeeds",
214+
bucketQ: []string{"bucket-q-1"},
215+
resultsByQ: map[string][]PullRequest{
216+
"bucket-q-1": makePRs(1, 2),
217+
},
218+
errsByQ: map[string]error{},
219+
expectedPRs: makePRs(1, 2),
220+
expectErr: false,
221+
},
222+
{
223+
name: "multiple buckets succeed",
224+
bucketQ: []string{"bucket-q-1", "bucket-q-2"},
225+
resultsByQ: map[string][]PullRequest{
226+
"bucket-q-1": makePRs(1, 2),
227+
"bucket-q-2": makePRs(3, 4),
228+
},
229+
errsByQ: map[string]error{},
230+
expectedPRs: makePRs(1, 2, 3, 4),
231+
expectErr: false,
232+
},
233+
{
234+
name: "one bucket fails, partial results",
235+
bucketQ: []string{"bucket-q-1", "bucket-q-2"},
236+
resultsByQ: map[string][]PullRequest{
237+
"bucket-q-1": makePRs(1, 2),
238+
},
239+
errsByQ: map[string]error{
240+
"bucket-q-2": errors.New("bucket-q-2 failed"),
241+
},
242+
expectedPRs: makePRs(1, 2),
243+
expectErr: true,
244+
},
245+
{
246+
name: "all buckets fail",
247+
bucketQ: []string{"bucket-q-1", "bucket-q-2"},
248+
resultsByQ: map[string][]PullRequest{},
249+
errsByQ: map[string]error{
250+
"bucket-q-1": errors.New("bucket-q-1 failed"),
251+
"bucket-q-2": errors.New("bucket-q-2 failed"),
252+
},
253+
expectedPRs: nil,
254+
expectErr: true,
255+
},
256+
}
257+
258+
for _, tc := range cases {
259+
t.Run(tc.name, func(t *testing.T) {
260+
// graphQLSearch wraps queries with datedQuery; build the dated form so
261+
// our mock can match on it.
262+
datedResults := make(map[string][]PullRequest, len(tc.resultsByQ))
263+
datedErrs := make(map[string]error, len(tc.errsByQ))
264+
datedBuckets := make([]string, len(tc.bucketQ))
265+
for i, q := range tc.bucketQ {
266+
dq := datedQuery(q, floor(earlier), floor(now))
267+
datedBuckets[i] = q // pass bare query to executeBucketedQueries
268+
if v, ok := tc.resultsByQ[q]; ok {
269+
datedResults[dq] = v
270+
}
271+
if v, ok := tc.errsByQ[q]; ok {
272+
datedErrs[dq] = v
273+
}
274+
}
275+
276+
var mu sync.Mutex
277+
mockQuery := func(_ context.Context, result interface{}, vars map[string]interface{}) error {
278+
q := string(vars["query"].(githubql.String))
279+
mu.Lock()
280+
prs, hasPRs := datedResults[q]
281+
err, hasErr := datedErrs[q]
282+
mu.Unlock()
283+
if hasErr {
284+
return err
285+
}
286+
if hasPRs {
287+
sq := result.(*searchQuery)
288+
for _, pr := range prs {
289+
sq.Search.Nodes = append(sq.Search.Nodes, PRNode{pr})
290+
}
291+
}
292+
return nil
293+
}
294+
295+
prs, err := executeBucketedQueries(mockQuery, log, datedBuckets, earlier, now)
296+
297+
if tc.expectErr && err == nil {
298+
t.Error("expected error but got nil")
299+
}
300+
if !tc.expectErr && err != nil {
301+
t.Errorf("unexpected error: %v", err)
302+
}
303+
304+
sortPRs(prs)
305+
sortPRs(tc.expectedPRs)
306+
if !reflect.DeepEqual(tc.expectedPRs, prs) {
307+
t.Errorf("prs do not match:\n%s", cmp.Diff(tc.expectedPRs, prs))
308+
}
309+
})
310+
}
311+
}

pkg/keeper/status.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func (sc *statusController) setStatuses(all []PullRequest, pool map[string]prWit
316316
// queryMap caches which queries match a repo.
317317
// Make a new one each sync loop as queries will change.
318318
queryMap := sc.config().Keeper.Queries.QueryMap()
319-
processed := sets.NewString()
319+
processed := sets.New[string]()
320320

321321
process := func(pr *PullRequest) {
322322
processed.Insert(pr.prKey())
@@ -461,8 +461,9 @@ func (sc *statusController) search() []PullRequest {
461461
}
462462

463463
orgExceptions, repos := queries.OrgExceptionsAndRepos()
464-
orgs := sets.StringKeySet(orgExceptions)
465-
query := openPRsQuery(orgs.List(), repos.List(), orgExceptions)
464+
orgs := sets.KeySet(orgExceptions)
465+
reposSet := sets.Set[string](repos)
466+
query := openPRsQuery(sets.List(orgs), sets.List(reposSet), orgExceptions)
466467
now := time.Now()
467468
log := sc.logger.WithField("query", query)
468469
if query != sc.PreviousQuery {
@@ -476,7 +477,7 @@ func (sc *statusController) search() []PullRequest {
476477
var err error
477478

478479
if sc.spc.SupportsGraphQL() {
479-
prs, err = graphQLSearch(sc.spc.Query, sc.logger, query, sc.LatestPR.Time, now)
480+
prs, err = sc.bucketedGraphQLStatusSearch(orgs, reposSet, orgExceptions, now)
480481
} else {
481482
kq := keeper.Query{}
482483
kq.Repos = append(kq.Repos, repos.List()...)
@@ -497,7 +498,13 @@ func (sc *statusController) search() []PullRequest {
497498
return nil
498499
}
499500

500-
latest := prs[len(prs)-1].UpdatedAt
501+
// Find the latest UpdatedAt across all results (bucketed queries may not be globally sorted)
502+
var latest metav1.Time
503+
for _, pr := range prs {
504+
if pr.UpdatedAt.After(latest.Time) {
505+
latest = metav1.Time{Time: pr.UpdatedAt.Time}
506+
}
507+
}
501508
if latest.IsZero() {
502509
log.WithField("latestPR", sc.LatestPR).Debug("latest PR has zero time")
503510
return prs
@@ -507,6 +514,27 @@ func (sc *statusController) search() []PullRequest {
507514
return prs
508515
}
509516

517+
func (sc *statusController) bucketedGraphQLStatusSearch(orgs sets.Set[string], repos sets.Set[string], orgExceptions map[string]sets.String, now time.Time) ([]PullRequest, error) {
518+
var bucketQueries []string
519+
520+
if orgs.Len() > 0 {
521+
// One query covering all orgs with their exclusions
522+
bucketQueries = append(bucketQueries, openPRsQuery(sets.List(orgs), nil, orgExceptions))
523+
}
524+
525+
repoList := sets.List(repos)
526+
for i := 0; i < len(repoList); i += repoBucketSize {
527+
end := i + repoBucketSize
528+
if end > len(repoList) {
529+
end = len(repoList)
530+
}
531+
// Each repo bucket query has no orgs and no exceptions.
532+
bucketQueries = append(bucketQueries, openPRsQuery(nil, repoList[i:end], nil))
533+
}
534+
535+
return executeBucketedQueries(sc.spc.Query, sc.logger, bucketQueries, sc.LatestPR.Time, now)
536+
}
537+
510538
// openPRsQuery returns a search string made of the fixed terms
// "is:pr state:open sort:updated-asc " followed by whatever
// orgRepoQueryString produces for the given orgs, repos and orgExceptions.
func openPRsQuery(orgs, repos []string, orgExceptions map[string]sets.String) string {
	const openPRTerms = "is:pr state:open sort:updated-asc "
	scope := orgRepoQueryString(orgs, repos, orgExceptions)
	return openPRTerms + scope
}

0 commit comments

Comments
 (0)