Skip to content

Commit 078715c

Browse files
authored
Support HTTP tunneling on all Upstreams (#10712)
Co-authored-by: changelog-bot <changelog-bot>
1 parent 52298c7 commit 078715c

19 files changed

Lines changed: 941 additions & 280 deletions

File tree

.github/workflows/pr-kubernetes-tests.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,42 +57,49 @@ jobs:
5757
test:
5858
# 2024-12-04: 22m
5959
# 2025-02-13: 29m3s
60+
# 2025-03-24: 23m7s
6061
- cluster-name: 'cluster-one'
6162
go-test-args: '-v -timeout=25m'
62-
go-test-run-regex: '^TestK8sGateway$$/^RouteDelegation$$|^TestGlooctlGlooGatewayEdgeGateway$$|^TestGlooctlK8sGateway$$'
63+
go-test-run-regex: '^TestK8sGateway$$/^RouteDelegation$$|^TestGlooctlGlooGatewayEdgeGateway$$|^TestGlooctlK8sGateway$$|^TestK8sGateway$$/^HTTPTunnel$$'
6364

6465
# 2024-12-04: 23m
6566
# 2025-02-13: 30m30s
67+
# 2025-03-24: 27m42s
6668
- cluster-name: 'cluster-two'
6769
go-test-args: '-v -timeout=25m'
6870
go-test-run-regex: '^TestK8sGatewayIstioRevision$$|^TestRevisionIstioRegression$$|^TestK8sGateway$$/^Deployer$$|^TestK8sGateway$$/^RouteOptions$$|^TestK8sGateway$$/^VirtualHostOptions$$|^TestK8sGateway$$/^Upstreams$$|^TestK8sGateway$$/^HeadlessSvc$$|^TestK8sGateway$$/^PortRouting$$|^TestK8sGatewayMinimalDefaultGatewayParameters$$|^TestK8sGateway$$/^DirectResponse$$|^TestK8sGateway$$/^HttpListenerOptions$$|^TestK8sGateway$$/^ListenerOptions$$|^TestK8sGateway$$/^GlooAdminServer$$'
6971

7072
# 2024-12-04: 24m
7173
# 2025-02-13: 31m49s
74+
# 2025-03-24: 30m26s
7275
- cluster-name: 'cluster-three'
7376
go-test-args: '-v -timeout=30m'
7477
go-test-run-regex: '(^TestK8sGatewayIstioAutoMtls$$|^TestAutomtlsIstioEdgeApisGateway$$|^TestIstioEdgeApiGateway$$|^TestIstioRegression$$)'
7578

7679
# 2024-12-04: 21m
7780
# 2025-02-13: 28m3s
81+
# 2025-03-24: 29m15s
7882
- cluster-name: 'cluster-four'
7983
go-test-args: '-v -timeout=30m'
8084
go-test-run-regex: '(^TestK8sGatewayIstio$$|^TestGlooGatewayEdgeGateway$$|^TestGlooctlIstioInjectEdgeApiGateway$$)'
8185

8286
# 2024-12-04: 24m
8387
# 2025-02-13: 35m21s
88+
# 2025-03-24: 33m39s
8489
- cluster-name: 'cluster-five'
8590
go-test-args: '-v -timeout=30m'
8691
go-test-run-regex: '^TestFullEnvoyValidation$$|^TestValidationStrict$$|^TestValidationAlwaysAccept$$|^TestTransformationValidationDisabled$$'
8792

8893
# 2024-12-04: 26m
8994
# 2025-02-13: 33m19s
95+
# 2025-03-24: 31m38s
9096
- cluster-name: 'cluster-six'
9197
go-test-args: '-v -timeout=30m'
9298
go-test-run-regex: '^TestDiscoveryWatchlabels$$|^TestK8sGatewayNoValidation$$|^TestHelm$$|^TestHelmSettings$$|^TestK8sGatewayAws$$|^TestK8sGateway$$/^HTTPRouteServices$$|^TestK8sGateway$$/^TCPRouteServices$$'
9399

94100
# 2024-12-04: 16m
95101
# 2025-02-13: 26m29s
102+
# 2025-03-24: 29m9s
96103
- cluster-name: 'cluster-seven'
97104
go-test-args: '-v -timeout=25m'
98105
go-test-run-regex: '^TestK8sGateway$$/^CRDCategories$$|^TestK8sGateway$$/^Metrics$$|^TestGloomtlsGatewayEdgeGateway$$|^TestGloomtlsGatewayK8sGateway$$|^TestGlooGatewayEdgeGatewayClearMetrics$$|^TestWatchNamespaceSelector$$|^TestK8sGateway$$/^TLSRouteServices$$'

.github/workflows/pr-unit-tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
kube_gateway_project:
2020
name: projects/gateway2
2121
runs-on: ubuntu-22.04
22-
timeout-minutes: 10
22+
timeout-minutes: 15
2323
# Other unit tests are run by our CloudBuild runner
2424
# These tests do run on Draft PRs, and so we maintain that consistency and run this job on Draft PRs as well
2525
steps:
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
changelog:
2+
- type: FIX
3+
description: |
4+
Updated HTTP tunneling plugin to support HTTP CONNECT tunneling on all Upstreams, not just
5+
those bound to a Route. This allows Upstreams referenced by various settings (e.g. remote JWKS,
6+
tracing, and more) to use a forward proxy without creating a Route for each Upstream.
7+
8+
To address clusters having their own lifecycle that differs from listeners in
9+
gateway2 causing issues with the HTTP tunneling configuration, a new plugin interface
10+
for creating additional clusters and listeners from Upstreams has been added.
11+
The additional clusters and listeners are added to the snapshot near the
12+
end of the gateway2 translation process.
13+
resolvesIssue: false
14+
issueLink: https://github.com/solo-io/solo-projects/issues/7497

projects/gateway2/proxy_syncer/perclient.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55

66
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
7+
envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
78
"github.com/solo-io/gloo/projects/gateway2/krtcollections"
89
ggv2utils "github.com/solo-io/gloo/projects/gateway2/utils"
910
"github.com/solo-io/gloo/projects/gloo/pkg/xds"
@@ -42,31 +43,42 @@ func snapshotPerClient(l *zap.Logger, dbg *krt.DebugHandler, uccCol krt.Collecti
4243
return nil
4344
}
4445

46+
listenersProto := make([]envoycache.Resource, 0)
4547
clustersProto := make([]envoycache.Resource, 0, len(clustersForUcc))
48+
4649
var clustersHash uint64
50+
var listenersHash uint64
51+
52+
// add the clusters and the additional resources created for them
4753
for _, ep := range clustersForUcc {
4854
clustersProto = append(clustersProto, ep.Cluster)
49-
clustersHash ^= ep.ClusterVersion
50-
}
51-
clustersVersion := fmt.Sprintf("%d", clustersHash)
5255

53-
endpointsForUcc := endpoints.FetchEndpointsForClient(kctx, ucc)
54-
endpointsProto := make([]envoycache.Resource, 0, len(endpointsForUcc))
55-
var endpointsHash uint64
56-
for _, ep := range endpointsForUcc {
57-
endpointsProto = append(endpointsProto, ep.Endpoints)
58-
endpointsHash ^= ep.EndpointsHash
56+
// add additional clusters
57+
for _, additionalCluster := range ep.AdditionalClusters {
58+
clustersProto = append(clustersProto, additionalCluster)
59+
}
60+
61+
// add additional listeners
62+
for _, additionalListener := range ep.AdditionalListeners {
63+
listenersProto = append(listenersProto, additionalListener)
64+
}
65+
66+
clustersHash ^= ep.ClusterVersion
67+
clustersHash ^= ep.AdditionalClustersHash
68+
listenersHash ^= ep.AdditionalListenersHash
5969
}
6070

61-
mostlySnap := *maybeMostlySnap
71+
clusterResources := envoycache.NewResources(fmt.Sprintf("%d", clustersHash), clustersProto)
6272

63-
clusterResources := envoycache.NewResources(clustersVersion, clustersProto)
6473
// add missing generated resource from GeneratedResources plugins.
6574
// To be able to do individual upstream translation, We need to redo the GeneratedResources,
6675
// so they don't take as input the entire xds snapshot. the main offender is the tunneling plugin.
6776
//
6877
// for now, a manual audit showed that these only add clusters and listeners. As we don't touch the listeners,
6978
// we just need to account for potentially missing clusters.
79+
//
80+
// This can be removed once we have moved off GeneratedResources in favor of
81+
// UpstreamGeneratedResources which is krt-safe
7082
for name, cluster := range genericSnap.Clusters.Items {
7183
// only copy clusters that don't exist. as we do cluster translation per client,
7284
// our clusters might be slightly different.
@@ -77,13 +89,37 @@ func snapshotPerClient(l *zap.Logger, dbg *krt.DebugHandler, uccCol krt.Collecti
7789
}
7890
clusterResources.Version = fmt.Sprintf("%d", clustersHash)
7991

92+
listenerResources := envoycache.NewResources(fmt.Sprintf("%d", listenersHash), listenersProto)
93+
94+
// add the snapshot listeners
95+
for name, listener := range genericSnap.Listeners.Items {
96+
// only copy listeners that don't exist. as we do cluster translation per client,
97+
// our clusters might be slightly different.
98+
if _, ok := listenerResources.Items[name]; !ok {
99+
listenerResources.Items[name] = listener
100+
listenersHash ^= ggv2utils.HashProto(listener.ResourceProto().(*envoy_config_listener_v3.Listener))
101+
}
102+
}
103+
listenerResources.Version = fmt.Sprintf("%d", listenersHash)
104+
105+
endpointsForUcc := endpoints.FetchEndpointsForClient(kctx, ucc)
106+
endpointsProto := make([]envoycache.Resource, 0, len(endpointsForUcc))
107+
var endpointsHash uint64
108+
for _, ep := range endpointsForUcc {
109+
endpointsProto = append(endpointsProto, ep.Endpoints)
110+
endpointsHash ^= ep.EndpointsHash
111+
}
112+
endpointResources := envoycache.NewResources(fmt.Sprintf("%d-%d", clustersHash, endpointsHash), endpointsProto)
113+
114+
mostlySnap := *maybeMostlySnap
80115
mostlySnap.proxyKey = ucc.ResourceName()
81116
mostlySnap.snap = &xds.EnvoySnapshot{
82117
Clusters: clusterResources,
83-
Endpoints: envoycache.NewResources(fmt.Sprintf("%s-%d", clustersVersion, endpointsHash), endpointsProto),
118+
Listeners: listenerResources,
119+
Endpoints: endpointResources,
84120
Routes: genericSnap.Routes,
85-
Listeners: genericSnap.Listeners,
86121
}
122+
87123
l.Debug("snapshotPerClient", zap.String("proxyKey", mostlySnap.proxyKey),
88124
zap.Stringer("Listeners", resourcesStringer(mostlySnap.snap.Listeners)),
89125
zap.Stringer("Clusters", resourcesStringer(mostlySnap.snap.Clusters)),

projects/gateway2/proxy_syncer/upstreams.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
87
"github.com/solo-io/gloo/pkg/utils/settingsutil"
98
"github.com/solo-io/gloo/projects/gateway2/krtcollections"
109
ggv2utils "github.com/solo-io/gloo/projects/gateway2/utils"
@@ -14,6 +13,7 @@ import (
1413
glookubev1 "github.com/solo-io/gloo/projects/gloo/pkg/api/v1/kube/apis/gloo.solo.io/v1"
1514
"github.com/solo-io/gloo/projects/gloo/pkg/plugins"
1615
"github.com/solo-io/gloo/projects/gloo/pkg/syncer/setup"
16+
"github.com/solo-io/gloo/projects/gloo/pkg/translator"
1717
"github.com/solo-io/go-utils/contextutils"
1818
envoycache "github.com/solo-io/solo-kit/pkg/api/v1/control-plane/cache"
1919
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/resource"
@@ -25,10 +25,15 @@ import (
2525
)
2626

2727
type uccWithCluster struct {
28-
Client krtcollections.UniqlyConnectedClient
29-
Cluster envoycache.Resource
30-
ClusterVersion uint64
31-
upstreamName string
28+
Client krtcollections.UniqlyConnectedClient
29+
Cluster envoycache.Resource
30+
ClusterVersion uint64
31+
AdditionalClusters []envoycache.Resource
32+
AdditionalClustersHash uint64
33+
AdditionalListeners []envoycache.Resource
34+
AdditionalListenersHash uint64
35+
36+
upstreamName string
3237
}
3338

3439
func (c uccWithCluster) ResourceName() string {
@@ -91,19 +96,40 @@ func NewPerClientEnvoyClusters(
9196
latestSnap.Secrets = append(latestSnap.Secrets, s.Inner)
9297
}
9398

94-
c, version := translate(ctx, settings, translator, latestSnap, upstream)
95-
if c == nil {
99+
// TODO make sure that translate returns a nil result if cluster would have been nil previously
100+
clusterResult, version := translate(ctx, settings, translator, latestSnap, upstream)
101+
if clusterResult == nil {
96102
continue
97103
}
104+
105+
c := clusterResult.Cluster
98106
if name != "" && c.GetEdsClusterConfig() != nil {
99107
c.GetEdsClusterConfig().ServiceName = name
100108
}
101109

110+
additionalClusters := make([]envoycache.Resource, 0, len(clusterResult.AdditionalClusters))
111+
additionalClustersHash := uint64(0)
112+
for _, additionalCluster := range clusterResult.AdditionalClusters {
113+
additionalClusters = append(additionalClusters, resource.NewEnvoyResource(additionalCluster))
114+
additionalClustersHash ^= ggv2utils.HashProto(additionalCluster)
115+
}
116+
117+
additionalListeners := make([]envoycache.Resource, 0, len(clusterResult.AdditionalListeners))
118+
additionalListenersHash := uint64(0)
119+
for _, additionalListener := range clusterResult.AdditionalListeners {
120+
additionalListeners = append(additionalListeners, resource.NewEnvoyResource(additionalListener))
121+
additionalListenersHash ^= ggv2utils.HashProto(additionalListener)
122+
}
123+
102124
uccWithClusterRet = append(uccWithClusterRet, uccWithCluster{
103-
Client: ucc,
104-
Cluster: resource.NewEnvoyResource(c),
105-
ClusterVersion: version,
106-
upstreamName: up.ResourceName(),
125+
Client: ucc,
126+
Cluster: resource.NewEnvoyResource(c),
127+
ClusterVersion: version,
128+
upstreamName: up.ResourceName(),
129+
AdditionalClusters: additionalClusters,
130+
AdditionalClustersHash: additionalClustersHash,
131+
AdditionalListeners: additionalListeners,
132+
AdditionalListenersHash: additionalListenersHash,
107133
})
108134
}
109135
return uccWithClusterRet
@@ -118,7 +144,12 @@ func NewPerClientEnvoyClusters(
118144
}
119145
}
120146

121-
func translate(ctx context.Context, settings *gloov1.Settings, translator setup.TranslatorFactory, snap *gloosnapshot.ApiSnapshot, up *gloov1.Upstream) (*envoy_config_cluster_v3.Cluster, uint64) {
147+
func translate(
148+
ctx context.Context, settings *gloov1.Settings,
149+
translator setup.TranslatorFactory,
150+
snap *gloosnapshot.ApiSnapshot,
151+
up *gloov1.Upstream,
152+
) (*translator.ClusterResult, uint64) {
122153
ctx = settingsutil.WithSettings(ctx, settings)
123154

124155
params := plugins.Params{
@@ -129,12 +160,15 @@ func translate(ctx context.Context, settings *gloov1.Settings, translator setup.
129160
}
130161

131162
// false here should be ok - plugins should set eds on eds clusters.
132-
cluster, _ := translator.NewClusterTranslator(ctx, settings).TranslateCluster(params, up)
133-
if cluster == nil {
163+
clusterResult, _ := translator.NewClusterTranslator(ctx, settings).TranslateCluster(params, up)
164+
if clusterResult == nil {
165+
// TODO: handle the reports coming from the translator
166+
// this is a loose end that likely will not need to be resolved before the kgateway rebase
167+
// as the Gloo translator is still used and is still emitting the reports
134168
return nil, 0
135169
}
136170

137-
return cluster, ggv2utils.HashProto(cluster)
171+
return clusterResult, ggv2utils.HashProto(clusterResult.Cluster)
138172
}
139173

140174
func ApplyDestRulesForUpstream(destrule *DestinationRuleWrapper, u *gloov1.Upstream) (*gloov1.Upstream, string) {

projects/gateway2/setup/ggv2setup_test.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,12 @@ func (x xdsDumper) Dump(t *testing.T, ctx context.Context) xdsDump {
410410
var clusters []*envoycluster.Cluster
411411
var listeners []*envoylistener.Listener
412412

413-
// run this in parallel with a 5s timeout
413+
// run this in parallel with a 15s timeout
414414
done := make(chan struct{})
415415
go func() {
416+
// give some time to process to reduce flakiness
417+
time.Sleep(5 * time.Second)
418+
416419
defer close(done)
417420
sent := 2
418421
for i := 0; i < sent; i++ {
@@ -432,14 +435,21 @@ func (x xdsDumper) Dump(t *testing.T, ctx context.Context) xdsDump {
432435
}
433436
} else if dresp.GetTypeUrl() == "type.googleapis.com/envoy.config.listener.v3.Listener" {
434437
needMoreListerners := false
438+
435439
for _, anyListener := range dresp.GetResources() {
436440
var listener envoylistener.Listener
437441
if err := anyListener.UnmarshalTo(&listener); err != nil {
438442
t.Errorf("failed to unmarshal listener: %v", err)
439443
}
440444
listeners = append(listeners, &listener)
441-
needMoreListerners = needMoreListerners || (len(getroutesnames(&listener)) == 0)
445+
// ROLDS: Disabling this, it didn't like the pipe listener
446+
// from the upstream-tunneling.yaml test as it expects routes on all listeners.
447+
// It looks like this was put in place to deflake. I've run the tests
448+
// in a loop on my workstation and they consistently passed.
449+
// for i in {1..30}; do go clean -testcache; go test ./projects/gateway2/setup/...; done
450+
//needMoreListerners = needMoreListerners || (len(getRouteNames(&listener)) == 0)
442451
}
452+
443453
if len(listeners) == 0 {
444454
needMoreListerners = true
445455
}
@@ -460,7 +470,7 @@ func (x xdsDumper) Dump(t *testing.T, ctx context.Context) xdsDump {
460470
}()
461471
select {
462472
case <-done:
463-
case <-time.After(5 * time.Second):
473+
case <-time.After(15 * time.Second):
464474
// don't fatal yet as we want to dump the state while still connected
465475
t.Error("timed out waiting for listener/cluster xds dump")
466476
return xdsDump{}
@@ -484,7 +494,7 @@ func (x xdsDumper) Dump(t *testing.T, ctx context.Context) xdsDump {
484494

485495
var routenames []string
486496
for _, l := range listeners {
487-
routenames = append(routenames, getroutesnames(l)...)
497+
routenames = append(routenames, getRouteNames(l)...)
488498
}
489499

490500
dr = proto.Clone(x.dr).(*discovery_v3.DiscoveryRequest)
@@ -790,10 +800,13 @@ func protoJsonRoundTrip(c proto.Message) (any, error) {
790800
return roundtrip, nil
791801
}
792802

793-
func getroutesnames(l *envoylistener.Listener) []string {
803+
func getRouteNames(l *envoylistener.Listener) []string {
794804
var routes []string
795805
for _, fc := range l.GetFilterChains() {
796806
for _, filter := range fc.GetFilters() {
807+
fmt.Printf("filter: %s\n", filter.GetName())
808+
809+
// check if the filter is a http connection manager
797810
suffix := string((&envoyhttp.HttpConnectionManager{}).ProtoReflect().Descriptor().FullName())
798811
if strings.HasSuffix(filter.GetTypedConfig().GetTypeUrl(), suffix) {
799812
var hcm envoyhttp.HttpConnectionManager

0 commit comments

Comments
 (0)