Skip to content

Commit 2c8aa41

Browse files
[V2] Add support for MaglevHashEndpoints (#530)
1 parent b6eca95 commit 2c8aa41

11 files changed

Lines changed: 406 additions & 105 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
- [V2] Add optional status code checks. Consistent return of response
99
- [V2] JavaScript Transactions API
1010
- [V2] Async Client
11+
- [V2] Fix connection.NewRequestWithEndpoint()
12+
- [V2] Add support for MaglevHashEndpoints
1113

1214
## [1.6.0](https://github.com/arangodb/go-driver/tree/v1.6.0) (2023-05-30)
1315
- Add ErrArangoDatabaseNotFound and IsExternalStorageError helper to v2

v2/connection/connection_http_internal.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -128,34 +128,31 @@ func (j *httpConnection) SetEndpoint(e Endpoint) error {
128128
return nil
129129
}
130130

131-
func (j *httpConnection) NewRequestWithEndpoint(endpoint string, method string, urls ...string) (Request, error) {
132-
return j.newRequestWithEndpoint(endpoint, method, urls...)
131+
func (j *httpConnection) NewRequestWithEndpoint(endpoint string, method string, urlParts ...string) (Request, error) {
132+
return j.newRequestWithEndpoint(endpoint, method, urlParts...)
133133
}
134134

135-
func (j *httpConnection) NewRequest(method string, urls ...string) (Request, error) {
136-
return j.newRequest(method, urls...)
135+
func (j *httpConnection) NewRequest(method string, urlParts ...string) (Request, error) {
136+
return j.newRequestWithEndpoint("", method, urlParts...)
137137
}
138138

139-
func (j *httpConnection) newRequest(method string, urls ...string) (*httpRequest, error) {
140-
return j.newRequestWithEndpoint("", method, urls...)
141-
}
139+
func (j *httpConnection) newRequestWithEndpoint(endpoint string, method string, urlParts ...string) (*httpRequest, error) {
140+
urlPath := path.Join(urlParts...)
142141

143-
func (j *httpConnection) newRequestWithEndpoint(endpoint string, method string, urls ...string) (*httpRequest, error) {
144-
e, ok := j.endpoint.Get(endpoint)
145-
if !ok {
146-
return nil, errors.Errorf("Unable to resolve endpoint for %s", e)
142+
e, err := j.endpoint.Get(endpoint, method, urlPath)
143+
if err != nil {
144+
return nil, errors.Errorf("Unable to resolve endpoint for %s", endpoint)
147145
}
148-
url, err := url.Parse(e)
146+
u, err := url.Parse(e)
149147
if err != nil {
150148
return nil, err
151149
}
152-
153-
url.Path = path.Join(url.Path, path.Join(urls...))
150+
u.Path = path.Join(u.Path, urlPath)
154151

155152
r := &httpRequest{
156153
method: method,
157-
url: url,
158-
endpoint: endpoint,
154+
url: u,
155+
endpoint: e,
159156
}
160157

161158
return r, nil

v2/connection/connection_http_internal_test.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// DISCLAIMER
33
//
4-
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
4+
// Copyright 2021-2023 ArangoDB GmbH, Cologne, Germany
55
//
66
// Licensed under the Apache License, Version 2.0 (the "License");
77
// you may not use this file except in compliance with the License.
@@ -17,12 +17,12 @@
1717
//
1818
// Copyright holder is ArangoDB GmbH, Cologne, Germany
1919
//
20-
// Author Tomasz Mielech
21-
//
2220

2321
package connection
2422

2523
import (
24+
"net/http"
25+
"strings"
2626
"testing"
2727

2828
"github.com/stretchr/testify/assert"
@@ -79,3 +79,40 @@ func Test_httpConnection_Decoder(t *testing.T) {
7979
})
8080
}
8181
}
82+
83+
func Test_httpConnection_NewRequest(t *testing.T) {
84+
eps := []string{
85+
"https://a:8529", "https://a:8539", "https://b:8529",
86+
}
87+
88+
c := httpConnection{
89+
endpoint: NewRoundRobinEndpoints(eps),
90+
}
91+
92+
j := 0
93+
for i := 0; i < 10; i++ {
94+
expectedEp := eps[j]
95+
req, err := c.NewRequest(http.MethodGet, "_api/version")
96+
require.NoError(t, err)
97+
require.Equal(t, expectedEp, req.Endpoint())
98+
require.True(t, strings.HasPrefix(req.URL(), expectedEp))
99+
j++
100+
if j >= len(eps) {
101+
j = 0
102+
}
103+
}
104+
}
105+
106+
func Test_httpConnection_NewRequestWithEndpoint(t *testing.T) {
107+
c := httpConnection{
108+
endpoint: NewRoundRobinEndpoints([]string{"https://a:8529", "https://a:8539", "https://b:8529"}),
109+
}
110+
111+
for i := 0; i < 10; i++ {
112+
ep := "https://a:8539"
113+
req, err := c.NewRequestWithEndpoint(ep, http.MethodGet, "_api/version")
114+
require.NoError(t, err)
115+
require.Equal(t, ep, req.Endpoint())
116+
require.True(t, strings.HasPrefix(req.URL(), ep))
117+
}
118+
}

v2/connection/endpoint.go

Lines changed: 19 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//
22
// DISCLAIMER
33
//
4-
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
4+
// Copyright 2020-2023 ArangoDB GmbH, Cologne, Germany
55
//
66
// Licensed under the Apache License, Version 2.0 (the "License");
77
// you may not use this file except in compliance with the License.
@@ -17,55 +17,30 @@
1717
//
1818
// Copyright holder is ArangoDB GmbH, Cologne, Germany
1919
//
20-
// Author Adam Janikowski
21-
//
2220

2321
package connection
2422

25-
import "sync"
23+
import (
24+
"strings"
25+
)
2626

2727
type Endpoint interface {
28-
// Get return one of endpoints if is valid, if no default one is returned
29-
Get(endpoints ...string) (string, bool)
30-
28+
// Get returns provided endpoint if it is known, otherwise chooses one endpoint from existing list
29+
// Endpoint implementation might use the Request method and path values to determine which endpoint to return
30+
Get(endpoint, method, path string) (string, error)
31+
// List returns known endpoints
3132
List() []string
3233
}
3334

34-
func NewEndpoints(e ...string) Endpoint {
35-
return &endpoints{
36-
endpoints: e,
37-
}
38-
}
39-
40-
type endpoints struct {
41-
lock sync.Mutex
42-
43-
endpoints []string
44-
45-
index int
46-
}
47-
48-
func (e *endpoints) List() []string {
49-
return e.endpoints
50-
}
51-
52-
func (e *endpoints) Get(endpoints ...string) (string, bool) {
53-
e.lock.Lock()
54-
defer e.lock.Unlock()
55-
56-
if len(e.endpoints) == 0 {
57-
return "", false
58-
}
59-
60-
//return e.endpoints[0], true
61-
62-
if e.index >= len(e.endpoints) {
63-
e.index = 0
64-
}
65-
66-
r := e.endpoints[e.index]
67-
68-
e.index++
69-
70-
return r, true
35+
var (
36+
urlFixer = strings.NewReplacer(
37+
"tcp://", "http://",
38+
"ssl://", "https://",
39+
)
40+
)
41+
42+
// FixupEndpointURLScheme changes endpoint URL schemes used by arangod to ones used by go.
43+
// E.g. "tcp://localhost:8529" -> "http://localhost:8529"
44+
func FixupEndpointURLScheme(u string) string {
45+
return urlFixer.Replace(u)
7146
}

v2/connection/endpoints.go

Lines changed: 0 additions & 36 deletions
This file was deleted.
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2020-2023 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package connection
22+
23+
import (
24+
"fmt"
25+
"math/big"
26+
"sort"
27+
"strings"
28+
29+
"github.com/kkdai/maglev"
30+
"github.com/pkg/errors"
31+
)
32+
33+
// RequestHashValueExtractor accepts request method and full request path and must return a value which will be used for hash calculation
34+
type RequestHashValueExtractor func(requestMethod, requestPath string) (string, error)
35+
36+
// NewMaglevHashEndpoints returns Endpoint manager which runs round-robin
37+
func NewMaglevHashEndpoints(eps []string, extractor RequestHashValueExtractor) (Endpoint, error) {
38+
// order of endpoints affects hashing result
39+
sort.Strings(eps)
40+
41+
// lookupSize must be equal or greater than len(eps) and it must be a prime number
42+
lookupSize := findNextPrime(uint64(len(eps)))
43+
44+
table, err := maglev.NewMaglev(eps, lookupSize)
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
return &maglevHashEndpoints{
50+
extractor: extractor,
51+
endpoints: eps,
52+
maglevTable: table,
53+
}, nil
54+
}
55+
56+
func findNextPrime(i uint64) uint64 {
57+
bigInt := big.NewInt(0).SetUint64(i)
58+
59+
for {
60+
if bigInt.ProbablyPrime(1) {
61+
return bigInt.Uint64()
62+
}
63+
i++
64+
bigInt.SetUint64(i)
65+
}
66+
}
67+
68+
type maglevHashEndpoints struct {
69+
extractor RequestHashValueExtractor
70+
endpoints []string
71+
maglevTable *maglev.Maglev
72+
}
73+
74+
func (e *maglevHashEndpoints) List() []string {
75+
return e.endpoints
76+
}
77+
78+
func (e *maglevHashEndpoints) Get(providedEp, requestMethod, requestPath string) (string, error) {
79+
if len(e.endpoints) == 0 {
80+
return "", errors.New("no endpoints known")
81+
}
82+
83+
for _, known := range e.endpoints {
84+
if known == providedEp {
85+
return known, nil
86+
}
87+
}
88+
89+
val, err := e.extractor(requestMethod, requestPath)
90+
if err != nil {
91+
return "", errors.WithMessagef(err, "could not extract value for method '%s' path '%s'", requestMethod, requestPath)
92+
}
93+
94+
r, err := e.maglevTable.Get(val)
95+
if err != nil {
96+
return r, errors.WithMessage(err, "failed to lookup Maglev table")
97+
}
98+
99+
return r, nil
100+
}
101+
102+
var _ RequestHashValueExtractor = RequestDBNameValueExtractor
103+
104+
// RequestDBNameValueExtractor might be used as RequestHashValueExtractor to use DB name from URL for hashing
105+
// It fallbacks to requestMethod+requestPath concatenation in case if path does not contain DB name
106+
func RequestDBNameValueExtractor(requestMethod, requestPath string) (string, error) {
107+
// most go-driver requests to ArangoDB are executed against `_db/<db-name>/xxxx/yyy/`) URL pattern
108+
// we can try to extract db-name to load-balance requests between endpoints
109+
110+
parts := strings.Split(strings.Trim(strings.TrimSpace(requestPath), "/"), "/")
111+
if len(parts) >= 3 {
112+
if parts[0] == "_db" {
113+
return parts[1], nil
114+
}
115+
}
116+
return fmt.Sprintf("%s_%s", requestMethod, requestPath), nil
117+
}

0 commit comments

Comments
 (0)