Skip to content

Commit 6bdf940

Browse files
committed
review 2
1 parent b7c391d commit 6bdf940

File tree

6 files changed

+180
-47
lines changed

6 files changed

+180
-47
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
// Package internal contains functionality internal to the lrsclient package.
19+
package internal
20+
21+
import "time"
22+
23+
var (
24+
// TimeNow is used to get the current time. It can be overridden in tests.
25+
TimeNow func() time.Time
26+
)

xds/internal/clients/lrsclient/load_store.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ import (
2323
"sync"
2424
"sync/atomic"
2525
"time"
26-
)
2726

28-
// clockNow is used to get the current time. It can be overridden in tests.
29-
var clockNow = time.Now
27+
lrsclientinternal "google.golang.org/grpc/xds/internal/clients/lrsclient/internal"
28+
)
3029

3130
// A LoadStore aggregates loads for multiple clusters and services that are
3231
// intended to be reported via LRS.
@@ -52,6 +51,10 @@ type LoadStore struct {
5251
clusters map[string]map[string]*PerClusterReporter
5352
}
5453

54+
func init() {
55+
lrsclientinternal.TimeNow = time.Now
56+
}
57+
5558
// newLoadStore creates a LoadStore.
5659
func newLoadStore() *LoadStore {
5760
return &LoadStore{
@@ -87,7 +90,7 @@ func (ls *LoadStore) ReporterForCluster(clusterName, serviceName string) *PerClu
8790
p := &PerClusterReporter{
8891
cluster: clusterName,
8992
service: serviceName,
90-
lastLoadReportAt: clockNow(),
93+
lastLoadReportAt: lrsclientinternal.TimeNow(),
9194
}
9295
c[serviceName] = p
9396
return p
@@ -248,8 +251,8 @@ func (p *PerClusterReporter) stats() *loadData {
248251
})
249252

250253
p.mu.Lock()
251-
sd.reportInterval = clockNow().Sub(p.lastLoadReportAt)
252-
p.lastLoadReportAt = clockNow()
254+
sd.reportInterval = lrsclientinternal.TimeNow().Sub(p.lastLoadReportAt)
255+
p.lastLoadReportAt = lrsclientinternal.TimeNow()
253256
p.mu.Unlock()
254257

255258
if sd.totalDrops == 0 && len(sd.drops) == 0 && len(sd.localityStats) == 0 {

xds/internal/clients/lrsclient/load_store_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/google/go-cmp/cmp"
2828
"github.com/google/go-cmp/cmp/cmpopts"
29+
lrsclientinternal "google.golang.org/grpc/xds/internal/clients/lrsclient/internal"
2930
)
3031

3132
var (
@@ -473,38 +474,40 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) {
473474
}
474475
}
475476

