Skip to content

Commit f8d0e69

Browse files
[Linear] Ensure a resource is only serialized/hashed at most once to reply it
Currently the go-control-plane caches (both linear and snapshots) will serialize the resource as many time as there are clients receiving it. This is an issue with control-planes watched by a lot of clients, especially with large resources (e.g. endpoints) This commit ensures that the serialization occurs at most once per resource, in all cases (sotw/delta watches and linear/snapshot cache). A resource will still only be serialized if: - it is returned to at least one client. - its version had to be considered to be returned (i.e. the resource was added again with the same stable version). Signed-off-by: Valerian Roche <[email protected]>
1 parent 023118a commit f8d0e69

File tree

14 files changed

+356
-306
lines changed

14 files changed

+356
-306
lines changed

pkg/cache/v3/cache.go

Lines changed: 143 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cache
1818
import (
1919
"context"
2020
"errors"
21+
"fmt"
2122
"sync/atomic"
2223

2324
"google.golang.org/protobuf/proto"
@@ -179,14 +180,14 @@ type RawResponse struct {
179180
// Proxy responds with this version as an acknowledgement.
180181
Version string
181182

182-
// Resources to be included in the response.
183-
Resources []types.ResourceWithTTL
183+
// resources to be included in the response.
184+
resources []*cachedResource
184185

185-
// ReturnedResources tracks the resources returned for the subscription and the version when it was last returned,
186+
// returnedResources tracks the resources returned for the subscription and the version when it was last returned,
186187
// including previously returned ones when using non-full state resources.
187188
// It allows the cache to know what the client knows. The server will transparently forward this
188189
// across requests, and the cache is responsible for its interpretation.
189-
ReturnedResources map[string]string
190+
returnedResources map[string]string
190191

191192
// Whether this is a heartbeat response. For xDS versions that support TTL, this
192193
// will be converted into a response that doesn't contain the actual resource protobuf.
@@ -198,7 +199,7 @@ type RawResponse struct {
198199
Ctx context.Context
199200

200201
// marshaledResponse holds an atomic reference to the serialized discovery response.
201-
marshaledResponse atomic.Value
202+
marshaledResponse atomic.Pointer[discovery.DiscoveryResponse]
202203
}
203204

204205
// RawDeltaResponse is a pre-serialized xDS response that utilizes the delta discovery request/response objects.
@@ -209,21 +210,24 @@ type RawDeltaResponse struct {
209210
// SystemVersionInfo holds the currently applied response system version and should be used for debugging purposes only.
210211
SystemVersionInfo string
211212

212-
// Resources to be included in the response.
213-
Resources []types.ResourceWithTTL
213+
// resources to be included in the response.
214+
resources []*cachedResource
214215

215-
// RemovedResources is a list of resource aliases which should be dropped by the consuming client.
216-
RemovedResources []string
216+
// removedResources is a list of resource aliases which should be dropped by the consuming client.
217+
removedResources []string
217218

218-
// NextVersionMap consists of updated version mappings after this response is applied.
219-
NextVersionMap map[string]string
219+
// returnedResources tracks the resources returned for the subscription and the version when it was last returned,
220+
// including previously returned ones when using non-full state resources.
221+
// It allows the cache to know what the client knows. The server will transparently forward this
222+
// across requests, and the cache is responsible for its interpretation.
223+
returnedResources map[string]string
220224

221225
// Context provided at the time of response creation. This allows associating additional
222226
// information with a generated response.
223227
Ctx context.Context
224228

225229
// Marshaled Resources to be included in the response.
226-
marshaledResponse atomic.Value
230+
marshaledResponse atomic.Pointer[discovery.DeltaDiscoveryResponse]
227231
}
228232

229233
var (
@@ -267,85 +271,125 @@ var (
267271
_ DeltaResponse = &DeltaPassthroughResponse{}
268272
)
269273

274+
func NewTestRawResponse(req *discovery.DiscoveryRequest, version string, resources []types.ResourceWithTTL) *RawResponse {
275+
cachedRes := []*cachedResource{}
276+
for _, res := range resources {
277+
newRes := newCachedResource(GetResourceName(res.Resource), res.Resource, version)
278+
newRes.ttl = res.TTL
279+
cachedRes = append(cachedRes, newRes)
280+
}
281+
return &RawResponse{
282+
Request: req,
283+
Version: version,
284+
resources: cachedRes,
285+
}
286+
}
287+
288+
func NewTestRawDeltaResponse(req *discovery.DeltaDiscoveryRequest, version string, resources []types.ResourceWithTTL, removedResources []string, nextVersionMap map[string]string) *RawDeltaResponse {
289+
cachedRes := []*cachedResource{}
290+
for _, res := range resources {
291+
name := GetResourceName(res.Resource)
292+
newRes := newCachedResource(name, res.Resource, nextVersionMap[name])
293+
newRes.ttl = res.TTL
294+
cachedRes = append(cachedRes, newRes)
295+
}
296+
return &RawDeltaResponse{
297+
DeltaRequest: req,
298+
SystemVersionInfo: version,
299+
resources: cachedRes,
300+
removedResources: removedResources,
301+
returnedResources: nextVersionMap,
302+
}
303+
}
304+
270305
// GetDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
271306
// This is necessary because the marshaled response does not change across the calls.
272307
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
273308
func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
274309
marshaledResponse := r.marshaledResponse.Load()
310+
if marshaledResponse != nil {
311+
return marshaledResponse, nil
312+
}
275313

276-
if marshaledResponse == nil {
277-
marshaledResources := make([]*anypb.Any, len(r.Resources))
278-
279-
for i, resource := range r.Resources {
280-
maybeTtldResource, resourceType, err := r.maybeCreateTTLResource(resource)
281-
if err != nil {
282-
return nil, err
283-
}
284-
marshaledResource, err := MarshalResource(maybeTtldResource)
285-
if err != nil {
286-
return nil, err
287-
}
288-
marshaledResources[i] = &anypb.Any{
289-
TypeUrl: resourceType,
290-
Value: marshaledResource,
291-
}
292-
}
293-
294-
marshaledResponse = &discovery.DiscoveryResponse{
295-
VersionInfo: r.Version,
296-
Resources: marshaledResources,
297-
TypeUrl: r.GetRequest().GetTypeUrl(),
314+
marshaledResources := make([]*anypb.Any, 0, len(r.resources))
315+
for _, resource := range r.resources {
316+
marshaledResource, err := r.marshalTTLResource(resource)
317+
if err != nil {
318+
return nil, fmt.Errorf("processing %s: %w", GetResourceName(resource.resource), err)
298319
}
320+
marshaledResources = append(marshaledResources, marshaledResource)
321+
}
299322

300-
r.marshaledResponse.Store(marshaledResponse)
323+
marshaledResponse = &discovery.DiscoveryResponse{
324+
VersionInfo: r.Version,
325+
Resources: marshaledResources,
326+
TypeUrl: r.GetRequest().GetTypeUrl(),
301327
}
328+
r.marshaledResponse.Store(marshaledResponse)
302329

303-
return marshaledResponse.(*discovery.DiscoveryResponse), nil
330+
return marshaledResponse, nil
331+
}
332+
333+
// GetRawResources is used internally within go-control-plane. Its interface and content may change
334+
func (r *RawResponse) GetRawResources() []types.ResourceWithTTL {
335+
resources := make([]types.ResourceWithTTL, 0, len(r.resources))
336+
for _, res := range r.resources {
337+
resources = append(resources, types.ResourceWithTTL{Resource: res.resource, TTL: res.ttl})
338+
}
339+
return resources
304340
}
305341

306342
func (r *RawResponse) GetReturnedResources() map[string]string {
307-
return r.ReturnedResources
343+
return r.returnedResources
308344
}
309345

310346
// GetDeltaDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently.
311347
// We can do this because the marshaled response does not change across the calls.
312348
// This caching behavior is important in high throughput scenarios because grpc marshaling has a cost and it drives the cpu utilization under load.
313349
func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error) {
314350
marshaledResponse := r.marshaledResponse.Load()
351+
if marshaledResponse != nil {
352+
return marshaledResponse, nil
353+
}
315354

316-
if marshaledResponse == nil {
317-
marshaledResources := make([]*discovery.Resource, len(r.Resources))
318-
319-
for i, resource := range r.Resources {
320-
name := GetResourceName(resource.Resource)
321-
marshaledResource, err := MarshalResource(resource.Resource)
322-
if err != nil {
323-
return nil, err
324-
}
325-
version := HashResource(marshaledResource)
326-
if version == "" {
327-
return nil, errors.New("failed to create a resource hash")
328-
}
329-
marshaledResources[i] = &discovery.Resource{
330-
Name: name,
331-
Resource: &anypb.Any{
332-
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
333-
Value: marshaledResource,
334-
},
335-
Version: version,
336-
}
355+
marshaledResources := make([]*discovery.Resource, 0, len(r.resources))
356+
for _, resource := range r.resources {
357+
marshaledResource, err := resource.getMarshaledResource()
358+
if err != nil {
359+
return nil, fmt.Errorf("processing %s: %w", resource.name, err)
337360
}
338-
339-
marshaledResponse = &discovery.DeltaDiscoveryResponse{
340-
Resources: marshaledResources,
341-
RemovedResources: r.RemovedResources,
342-
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
343-
SystemVersionInfo: r.SystemVersionInfo,
361+
version, err := resource.getResourceVersion()
362+
if err != nil {
363+
return nil, fmt.Errorf("processing version of %s: %w", resource.name, err)
344364
}
345-
r.marshaledResponse.Store(marshaledResponse)
365+
marshaledResources = append(marshaledResources, &discovery.Resource{
366+
Name: resource.name,
367+
Resource: &anypb.Any{
368+
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
369+
Value: marshaledResource,
370+
},
371+
Version: version,
372+
})
346373
}
347374

348-
return marshaledResponse.(*discovery.DeltaDiscoveryResponse), nil
375+
marshaledResponse = &discovery.DeltaDiscoveryResponse{
376+
Resources: marshaledResources,
377+
RemovedResources: r.removedResources,
378+
TypeUrl: r.GetDeltaRequest().GetTypeUrl(),
379+
SystemVersionInfo: r.SystemVersionInfo,
380+
}
381+
r.marshaledResponse.Store(marshaledResponse)
382+
383+
return marshaledResponse, nil
384+
}
385+
386+
// GetRawResources is used internally within go-control-plane. Its interface and content may change
387+
func (r *RawDeltaResponse) GetRawResources() []types.ResourceWithTTL {
388+
resources := make([]types.ResourceWithTTL, 0, len(r.resources))
389+
for _, res := range r.resources {
390+
resources = append(resources, types.ResourceWithTTL{Resource: res.resource, TTL: res.ttl})
391+
}
392+
return resources
349393
}
350394

351395
// GetRequest returns the original Discovery Request.
@@ -392,7 +436,7 @@ func (r *RawDeltaResponse) GetNextVersionMap() map[string]string {
392436

393437
// GetReturnedResources returns the version map which consists of updated version mappings after this response is applied.
394438
func (r *RawDeltaResponse) GetReturnedResources() map[string]string {
395-
return r.NextVersionMap
439+
return r.returnedResources
396440
}
397441

398442
func (r *RawDeltaResponse) GetContext() context.Context {
@@ -401,26 +445,44 @@ func (r *RawDeltaResponse) GetContext() context.Context {
401445

402446
var deltaResourceTypeURL = "type.googleapis.com/" + string(proto.MessageName(&discovery.Resource{}))
403447

404-
func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (types.Resource, string, error) {
405-
if resource.TTL != nil {
406-
wrappedResource := &discovery.Resource{
407-
Name: GetResourceName(resource.Resource),
408-
Ttl: durationpb.New(*resource.TTL),
448+
func (r *RawResponse) marshalTTLResource(resource *cachedResource) (*anypb.Any, error) {
449+
buildResource := func() (*anypb.Any, error) {
450+
marshaled, err := resource.getMarshaledResource()
451+
if err != nil {
452+
return nil, fmt.Errorf("marshaling: %w", err)
409453
}
454+
return &anypb.Any{
455+
TypeUrl: r.GetRequest().GetTypeUrl(),
456+
Value: marshaled,
457+
}, nil
458+
}
410459

411-
if !r.Heartbeat {
412-
rsrc, err := anypb.New(resource.Resource)
413-
if err != nil {
414-
return nil, "", err
415-
}
416-
rsrc.TypeUrl = r.GetRequest().GetTypeUrl()
417-
wrappedResource.Resource = rsrc
460+
if resource.ttl == nil {
461+
return buildResource()
462+
}
463+
464+
wrappedResource := &discovery.Resource{
465+
Name: GetResourceName(resource.resource),
466+
Ttl: durationpb.New(*resource.ttl),
467+
}
468+
469+
if !r.Heartbeat {
470+
rsrc, err := buildResource()
471+
if err != nil {
472+
return nil, err
418473
}
474+
wrappedResource.Resource = rsrc
475+
}
419476

420-
return wrappedResource, deltaResourceTypeURL, nil
477+
marshaled, err := MarshalResource(wrappedResource)
478+
if err != nil {
479+
return nil, fmt.Errorf("marshaling discovery resource: %w", err)
421480
}
422481

423-
return resource.Resource, r.GetRequest().GetTypeUrl(), nil
482+
return &anypb.Any{
483+
TypeUrl: deltaResourceTypeURL,
484+
Value: marshaled,
485+
}, nil
424486
}
425487

426488
// GetDiscoveryResponse returns the final passthrough Discovery Response.

pkg/cache/v3/cache_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package cache_test
1+
package cache
22

33
import (
44
"testing"
@@ -12,7 +12,6 @@ import (
1212
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
1313
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
1414
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
15-
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
1615
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
1716
)
1817

@@ -21,11 +20,11 @@ const (
2120
)
2221

2322
func TestResponseGetDiscoveryResponse(t *testing.T) {
24-
routes := []types.ResourceWithTTL{{Resource: &route.RouteConfiguration{Name: resourceName}}}
25-
resp := cache.RawResponse{
23+
routes := []*cachedResource{newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
24+
resp := RawResponse{
2625
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
2726
Version: "v",
28-
Resources: routes,
27+
resources: routes,
2928
}
3029

3130
discoveryResponse, err := resp.GetDiscoveryResponse()
@@ -52,7 +51,7 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
5251
Resources: []*anypb.Any{rsrc},
5352
VersionInfo: "v",
5453
}
55-
resp := cache.PassthroughResponse{
54+
resp := PassthroughResponse{
5655
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
5756
DiscoveryResponse: dr,
5857
}
@@ -70,11 +69,11 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
7069
}
7170

7271
func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) {
73-
routes := []types.ResourceWithTTL{{Resource: &route.RouteConfiguration{Name: resourceName}}}
74-
resp := cache.RawResponse{
72+
routes := []*cachedResource{newCachedResource(resourceName, &route.RouteConfiguration{Name: resourceName}, "v")}
73+
resp := RawResponse{
7574
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
7675
Version: "v",
77-
Resources: routes,
76+
resources: routes,
7877
Heartbeat: true,
7978
}
8079

0 commit comments

Comments
 (0)