Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit ad53edf

Browse files
authored
Merge pull request #236 from yuce/233-request-retrials
Initial retry support
2 parents 7dd81a2 + 43f3268 commit ad53edf

File tree

3 files changed

+117
-48
lines changed

3 files changed

+117
-48
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Change Log
22

3+
* **next**
4+
* The client retries failed HTTP requests, by default 2 times. Use `OptClientRetries` to set the number of retries.
5+
36
* **v1.3.0** (2019-04-19)
47
* **Compatible with Pilosa 1.2 and 1.3**
58
* Deprecated `QueryResponse.Columns` function. Use `QueryResponse.ColumnAttrs` function instead.

client.go

+72-12
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ type Client struct {
8282
manualFragmentNode *fragmentNode
8383
manualServerURI *URI
8484
tracer opentracing.Tracer
85+
// Number of retries if an HTTP request fails
86+
retries int
87+
minRetrySleepTime time.Duration
88+
maxRetrySleepTime time.Duration
8589

8690
importLogEncoder encoder
8791
logLock sync.Mutex
@@ -143,6 +147,9 @@ func newClientWithOptions(options *ClientOptions) *Client {
143147
} else {
144148
c.tracer = options.tracer
145149
}
150+
c.retries = *options.retries
151+
c.minRetrySleepTime = 1 * time.Second
152+
c.maxRetrySleepTime = 2 * time.Minute
146153
c.importManager = newRecordImportManager(c)
147154
return c
148155

@@ -591,7 +598,11 @@ func (c *Client) hasRoaringImportSupport(field *Field) bool {
591598
}
592599
// Check whether the roaring import endpoint exists
593600
path := makeRoaringImportPath(field, 0, url.Values{})
601+
// err may contain an HTTP error, but we don't use it.
594602
resp, _, _ := c.httpRequest("GET", path, nil, nil, false)
603+
if resp == nil {
604+
return false
605+
}
595606
if resp.StatusCode == http.StatusMethodNotAllowed || resp.StatusCode == http.StatusOK {
596607
// Roaring import endpoint exists
597608
return true
@@ -680,7 +691,7 @@ func (c *Client) fetchCoordinatorNode() (fragmentNode, error) {
680691
}
681692

682693
func (c *Client) importData(uri *URI, path string, data []byte) error {
683-
resp, err := c.doRequest(uri, "POST", path, defaultProtobufHeaders(), bytes.NewReader(data))
694+
resp, err := c.doRequest(uri, "POST", path, defaultProtobufHeaders(), data)
684695
if err = anyError(resp, err); err != nil {
685696
return errors.Wrap(err, "doing import")
686697
}
@@ -716,7 +727,7 @@ func (c *Client) importRoaringBitmap(uri *URI, field *Field, shard uint64, views
716727

717728
c.logImport(field.index.Name(), path, shard, true, data)
718729

719-
resp, err := c.doRequest(uri, "POST", path, defaultProtobufHeaders(), bytes.NewReader(data))
730+
resp, err := c.doRequest(uri, "POST", path, defaultProtobufHeaders(), data)
720731
if err = anyError(resp, err); err != nil {
721732
return errors.Wrap(err, "doing import")
722733
}
@@ -835,7 +846,7 @@ func (c *Client) httpRequest(method string, path string, data []byte, headers ma
835846
if err != nil {
836847
return nil, nil, err
837848
}
838-
response, err = c.doRequest(host, method, path, c.augmentHeaders(headers), bytes.NewReader(data))
849+
response, err = c.doRequest(host, method, path, c.augmentHeaders(headers), data)
839850
if err == nil {
840851
break
841852
}
@@ -925,12 +936,45 @@ func anyError(resp *http.Response, err error) error {
925936
}
926937

927938
// doRequest creates and performs an http request.
928-
func (c *Client) doRequest(host *URI, method, path string, headers map[string]string, reader io.Reader) (*http.Response, error) {
929-
req, err := makeRequest(host, method, path, headers, reader)
930-
if err != nil {
931-
return nil, errors.Wrap(err, "building request")
939+
func (c *Client) doRequest(host *URI, method, path string, headers map[string]string, data []byte) (*http.Response, error) {
940+
var resp *http.Response
941+
var err error
942+
var req *http.Request
943+
var content []byte
944+
945+
tries := 1 + c.retries
946+
sleepTime := c.minRetrySleepTime
947+
for tries > 0 {
948+
req, err = makeRequest(host, method, path, headers, data)
949+
if err != nil {
950+
return nil, errors.Wrap(err, "building request")
951+
}
952+
tries--
953+
resp, err = c.client.Do(req)
954+
if err == nil {
955+
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
956+
return resp, nil
957+
}
958+
// Pilosa nodes sometimes return 400, we retry in that case.
959+
// No need to retry in other 4xx cases.
960+
if resp.StatusCode > 400 && resp.StatusCode < 500 {
961+
return resp, nil
962+
}
963+
content, err = ioutil.ReadAll(resp.Body)
964+
resp.Body.Close()
965+
if err != nil {
966+
return nil, err
967+
}
968+
err = errors.New(strings.TrimSpace(string(content)))
969+
}
970+
c.logger.Printf("request failed with: %s, retrying (%d)", err.Error(), tries)
971+
time.Sleep(sleepTime)
972+
sleepTime *= 2
973+
if sleepTime > c.maxRetrySleepTime {
974+
sleepTime = c.maxRetrySleepTime
975+
}
932976
}
933-
return c.client.Do(req)
977+
return nil, errors.Wrap(err, "max retries exceeded")
934978
}
935979

936980
// statusToNodeShardsForIndex finds the hosts which contains shards for the given index
@@ -1058,7 +1102,7 @@ func (c *Client) ExperimentalReplayImport(r io.Reader, concurrency int) error {
10581102

10591103
if !log.IsRoaring {
10601104
for _, node := range nodes {
1061-
resp, err := c.doRequest(node.URI(), "POST", log.Path, defaultProtobufHeaders(), bytes.NewReader(log.Data))
1105+
resp, err := c.doRequest(node.URI(), "POST", log.Path, defaultProtobufHeaders(), log.Data)
10621106
if err = anyError(resp, err); err != nil {
10631107
return errors.Wrap(err, "doing import")
10641108
}
@@ -1067,7 +1111,7 @@ func (c *Client) ExperimentalReplayImport(r io.Reader, concurrency int) error {
10671111
} else {
10681112
// import-roaring forwards on to all replicas, so we only import to
10691113
// one node.
1070-
resp, err := c.doRequest(nodes[0].URI(), "POST", log.Path, defaultProtobufHeaders(), bytes.NewReader(log.Data))
1114+
resp, err := c.doRequest(nodes[0].URI(), "POST", log.Path, defaultProtobufHeaders(), log.Data)
10711115
if err = anyError(resp, err); err != nil {
10721116
return errors.Wrap(err, "doing import")
10731117
}
@@ -1121,8 +1165,8 @@ func defaultProtobufHeaders() map[string]string {
11211165
}
11221166
}
11231167

1124-
func makeRequest(host *URI, method, path string, headers map[string]string, reader io.Reader) (*http.Request, error) {
1125-
request, err := http.NewRequest(method, host.Normalize()+path, reader)
1168+
func makeRequest(host *URI, method, path string, headers map[string]string, data []byte) (*http.Request, error) {
1169+
request, err := http.NewRequest(method, host.Normalize()+path, bytes.NewReader(data))
11261170
if err != nil {
11271171
return nil, err
11281172
}
@@ -1326,6 +1370,7 @@ type ClientOptions struct {
13261370
TLSConfig *tls.Config
13271371
manualServerAddress bool
13281372
tracer opentracing.Tracer
1373+
retries *int
13291374

13301375
importLogWriter io.Writer
13311376
}
@@ -1410,6 +1455,17 @@ func OptClientTracer(tracer opentracing.Tracer) ClientOption {
14101455
}
14111456
}
14121457

1458+
// OptClientRetries sets the number of retries on HTTP request failures.
1459+
func OptClientRetries(retries int) ClientOption {
1460+
return func(options *ClientOptions) error {
1461+
if retries < 0 {
1462+
return errors.New("retries must be non-negative")
1463+
}
1464+
options.retries = &retries
1465+
return nil
1466+
}
1467+
}
1468+
14131469
type versionInfo struct {
14141470
Version string `json:"version"`
14151471
}
@@ -1434,6 +1490,10 @@ func (co *ClientOptions) withDefaults() (updated *ClientOptions) {
14341490
if updated.TLSConfig == nil {
14351491
updated.TLSConfig = &tls.Config{}
14361492
}
1493+
if updated.retries == nil {
1494+
retries := 2
1495+
updated.retries = &retries
1496+
}
14371497
return
14381498
}
14391499

0 commit comments

Comments
 (0)