476-
// TestStoreReportInterval tests that the report interval is correctly
477-
// calculated between consecutive calls to stats().
477+
// TestStoreReportInterval verify that the load report interval gets
478+
// calculated at every stats() call and is the duration between start of last
479+
// load reporting to next stats() call.
478480
func TestStoreReportInterval(t *testing.T) {
479-
originalClockNow := clockNow
480-
t.Cleanup(func() { clockNow = originalClockNow })
481+
originalTimeNow := lrsclientinternal.TimeNow
482+
t.Cleanup(func() { lrsclientinternal.TimeNow = originalTimeNow })
481483

482484
// Initial time for reporter creation
483485
currentTime := time.Now()
484-
clockNow = func() time.Time {
486+
lrsclientinternal.TimeNow = func() time.Time {
485487
return currentTime
486488
}
487489

488490
store := newLoadStore()
489491
reporter := store.ReporterForCluster("test-cluster", "test-service")
490-
// To ensure stats() returns non-nil data, report a dummy drop.
492+
// Report dummy drop to ensure stats1 is not nil.
491493
reporter.CallDropped("dummy-category")
492494

493-
// First call to stats() calculates the report interval from reporter
494-
// creation time.
495+
// Update currentTime to simulate the passage of time between the reporter
496+
// creation and first stats() call.
495497
currentTime = currentTime.Add(5 * time.Second)
496498
stats1 := reporter.stats()
497499

498500
if stats1 == nil {
499501
t.Fatalf("stats1 is nil after reporting a drop, want non-nil")
500502
}
501-
wantInterval := 5 * time.Second
502-
if stats1.reportInterval != wantInterval {
503-
t.Errorf("First call stats() = %v, want %v", stats1.reportInterval, wantInterval)
503+
// Verify stats() call calculate the report interval from the time of
504+
// reporter creation.
505+
if got, want := stats1.reportInterval, 5*time.Second; got != want {
506+
t.Errorf("stats1.reportInterval = %v, want %v", stats1.reportInterval, want)
504507
}
505508

506-
// Second call to stats() calculates the report interval from last stats()
507-
// call time.
509+
// Update currentTime to simulate the passage of time between the first
510+
// and second stats() call.
508511
currentTime = currentTime.Add(10 * time.Second)
509512
// Report another dummy drop to ensure stats2 is not nil.
510513
reporter.CallDropped("dummy-category-2")
@@ -513,8 +516,9 @@ func TestStoreReportInterval(t *testing.T) {
513516
if stats2 == nil {
514517
t.Fatalf("stats2 is nil after reporting a drop, want non-nil")
515518
}
516-
wantInterval = 10 * time.Second
517-
if stats2.reportInterval != wantInterval {
518-
t.Errorf("Second call stats() = %v, want %v", stats2.reportInterval, wantInterval)
519+
// Verify stats() call calculate the report interval from the time of first
520+
// stats() call.
521+
if got, want := stats2.reportInterval, 10*time.Second; got != want {
522+
t.Errorf("stats2.reportInterval = %v, want %v", stats2.reportInterval, want)
519523
}
520524
}

xds/internal/clients/lrsclient/loadreport_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e"
3838
"google.golang.org/grpc/xds/internal/clients/internal/testutils/fakeserver"
3939
"google.golang.org/grpc/xds/internal/clients/lrsclient"
40+
lrsclientinternal "google.golang.org/grpc/xds/internal/clients/lrsclient/internal"
4041
"google.golang.org/protobuf/testing/protocmp"
4142
"google.golang.org/protobuf/types/known/durationpb"
4243

@@ -610,3 +611,99 @@ func (s) TestReportLoad_StopWithContext(t *testing.T) {
610611
t.Fatal("Timeout waiting for LRS stream to close")
611612
}
612613
}
614+
615+
// TestReportLoad_LoadReportInterval tests verify that the load report interval
616+
// received by the LRS server is the duration between start of last load
617+
// reporting by the client and the time when the load is reported to the LRS
618+
// server.
619+
func (s) TestReportLoad_LoadReportInterval(t *testing.T) {
620+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
621+
defer cancel()
622+
623+
originalTimeNow := lrsclientinternal.TimeNow
624+
t.Cleanup(func() { lrsclientinternal.TimeNow = originalTimeNow })
625+
626+
// Create a management server that serves LRS.
627+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true})
628+
629+
// Create an LRS client with configuration pointing to the above server.
630+
nodeID := uuid.New().String()
631+
632+
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
633+
config := lrsclient.Config{
634+
Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"},
635+
TransportBuilder: grpctransport.NewBuilder(configs),
636+
}
637+
client, err := lrsclient.New(config)
638+
if err != nil {
639+
t.Fatalf("lrsclient.New() failed: %v", err)
640+
}
641+
642+
// Call the load reporting API, and ensure that an LRS stream is created.
643+
serverIdentifier := clients.ServerIdentifier{ServerURI: mgmtServer.Address, Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}}
644+
loadStore1, err := client.ReportLoad(serverIdentifier)
645+
if err != nil {
646+
t.Fatalf("client.ReportLoad() failed: %v", err)
647+
}
648+
lrsServer := mgmtServer.LRSServer
649+
if _, err := lrsServer.LRSStreamOpenChan.Receive(ctx); err != nil {
650+
t.Fatalf("Timeout when waiting for LRS stream to be created: %v", err)
651+
}
652+
653+
// Initial time for reporter creation
654+
currentTime := time.Now()
655+
lrsclientinternal.TimeNow = func() time.Time {
656+
return currentTime
657+
}
658+
659+
// Report dummy drop to ensure stats is not nil.
660+
loadStore1.ReporterForCluster("cluster1", "eds1").CallDropped("test")
661+
662+
// Update currentTime to simulate the passage of time between the reporter
663+
// creation and first stats() call.
664+
currentTime = currentTime.Add(5 * time.Second)
665+
666+
// Ensure the initial load reporting request is received at the server.
667+
req, err := lrsServer.LRSRequestChan.Receive(ctx)
668+
if err != nil {
669+
t.Fatalf("Timeout when waiting for initial LRS request: %v", err)
670+
}
671+
gotInitialReq := req.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest)
672+
nodeProto := &v3corepb.Node{
673+
Id: nodeID,
674+
UserAgentName: "user-agent",
675+
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"},
676+
ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw", "envoy.lrs.supports_send_all_clusters"},
677+
}
678+
wantInitialReq := &v3lrspb.LoadStatsRequest{Node: nodeProto}
679+
if diff := cmp.Diff(gotInitialReq, wantInitialReq, protocmp.Transform()); diff != "" {
680+
t.Fatalf("Unexpected diff in initial LRS request (-got, +want):\n%s", diff)
681+
}
682+
683+
// Send a response from the server with a small deadline.
684+
lrsServer.LRSResponseChan <- &fakeserver.Response{
685+
Resp: &v3lrspb.LoadStatsResponse{
686+
SendAllClusters: true,
687+
LoadReportingInterval: &durationpb.Duration{Nanos: 50000000}, // 50ms
688+
},
689+
}
690+
691+
// Ensure that loads are seen on the server.
692+
req, err = lrsServer.LRSRequestChan.Receive(ctx)
693+
if err != nil {
694+
t.Fatal("Timeout when waiting for LRS request with loads")
695+
}
696+
gotLoad := req.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest).ClusterStats
697+
if l := len(gotLoad); l != 1 {
698+
t.Fatalf("Received load for %d clusters, want 1", l)
699+
}
700+
// Verify load received at LRS server has load report interval calculated
701+
// from the time of reporter creation.
702+
if got, want := gotLoad[0].GetLoadReportInterval().AsDuration(), 5*time.Second; got != want {
703+
t.Errorf("Got load report interval %v, want %v", got, want)
704+
}
705+
706+
ssCtx, ssCancel := context.WithTimeout(context.Background(), time.Millisecond)
707+
defer ssCancel()
708+
loadStore1.Stop(ssCtx)
709+
}

