1
+ import { set } from "lodash" ;
2
+ import Axios from 'axios'
3
+ const defaultConfig = {
4
+ maxRequests : 5 ,
5
+ retryLimit : 5 ,
6
+ retryDelay : 300 ,
7
+ }
8
+
9
+ export function ConcurrencyQueue ( { axios, config} ) {
10
+ if ( ! axios ) {
11
+ throw Error ( 'Axios instance is not present' ) ;
12
+ }
13
+
14
+ if ( config ) {
15
+ if ( config . maxRequests && config . maxRequests <= 0 ) {
16
+ throw Error ( 'Concurrency Manager Error: minimun concurrent requests is 1' ) ;
17
+ } else if ( config . retryLimit && config . retryLimit <= 0 ) {
18
+ throw Error ( 'Retry Policy Error: minimun retry limit is 1' ) ;
19
+ } else if ( config . retryDelay && config . retryDelay < 300 ) {
20
+ throw Error ( 'Retry Policy Error: minimun retry delay for requests is 300' ) ;
21
+ }
22
+ }
23
+
24
+ this . config = Object . assign ( { } , defaultConfig , config )
25
+ this . queue = [ ]
26
+ this . running = [ ]
27
+ this . paused = false
28
+
29
+ // Initial shift will check running request,
30
+ // and adds request to running queue if max requests are not running
31
+ this . initialShift = ( ) => {
32
+ if ( this . running . length < this . config . maxRequests && ! this . paused ) {
33
+ shift ( )
34
+ }
35
+ }
36
+
37
+ // INTERNAL: Shift the queued item to running queue
38
+ let shift = ( ) => {
39
+ if ( this . queue . length && ! this . paused ) {
40
+ const queueItem = this . queue . shift ( )
41
+ queueItem . resolve ( queueItem . request )
42
+ this . running . push ( queueItem )
43
+ }
44
+ }
45
+
46
+ // Append the request at start of queue
47
+ this . unshift = requestPromise => {
48
+ this . queue . unshift ( requestPromise )
49
+ }
50
+
51
+ this . push = requestPromise => {
52
+ this . queue . push ( requestPromise )
53
+ this . initialShift ( )
54
+ }
55
+
56
+ this . clear = ( ) => {
57
+ const requests = this . queue . splice ( 0 , this . queue . length )
58
+ requests . forEach ( ( element ) => {
59
+ element . request . source . cancel ( )
60
+ } )
61
+ }
62
+
63
+ // Detach the interceptors
64
+ this . detach = ( ) => {
65
+ axios . interceptors . request . eject ( this . interceptors . request ) ;
66
+ axios . interceptors . response . eject ( this . interceptors . response ) ;
67
+ this . interceptors = {
68
+ request : null ,
69
+ response : null
70
+ }
71
+ }
72
+
73
+ // Request interseptor to queue the request
74
+ let requestHandler = request => {
75
+ request . retryCount = request . retryCount || 0
76
+ if ( request . headers . authorization && request . headers . authorization !== undefined ) {
77
+ delete request . headers . authtoken
78
+ }
79
+
80
+ if ( request . cancelToken === undefined ) {
81
+ const source = Axios . CancelToken . source ( )
82
+ request . cancelToken = source . token
83
+ request . source = source
84
+ }
85
+
86
+ if ( this . paused && request . retryCount > 0 ) {
87
+ return new Promise ( resolve => {
88
+ this . unshift ( { request, resolve} )
89
+ } )
90
+ } else if ( request . retryCount > 0 ) {
91
+ return request
92
+ }
93
+
94
+ return new Promise ( resolve => {
95
+ this . push ( { request, resolve} )
96
+ } )
97
+ }
98
+
99
+ let delay = ( time ) => {
100
+ if ( ! this . paused ) {
101
+ this . paused = true
102
+ if ( this . running . length > 0 ) {
103
+ setTimeout ( ( ) => {
104
+ delay ( time )
105
+ } , time )
106
+ }
107
+ return new Promise ( resolve => setTimeout ( ( ) => {
108
+ this . paused = false
109
+ for ( let i = 0 ; i < this . config . maxRequests ; i ++ ) {
110
+ this . initialShift ( )
111
+ }
112
+ } , time ) ) ;
113
+ }
114
+ }
115
+
116
+ // Response interceptor used for
117
+ let responseHandler = ( response ) => {
118
+ this . running . shift ( )
119
+ shift ( )
120
+ return response
121
+ }
122
+
123
+ let responseErrorHandler = error => {
124
+ let networkError = error . config . retryCount
125
+ if ( ! this . config . retryOnError || networkError > this . config . retryLimit ) {
126
+ return Promise . reject ( responseHandler ( error ) )
127
+ }
128
+
129
+ // TODO: Error handling
130
+ let wait = this . config . retryDelay
131
+ let retryErrorType = null
132
+ let response = error . response
133
+ if ( ! response ) {
134
+ if ( error . code === 'ECONNABORTED' ) {
135
+ error . response = {
136
+ ...error . response ,
137
+ status : 408 ,
138
+ statusText : `timeout of ${ this . config . timeout } ms exceeded`
139
+ }
140
+ } else {
141
+ return Promise . reject ( responseHandler ( error ) )
142
+ }
143
+ } else if ( response . status === 429 ) {
144
+ retryErrorType = `Error with status: ${ response . status } `
145
+ networkError ++
146
+
147
+ if ( networkError > this . config . retryLimit ) {
148
+ return Promise . reject ( responseHandler ( error ) )
149
+ }
150
+ this . running . shift ( )
151
+ // Cooldown the running requests
152
+ delay ( 1000 )
153
+ error . config . retryCount = networkError
154
+
155
+ return axios ( updateRequestConfig ( error , retryErrorType , wait ) )
156
+ } else if ( this . config . retryCondition && this . config . retryCondition ( error ) ) {
157
+ retryErrorType = `Error with status: ${ response . status } `
158
+ networkError ++
159
+ if ( networkError > this . config . retryLimit ) {
160
+ return Promise . reject ( responseHandler ( error ) )
161
+ }
162
+ if ( this . config . retryDelayOptions ) {
163
+ if ( this . config . retryDelayOptions . customBackoff ) {
164
+ wait = this . config . retryDelayOptions . customBackoff ( networkError , error )
165
+ if ( wait && wait <= 0 ) {
166
+ return Promise . reject ( responseHandler ( error ) )
167
+ }
168
+ } else if ( this . config . retryDelayOptions . base ) {
169
+ wait = this . config . retryDelayOptions . base * networkError
170
+ }
171
+ } else {
172
+ wait = this . config . retryDelay
173
+ }
174
+ error . config . retryCount = networkError
175
+ return new Promise ( function ( resolve ) {
176
+ return setTimeout ( function ( ) {
177
+ return resolve ( axios ( updateRequestConfig ( error , retryErrorType , wait ) ) )
178
+ } , wait )
179
+ } )
180
+ }
181
+
182
+ return Promise . reject ( responseHandler ( error ) )
183
+ }
184
+
185
+ this . interceptors = {
186
+ request : null ,
187
+ response : null
188
+ }
189
+
190
+ let updateRequestConfig = ( error , retryErrorType , wait ) => {
191
+ const requestConfig = error . config
192
+ this . config . logHandler ( 'warning' , `${ retryErrorType } error occurred. Waiting for ${ wait } ms before retrying...` )
193
+ if ( axios !== undefined && axios . defaults !== undefined ) {
194
+ if ( axios . defaults . agent === requestConfig . agent ) {
195
+ delete requestConfig . agent
196
+ }
197
+ if ( axios . defaults . httpAgent === requestConfig . httpAgent ) {
198
+ delete requestConfig . httpAgent
199
+ }
200
+ if ( axios . defaults . httpsAgent === requestConfig . httpsAgent ) {
201
+ delete requestConfig . httpsAgent
202
+ }
203
+ }
204
+
205
+ requestConfig . transformRequest = [ function ( data ) {
206
+ return data
207
+ } ]
208
+ return requestConfig
209
+ }
210
+
211
+ // Adds interseptors in axios to queue request
212
+ this . interceptors . request = axios . interceptors . request . use ( requestHandler )
213
+ this . interceptors . response = axios . interceptors . response . use ( responseHandler , responseErrorHandler )
214
+
215
+ }
0 commit comments