Skip to content

Commit 5caed12

Browse files
domdom82maxmoehl
authored andcommitted
feat: Enable redispatch to different server on retry
1 parent 2167077 commit 5caed12

File tree

7 files changed

+268
-47
lines changed

7 files changed

+268
-47
lines changed

acceptance-tests/acceptance_tests_suite_test.go

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package acceptance_tests
22

33
import (
44
"context"
5+
"crypto/tls"
56
"encoding/json"
67
"errors"
78
"fmt"
89
"net"
910
"net/http"
11+
"net/http/httptest"
1012
"net/url"
13+
"strconv"
1114
"strings"
1215
"testing"
1316
"time"
@@ -61,46 +64,94 @@ var _ = SynchronizedAfterSuite(func() {
6164
deleteDeployment(deploymentNameForTestNode())
6265
}, func() {})
6366

64-
// Starts a simple test server that returns 200 OK or echoes websocket messages back
65-
func startDefaultTestServer() (func(), int) {
66-
var upgrader = websocket.Upgrader{}
67+
type TestServerOption func(*httptest.Server)
6768

68-
By("Starting a local websocket server to act as a backend")
69-
closeLocalServer, localPort, err := startLocalHTTPServer(nil, func(w http.ResponseWriter, r *http.Request) {
70-
// if no upgrade requested, act like a normal HTTP server
71-
if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
72-
fmt.Fprintln(w, "Hello cloud foundry")
73-
return
74-
}
69+
func withIP(ip string) TestServerOption {
70+
return func(server *httptest.Server) {
71+
l, err := net.Listen("tcp", fmt.Sprintf("%s:0", ip))
72+
Expect(err).ToNot(HaveOccurred())
73+
server.Listener = l
74+
}
75+
}
76+
77+
func withTLS(tlsConfig *tls.Config) TestServerOption {
78+
return func(server *httptest.Server) {
79+
server.TLS = tlsConfig
80+
}
81+
}
82+
83+
func withHandlerFunc(handlerFunc http.HandlerFunc) TestServerOption {
84+
return func(server *httptest.Server) {
85+
server.Config = &http.Server{Handler: handlerFunc}
86+
}
87+
}
88+
89+
func defaultHandler(w http.ResponseWriter, r *http.Request) {
90+
// if no upgrade requested, act like a normal HTTP server
91+
if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
92+
fmt.Fprintln(w, "Hello cloud foundry")
93+
return
94+
}
95+
var upgrader = websocket.Upgrader{}
96+
conn, err := upgrader.Upgrade(w, r, nil)
97+
if err != nil {
98+
w.WriteHeader(http.StatusInternalServerError)
99+
return
100+
}
101+
defer conn.Close()
75102

76-
conn, err := upgrader.Upgrade(w, r, nil)
103+
for {
104+
messageType, message, err := conn.ReadMessage()
77105
if err != nil {
78-
w.WriteHeader(http.StatusInternalServerError)
79-
return
106+
break
80107
}
81-
defer conn.Close()
82-
83-
for {
84-
messageType, message, err := conn.ReadMessage()
85-
if err != nil {
86-
break
87-
}
88-
err = conn.WriteMessage(messageType, message)
89-
if err != nil {
90-
break
91-
}
108+
err = conn.WriteMessage(messageType, message)
109+
if err != nil {
110+
break
92111
}
93-
})
112+
}
113+
}
94114

115+
func newTestServer(options ...TestServerOption) *httptest.Server {
116+
l, err := net.Listen("tcp", "127.0.0.1:0")
95117
Expect(err).NotTo(HaveOccurred())
96-
return closeLocalServer, localPort
118+
119+
server := &httptest.Server{
120+
Listener: l,
121+
EnableHTTP2: false,
122+
TLS: nil,
123+
Config: &http.Server{Handler: http.HandlerFunc(defaultHandler)},
124+
}
125+
126+
for _, opt := range options {
127+
opt(server)
128+
}
129+
130+
return server
97131
}
98132

99-
// Sets up SSH tunnel from HAProxy VM to test server
100-
func setupTunnelFromHaproxyToTestServer(haproxyInfo haproxyInfo, haproxyBackendPort, localPort int) func() {
101-
By(fmt.Sprintf("Creating a reverse SSH tunnel from HAProxy backend (port %d) to local HTTP server (port %d)", haproxyBackendPort, localPort))
133+
// Starts a simple test server that returns 200 OK or echoes websocket messages back
134+
func startDefaultTestServer(options ...TestServerOption) (func(), int) {
135+
By("Starting a local websocket server to act as a backend")
136+
server := newTestServer(options...)
137+
if server.TLS != nil {
138+
server.StartTLS()
139+
} else {
140+
server.Start()
141+
}
142+
143+
serverURL, err := url.Parse(server.URL)
144+
Expect(err).NotTo(HaveOccurred())
145+
port, err := strconv.ParseInt(serverURL.Port(), 10, 64)
146+
Expect(err).NotTo(HaveOccurred())
147+
148+
return server.Close, int(port)
149+
}
150+
151+
func setupTunnelFromHaproxyIPToTestServerIP(haproxyInfo haproxyInfo, haproxyBackendIP string, haproxyBackendPort int, localIP string, localPort int) func() {
152+
By(fmt.Sprintf("Creating a reverse SSH tunnel from HAProxy backend (ip %s port %d) to local HTTP server (ip %s port %d)", haproxyBackendIP, haproxyBackendPort, localIP, localPort))
102153
ctx, cancelFunc := context.WithCancel(context.Background())
103-
err := startReverseSSHPortForwarder(haproxyInfo.SSHUser, haproxyInfo.PublicIP, haproxyInfo.SSHPrivateKey, haproxyBackendPort, localPort, ctx)
154+
err := startReverseSSHPortAndIPForwarder(haproxyInfo.SSHUser, haproxyInfo.PublicIP, haproxyInfo.SSHPrivateKey, haproxyBackendIP, haproxyBackendPort, localIP, localPort, ctx)
104155
Expect(err).NotTo(HaveOccurred())
105156

106157
By("Waiting a few seconds so that HAProxy can detect the backend server is listening")
@@ -112,6 +163,11 @@ func setupTunnelFromHaproxyToTestServer(haproxyInfo haproxyInfo, haproxyBackendP
112163
return cancelFunc
113164
}
114165

166+
// Sets up SSH tunnel from HAProxy VM to test server
167+
func setupTunnelFromHaproxyToTestServer(haproxyInfo haproxyInfo, haproxyBackendPort, localPort int) func() {
168+
return setupTunnelFromHaproxyIPToTestServerIP(haproxyInfo, "127.0.0.1", haproxyBackendPort, "127.0.0.1", localPort)
169+
}
170+
115171
// Sets up SSH tunnel from local machine to HAProxy
116172
func setupTunnelFromLocalMachineToHAProxy(haproxyInfo haproxyInfo, localPort, haproxyPort int) func() {
117173
By(fmt.Sprintf("Creating a SSH tunnel from localmachine (port %d) to HAProxy (port %d)", localPort, haproxyPort))

acceptance-tests/bosh_helpers.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ var opsfileChangeVersion string = `---
4646
version: ((release-version))
4747
`
4848

49-
var opsfileAddSSHUser string = `---
49+
var opsfileConfigureSSH string = `---
5050
# Install OS conf so that we can SSH into VM to inspect configuration
5151
- type: replace
5252
path: /releases/-
@@ -72,10 +72,25 @@ var opsfileAddSSHUser string = `---
7272
value:
7373
name: ssh_key
7474
type: ssh
75+
76+
# Configure SSH to allow port forwarding
77+
- type: replace
78+
path: /instance_groups/name=haproxy/jobs/-
79+
value:
80+
name: post-deploy-script
81+
release: os-conf
82+
properties:
83+
script: |-
84+
#!/bin/bash
85+
sed "/^ *AllowTcpForwarding/d" -i /etc/ssh/sshd_config
86+
echo 'AllowTcpForwarding yes' >> /etc/ssh/sshd_config
87+
sed "/^ *GatewayPorts/d" -i /etc/ssh/sshd_config
88+
echo 'GatewayPorts clientspecified' >> /etc/ssh/sshd_config
89+
/etc/init.d/ssh restart
7590
`
7691

7792
// opsfiles that need to be set for all tests
78-
var defaultOpsfiles = []string{opsfileChangeName, opsfileChangeVersion, opsfileAddSSHUser}
93+
var defaultOpsfiles = []string{opsfileChangeName, opsfileChangeVersion, opsfileConfigureSSH}
7994
var defaultSSHUser string = "ginkgo"
8095

8196
func buildManifestVars(baseManifestVars baseManifestVars, customVars map[string]interface{}) map[string]interface{} {
@@ -126,7 +141,7 @@ func buildHAProxyInfo(baseManifestVars baseManifestVars, varsStoreReader varsSto
126141
// Returns 'info' struct containing public IP and ssh creds, and a callback to deserialize properties from the vars store
127142
// Use expectSuccess with false if the base configuration will not start successfully, e.g. because
128143
// files that are referenced in the configuration still need to be uploaded, or a custom backend needs more time to start up.
129-
// In those cases, `monit` will keep restarting the boshrelease and the test can expect the procesess to be healthy after
144+
// In those cases, `monit` will keep restarting the boshrelease and the test can expect the processes to be healthy after
130145
// the necessary referenced resources are available.
131146
func deployHAProxy(baseManifestVars baseManifestVars, customOpsfiles []string, customVars map[string]interface{}, expectSuccess bool) (haproxyInfo, varsStoreReader) {
132147
manifestVars := buildManifestVars(baseManifestVars, customVars)

acceptance-tests/remote.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,16 @@ func copyFileToRemote(user string, addr string, privateKey string, remotePath st
4848
return scpClient.CopyFile(context.Background(), fileReader, remotePath, permissions)
4949
}
5050

51-
// Forwards a TCP connection from a given port on the local machine to a given port on the remote machine
52-
// Starts in backgound, cancel via context
53-
func startSSHPortForwarder(user string, addr string, privateKey string, localPort, remotePort int, ctx context.Context) error {
51+
// Opens a local port forwarding SSH connection. Equivalent to
52+
// ssh -i <privateKey> -L <localIP>:<localPort>:<remoteIP>:<remotePort> <user>@<addr>
53+
func startSSHPortAndIPForwarder(user string, addr string, privateKey string, localIP string, localPort int, remoteIP string, remotePort int, ctx context.Context) error {
5454
remoteConn, err := buildSSHClient(user, addr, privateKey)
5555
if err != nil {
5656
return err
5757
}
5858

59-
writeLog(fmt.Sprintf("Listening on 127.0.0.1:%d on local machine\n", remotePort))
60-
localListener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", localPort))
59+
writeLog(fmt.Sprintf("Listening on %s:%d on local machine\n", localIP, localPort))
60+
localListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", localIP, localPort))
6161
if err != nil {
6262
return err
6363
}
@@ -75,9 +75,9 @@ func startSSHPortForwarder(user string, addr string, privateKey string, localPor
7575
return
7676
}
7777

78-
remoteConn, err := remoteConn.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", remotePort))
78+
remoteConn, err := remoteConn.Dial("tcp", fmt.Sprintf("%s:%d", remoteIP, remotePort))
7979
if err != nil {
80-
writeLog(fmt.Sprintf("Error dialing local port %d: %s\n", remotePort, err.Error()))
80+
writeLog(fmt.Sprintf("Error dialing remote ip %s port %d: %s\n", remoteIP, remotePort, err.Error()))
8181
return
8282
}
8383

@@ -95,16 +95,22 @@ func startSSHPortForwarder(user string, addr string, privateKey string, localPor
9595
return nil
9696
}
9797

98-
// Forwards a TCP connection from a given port on the remote machine to a given port on the local machine
99-
// Starts in backgound, cancel via context
100-
func startReverseSSHPortForwarder(user string, addr string, privateKey string, remotePort, localPort int, ctx context.Context) error {
98+
// Forwards a TCP connection from a given port on the local machine to a given port on the remote machine
99+
// Starts in background, cancel via context
100+
func startSSHPortForwarder(user string, addr string, privateKey string, localPort, remotePort int, ctx context.Context) error {
101+
return startSSHPortAndIPForwarder(user, addr, privateKey, "127.0.0.1", localPort, "127.0.0.1", remotePort, ctx)
102+
}
103+
104+
// Opens a remote port forwarding SSH connection. Equivalent to
105+
// ssh -i <privateKey> -R <remoteIP>:<remotePort>:<localIP>:<localPort> <user>@<addr>
106+
func startReverseSSHPortAndIPForwarder(user string, addr string, privateKey string, remoteIP string, remotePort int, localIP string, localPort int, ctx context.Context) error {
101107
remoteConn, err := buildSSHClient(user, addr, privateKey)
102108
if err != nil {
103109
return err
104110
}
105111

106-
writeLog(fmt.Sprintf("Listening on 127.0.0.1:%d on remote machine\n", remotePort))
107-
remoteListener, err := remoteConn.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", remotePort))
112+
writeLog(fmt.Sprintf("Listening on %s:%d on remote machine %s\n", remoteIP, remotePort, addr))
113+
remoteListener, err := remoteConn.Listen("tcp", fmt.Sprintf("%s:%d", remoteIP, remotePort))
108114
if err != nil {
109115
return err
110116
}
@@ -122,9 +128,9 @@ func startReverseSSHPortForwarder(user string, addr string, privateKey string, r
122128
return
123129
}
124130

125-
localConn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", localPort))
131+
localConn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", localIP, localPort))
126132
if err != nil {
127-
writeLog(fmt.Sprintf("Error dialing local port %d: %s\n", localPort, err.Error()))
133+
writeLog(fmt.Sprintf("Error dialing local ip %s port %d: %s\n", localIP, localPort, err.Error()))
128134
return
129135
}
130136

@@ -142,6 +148,12 @@ func startReverseSSHPortForwarder(user string, addr string, privateKey string, r
142148
return nil
143149
}
144150

151+
// Forwards a TCP connection from a given port on the remote machine to a given port on the local machine
152+
// Starts in background, cancel via context
153+
func startReverseSSHPortForwarder(user string, addr string, privateKey string, remotePort, localPort int, ctx context.Context) error {
154+
return startReverseSSHPortAndIPForwarder(user, addr, privateKey, "127.0.0.1", remotePort, "127.0.0.1", localPort, ctx)
155+
}
156+
145157
func copyConnections(client net.Conn, remote net.Conn) {
146158
chDone := make(chan bool)
147159

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package acceptance_tests
2+
3+
import (
4+
"fmt"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
"net/http"
8+
)
9+
10+
var _ = Describe("Retry and Redispatch Tests", func() {
11+
var haproxyInfo haproxyInfo
12+
var closeTunnel []func()
13+
var closeLocalServer []func()
14+
15+
enableRedispatch := false
16+
haproxyBackendPort := 12000
17+
haproxyBackendHealthPort := 8080
18+
opsfileRetry := `---
19+
# Configure Redispatch
20+
- type: replace
21+
path: /instance_groups/name=haproxy/jobs/name=haproxy/properties/ha_proxy/enable_redispatch?
22+
value: ((enable_redispatch))
23+
# Configure Retries
24+
- type: replace
25+
path: /instance_groups/name=haproxy/jobs/name=haproxy/properties/ha_proxy/retries?
26+
value: 2
27+
# Enable backend http health check
28+
- type: replace
29+
path: /instance_groups/name=haproxy/jobs/name=haproxy/properties/ha_proxy/backend_use_http_health?
30+
value: true
31+
`
32+
33+
JustBeforeEach(func() {
34+
haproxyInfo, _ = deployHAProxy(baseManifestVars{
35+
haproxyBackendPort: haproxyBackendPort,
36+
haproxyBackendServers: []string{"127.0.0.1", "127.0.0.2"},
37+
deploymentName: deploymentNameForTestNode(),
38+
}, []string{opsfileRetry}, map[string]interface{}{
39+
"enable_redispatch": enableRedispatch,
40+
}, true)
41+
42+
setupTunnel := func(ip string, backendPort int) {
43+
closeLocalServerFunc, localPort := startDefaultTestServer(withIP(ip))
44+
closeTunnelFunc := setupTunnelFromHaproxyIPToTestServerIP(haproxyInfo, ip, backendPort, ip, localPort)
45+
46+
closeTunnel = append(closeTunnel, closeTunnelFunc)
47+
closeLocalServer = append(closeLocalServer, closeLocalServerFunc)
48+
}
49+
50+
setupTunnel("127.0.0.1", haproxyBackendPort)
51+
setupTunnel("127.0.0.1", haproxyBackendHealthPort)
52+
53+
setupTunnel("127.0.0.2", haproxyBackendHealthPort) // this backend seems healthy but does not respond to traffic
54+
})
55+
56+
AfterEach(func() {
57+
for _, closeLocalServerFunc := range closeLocalServer {
58+
closeLocalServerFunc()
59+
}
60+
for _, closeTunnelFunc := range closeTunnel {
61+
closeTunnelFunc()
62+
}
63+
})
64+
65+
Context("When ha_proxy.enable_redispatch is false (default)", func() {
66+
BeforeEach(func() {
67+
enableRedispatch = false
68+
})
69+
70+
It("Does not redispatch by default", func() {
71+
By("Sending a request to broken backend results in a 503")
72+
73+
Eventually(func() int {
74+
resp, err := http.Get(fmt.Sprintf("http://%s", haproxyInfo.PublicIP))
75+
Expect(err).NotTo(HaveOccurred())
76+
return resp.StatusCode
77+
}).Should(Equal(http.StatusServiceUnavailable))
78+
})
79+
})
80+
Context("When ha_proxy.enable_redispatch is true", func() {
81+
BeforeEach(func() {
82+
enableRedispatch = true
83+
})
84+
85+
It("Does redispatch to other backends", func() {
86+
By("Sending a request to broken backend results in a 200 due to redispatch to working backend")
87+
88+
Consistently(func() int {
89+
resp, err := http.Get(fmt.Sprintf("http://%s", haproxyInfo.PublicIP))
90+
Expect(err).NotTo(HaveOccurred())
91+
return resp.StatusCode
92+
}).Should(Equal(http.StatusOK))
93+
})
94+
})
95+
})

jobs/haproxy/spec

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ properties:
258258
ha_proxy.disable_backend_http2_websockets:
259259
default: false
260260
description: "Forward websockets to the backend servers using HTTP/1.1, never HTTP/2. Does not apply to custom routed_backend_servers. Works around https://github.com/cloudfoundry/routing-release/issues/230. Overrides backend_match_http_protocol for websockets."
261+
ha_proxy.enable_redispatch:
262+
default: false
263+
description: "When enabled, HAProxy will try to connect to another server if a connect attempt fails. Best used in conjunction with retries."
264+
ha_proxy.retries:
265+
default: 0
266+
description: "HAProxy will retry this many times on failed connections. When redispatch is enabled, the retries may occur on different servers. In combination with connect_timeout this defines the maximum response time of HAProxy to clients. e.g. 0.5s connect_timeout * 10 retries = 5s max response time"
261267

262268
ha_proxy.connect_timeout:
263269
description: "Timeout (in floating point seconds) used on connections from haproxy to a backend, while waiting for the TCP handshake to complete + connection to establish"

0 commit comments

Comments
 (0)