xds/internal/xdsclient/load/store.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525

2626
const negativeOneUInt64 = ^uint64(0)
2727

28-
// clockNow is used to get the current time. It can be overridden in tests.
29-
var clockNow = time.Now
28+
// timeNow is used to get the current time. It can be overridden in tests.
29+
var timeNow = time.Now
3030

3131
// Store keeps the loads for multiple clusters and services to be reported via
3232
// LRS. It contains loads to reported to one LRS server. Create multiple stores
@@ -120,7 +120,7 @@ func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter {
120120
p := &perClusterStore{
121121
cluster: clusterName,
122122
service: serviceName,
123-
lastLoadReportAt: clockNow(),
123+
lastLoadReportAt: timeNow(),
124124
}
125125
c[serviceName] = p
126126
return p
@@ -333,8 +333,8 @@ func (ls *perClusterStore) stats() *Data {
333333
})
334334

335335
ls.mu.Lock()
336-
sd.ReportInterval = clockNow().Sub(ls.lastLoadReportAt)
337-
ls.lastLoadReportAt = clockNow()
336+
sd.ReportInterval = timeNow().Sub(ls.lastLoadReportAt)
337+
ls.lastLoadReportAt = timeNow()
338338
ls.mu.Unlock()
339339

340340
if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 {

xds/internal/xdsclient/load/store_test.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -468,48 +468,51 @@ func TestStoreStatsEmptyDataNotReported(t *testing.T) {
468468
}
469469
}
470470

471-
// TestStoreReportInterval tests that the report interval is correctly
472-
// calculated between consecutive calls to Stats().
471+
// TestStoreReportInterval verify that the load report interval gets
472+
// calculated at every stats() call and is the duration between start of last
473+
// load reporting to next stats() call.
473474
func TestStoreReportInterval(t *testing.T) {
474-
originalClockNow := clockNow
475-
t.Cleanup(func() { clockNow = originalClockNow })
475+
originaltimeNow := timeNow
476+
t.Cleanup(func() { timeNow = originaltimeNow })
476477

477478
// Initial time for reporter creation
478479
currentTime := time.Now()
479-
clockNow = func() time.Time {
480+
timeNow = func() time.Time {
480481
return currentTime
481482
}
482483

483484
store := NewStore()
484485
reporter := store.PerCluster("test-cluster", "test-service")
485-
// To ensure Stats() returns non-nil data, report a dummy drop.
486+
// Report dummy drop to ensure stats1 is not nil.
486487
reporter.CallDropped("dummy-category")
487488

488-
// First call to Stats() calculates the report interval from reporter
489-
// creation time.
489+
// Update currentTime to simulate the passage of time between the reporter
490+
// creation and first stats() call.
490491
currentTime = currentTime.Add(5 * time.Second)
491492
stats1 := store.Stats(nil)
492493

493-
if stats1 == nil {
494-
t.Fatalf("stats1 is nil after reporting a drop, want non-nil")
494+
if len(stats1) == 0 {
495+
t.Fatalf("stats1 is empty after reporting a drop, want non-nil")
495496
}
496-
wantInterval := 5 * time.Second
497-
if stats1[0].ReportInterval != wantInterval {
498-
t.Errorf("First call stats() = %v, want %v", stats1[0].ReportInterval, wantInterval)
497+
// Verify Stats() call calculate the report interval from the time of
498+
// reporter creation.
499+
if got, want := stats1[0].ReportInterval, 5*time.Second; got != want {
500+
t.Errorf("stats1[0].ReportInterval = %v, want %v", stats1[0].ReportInterval, want)
499501
}
500502

501-
// Second call to Stats() calculates the report interval from last Stats()
502-
// call time.
503+
// Update currentTime to simulate the passage of time between the first
504+
// and second stats() call.
503505
currentTime = currentTime.Add(10 * time.Second)
504506
// Report another dummy drop to ensure stats2 is not nil.
505507
reporter.CallDropped("dummy-category-2")
506508
stats2 := store.Stats(nil)
507509

508-
if stats2 == nil {
509-
t.Fatalf("stats2 is nil after reporting a drop, want non-nil")
510+
if len(stats2) == 0 {
511+
t.Fatalf("stats2 is empty after reporting a drop, want non-nil")
510512
}
511-
wantInterval = 10 * time.Second
512-
if stats2[0].ReportInterval != wantInterval {
513-
t.Errorf("Second call stats() = %v, want %v", stats2[0].ReportInterval, wantInterval)
513+
// Verify Stats() call calculate the report interval from the time of first
514+
// Stats() call.
515+
if got, want := stats2[0].ReportInterval, 10*time.Second; got != want {
516+
t.Errorf("stats2[0].ReportInterval = %v, want %v", stats2[0].ReportInterval, want)
514517
}
515518
}

0 commit comments

Comments
 (0)