@@ -48,34 +48,22 @@ internal final class ConnectionPool {
     /// The event loop we're on.
     private let loop: EventLoop
 
-    /// The exponential backoff factor for connection attempts.
-    internal let backoffFactor: Float32
-
-    /// The initial delay for backing off a reconnection attempt.
-    internal let initialBackoffDelay: TimeAmount
-
-    /// The maximum number of connections the pool will preserve. Additional connections will be made available
-    /// past this limit if `leaky` is set to `true`, but they will not be persisted in the pool once used.
-    internal let maximumConnectionCount: Int
+    /// The strategy to use for finding and returning connections when requested.
+    internal let connectionRetryStrategy: RedisConnectionPool.PoolConnectionRetryStrategy
 
     /// The minimum number of connections the pool will keep alive. If a connection is disconnected while in the
     /// pool such that the number of connections drops below this number, the connection will be re-established.
     internal let minimumConnectionCount: Int
+    /// The maximum number of connections the pool will preserve.
+    internal let maximumConnectionCount: Int
+    /// The behavior to use for allowing or denying additional connections past the max connection count.
+    internal let maxConnectionCountBehavior: RedisConnectionPool.ConnectionCountBehavior.MaxConnectionBehavior
 
     /// The number of connection attempts currently outstanding.
     private var pendingConnectionCount: Int
-
     /// The number of connections that have been handed out to users and are in active use.
     private(set) var leasedConnectionCount: Int
 
-    /// Whether this connection pool is "leaky".
-    ///
-    /// The difference between a leaky and non-leaky connection pool is their behaviour when the pool is currently
-    /// entirely in-use. For a leaky pool, if a connection is requested and none are available, a new connection attempt
-    /// will be made and the connection will be passed to the user. For a non-leaky pool, the user will wait for a connection
-    /// to be returned to the pool.
-    internal let leaky: Bool
-
     /// The current state of this connection pool.
     private var state: State
 
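The two new stored properties above reference `RedisConnectionPool.ConnectionCountBehavior.MaxConnectionBehavior` and `RedisConnectionPool.PoolConnectionRetryStrategy`, which are defined elsewhere and are not part of this diff. As a rough orientation only, a minimal sketch of the surface this pool relies on could look like the following; the member names (`elastic`, `strict`, `initialDelay`, `timeout`, `determineNewDelay(currentDelay:)`) mirror what the diff uses, but the concrete definitions and default values are assumptions, not the library's actual API.

import NIO

// Hypothetical sketch only; the real definitions live in RedisConnectionPool's API.
enum MaxConnectionBehavior {
    /// Extra connections may be created past the maximum, but are not kept in the pool once returned.
    case elastic
    /// The pool never creates more than the configured maximum number of connections.
    case strict
}

struct PoolConnectionRetryStrategy {
    /// Delay before the first retry of a failed connection attempt (the old `initialConnectionBackoffDelay` default).
    var initialDelay: TimeAmount = .milliseconds(100)
    /// Growth factor applied after each failed attempt (the old `connectionBackoffFactor` default).
    var backoffFactor: Float32 = 2
    /// Budget used by `leaseConnection` when the caller provides no deadline (value assumed here).
    var timeout: TimeAmount = .seconds(60)

    /// Computes the delay for the next retry; this sketch reproduces the exponential backoff the diff removes.
    func determineNewDelay(currentDelay: TimeAmount) -> TimeAmount {
        return .nanoseconds(Int64(Float32(currentDelay.nanoseconds) * self.backoffFactor))
    }
}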
@@ -85,49 +73,48 @@ internal final class ConnectionPool {
         return self.availableConnections.count + self.pendingConnectionCount + self.leasedConnectionCount
     }
 
-    /// Whether a connection can be added into the availableConnections pool when it's returned. This is true
-    /// for non-leaky pools if the sum of availableConnections and leased connections is less than max connections,
-    /// and for leaky pools if the number of availableConnections is less than max connections (as we went to all
-    /// the effort to create the connection, we may as well keep it).
-    /// Note that this means connection attempts in flight may not be used for anything. This is ok!
+    /// Whether a connection can be added into the availableConnections pool when it's returned.
     private var canAddConnectionToPool: Bool {
-        if self.leaky {
+        switch self.maxConnectionCountBehavior {
+        // only if the current available count is less than the max
+        case .elastic:
             return self.availableConnections.count < self.maximumConnectionCount
-        } else {
+
+        // only if the total connections count is less than the max
+        case .strict:
             return (self.availableConnections.count + self.leasedConnectionCount) < self.maximumConnectionCount
         }
     }
 
     internal init(
-        maximumConnectionCount: Int,
         minimumConnectionCount: Int,
-        leaky: Bool,
+        maximumConnectionCount: Int,
+        maxConnectionCountBehavior: RedisConnectionPool.ConnectionCountBehavior.MaxConnectionBehavior,
+        connectionRetryStrategy: RedisConnectionPool.PoolConnectionRetryStrategy,
         loop: EventLoop,
         poolLogger: Logger,
-        connectionBackoffFactor: Float32 = 2,
-        initialConnectionBackoffDelay: TimeAmount = .milliseconds(100),
         connectionFactory: @escaping (EventLoop) -> EventLoopFuture<RedisConnection>
     ) {
-        guard minimumConnectionCount <= maximumConnectionCount else {
+        self.minimumConnectionCount = minimumConnectionCount
+        self.maximumConnectionCount = maximumConnectionCount
+        self.maxConnectionCountBehavior = maxConnectionCountBehavior
+
+        guard self.minimumConnectionCount <= self.maximumConnectionCount else {
             poolLogger.critical("pool's minimum connection count is higher than the maximum")
-            preconditionFailure("Minimum connection count must not exceed maximum")
+            preconditionFailure("minimum connection count must not exceed maximum")
         }
 
-        self.connectionFactory = connectionFactory
+        self.pendingConnectionCount = 0
+        self.leasedConnectionCount = 0
         self.availableConnections = []
-        self.availableConnections.reserveCapacity(maximumConnectionCount)
+        self.availableConnections.reserveCapacity(self.maximumConnectionCount)
 
         // 8 is a good number to skip the first few buffer resizings
         self.connectionWaiters = CircularBuffer(initialCapacity: 8)
         self.loop = loop
-        self.backoffFactor = connectionBackoffFactor
-        self.initialBackoffDelay = initialConnectionBackoffDelay
+        self.connectionFactory = connectionFactory
+        self.connectionRetryStrategy = connectionRetryStrategy
 
-        self.maximumConnectionCount = maximumConnectionCount
-        self.minimumConnectionCount = minimumConnectionCount
-        self.pendingConnectionCount = 0
-        self.leasedConnectionCount = 0
-        self.leaky = leaky
         self.state = .active
     }
 
@@ -154,12 +141,13 @@ internal final class ConnectionPool {
         }
     }
 
-    func leaseConnection(deadline: NIODeadline, logger: Logger) -> EventLoopFuture<RedisConnection> {
+    func leaseConnection(logger: Logger, deadline: NIODeadline? = nil) -> EventLoopFuture<RedisConnection> {
+        let deadline = deadline ?? .now() + self.connectionRetryStrategy.timeout
         if self.loop.inEventLoop {
-            return self._leaseConnection(deadline, logger: logger)
+            return self._leaseConnection(logger: logger, deadline: deadline)
         } else {
             return self.loop.flatSubmit {
-                return self._leaseConnection(deadline, logger: logger)
+                return self._leaseConnection(logger: logger, deadline: deadline)
             }
         }
     }
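With the reordered signature above, `deadline` becomes optional and defaults to `.now()` plus the retry strategy's timeout. A hedged usage sketch follows, assuming code inside the RediStack module where `ConnectionPool` and `RedisConnection` are visible; the helper name is made up for illustration.

import NIO
import Logging

// Hypothetical helper; `pool` is assumed to be an already-configured ConnectionPool.
func fetchConnection(from pool: ConnectionPool, logger: Logger) -> EventLoopFuture<RedisConnection> {
    // An explicit deadline still works:
    //     pool.leaseConnection(logger: logger, deadline: .now() + .seconds(5))
    // Omitting it falls back to `.now() + connectionRetryStrategy.timeout`.
    return pool.leaseConnection(logger: logger)
}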
@@ -191,12 +179,16 @@ extension ConnectionPool {
             RedisLogging.MetadataKeys.connectionCount: "\(neededConnections)"
         ])
         while neededConnections > 0 {
-            self._createConnection(backoff: self.initialBackoffDelay, startIn: .nanoseconds(0), logger: logger)
+            self._createConnection(
+                retryDelay: self.connectionRetryStrategy.initialDelay,
+                startIn: .nanoseconds(0),
+                logger: logger
+            )
             neededConnections -= 1
         }
     }
 
-    private func _createConnection(backoff: TimeAmount, startIn delay: TimeAmount, logger: Logger) {
+    private func _createConnection(retryDelay: TimeAmount, startIn delay: TimeAmount, logger: Logger) {
         self.loop.assertInEventLoop()
         self.pendingConnectionCount += 1
 
@@ -212,7 +204,7 @@ extension ConnectionPool {
                 self.connectionCreationSucceeded(connection, logger: logger)
 
             case .failure(let error):
-                self.connectionCreationFailed(error, backoff: backoff, logger: logger)
+                self.connectionCreationFailed(error, retryDelay: retryDelay, logger: logger)
             }
         }
     }
@@ -243,7 +235,7 @@ extension ConnectionPool {
         }
     }
 
-    private func connectionCreationFailed(_ error: Error, backoff: TimeAmount, logger: Logger) {
+    private func connectionCreationFailed(_ error: Error, retryDelay: TimeAmount, logger: Logger) {
         self.loop.assertInEventLoop()
 
         logger.warning("failed to create connection for pool", metadata: [
@@ -260,15 +252,17 @@ extension ConnectionPool {
         // for this connection. Waiters can time out: if they do, we can just give up this connection.
         // We know folks need this in the following conditions:
         //
-        // 1. For non-leaky buckets, we need this reconnection if there are any waiters AND the number of active connections (which includes
+        // 1. For non-elastic buckets, we need this reconnection if there are any waiters AND the number of active connections (which includes
         //    pending connection attempts) is less than max connections
-        // 2. For leaky buckets, we need this reconnection if connectionWaiters.count is greater than the number of pending connection attempts.
+        // 2. For elastic buckets, we need this reconnection if connectionWaiters.count is greater than the number of pending connection attempts.
         // 3. For either kind, if the number of active connections is less than the minimum.
         let shouldReconnect: Bool
-        if self.leaky {
+        switch self.maxConnectionCountBehavior {
+        case .elastic:
             shouldReconnect = (self.connectionWaiters.count > self.pendingConnectionCount)
                 || (self.minimumConnectionCount > self.activeConnectionCount)
-        } else {
+
+        case .strict:
             shouldReconnect = (!self.connectionWaiters.isEmpty && self.maximumConnectionCount > self.activeConnectionCount)
                 || (self.minimumConnectionCount > self.activeConnectionCount)
         }
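To make the three reconnection conditions in the comment above concrete, here is a worked example with made-up numbers; the predicates mirror the switch above and are not pool API.

// Made-up state: 1 minimum, 4 maximum, 2 waiters, 1 pending attempt, 3 active connections.
let minimumConnectionCount = 1, maximumConnectionCount = 4
let waiterCount = 2, pendingConnectionCount = 1, activeConnectionCount = 3

// elastic: reconnect because there are more waiters (2) than pending attempts (1)
let elasticReconnect = (waiterCount > pendingConnectionCount)
    || (minimumConnectionCount > activeConnectionCount)

// strict: reconnect because waiters exist and active connections (3) are below the max (4)
let strictReconnect = (waiterCount > 0 && maximumConnectionCount > activeConnectionCount)
    || (minimumConnectionCount > activeConnectionCount)

print(elasticReconnect, strictReconnect) // true true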
@@ -279,12 +273,12 @@ extension ConnectionPool {
         }
 
         // Ok, we need the new connection.
-        let newBackoff = TimeAmount.nanoseconds(Int64(Float32(backoff.nanoseconds) * self.backoffFactor))
+        let nextRetryDelay = self.connectionRetryStrategy.determineNewDelay(currentDelay: retryDelay)
         logger.debug("reconnecting after failed connection attempt", metadata: [
-            RedisLogging.MetadataKeys.poolConnectionRetryBackoff: "\(backoff)ns",
-            RedisLogging.MetadataKeys.poolConnectionRetryNewBackoff: "\(newBackoff)ns"
+            RedisLogging.MetadataKeys.poolConnectionRetryAmount: "\(retryDelay)ns",
+            RedisLogging.MetadataKeys.poolConnectionRetryNewAmount: "\(nextRetryDelay)ns"
         ])
-        self._createConnection(backoff: newBackoff, startIn: backoff, logger: logger)
+        self._createConnection(retryDelay: nextRetryDelay, startIn: retryDelay, logger: logger)
     }
 
     /// A connection that was monitored by this pool has been closed.
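The hard-coded `Float32` multiplication is replaced by asking the strategy for the next delay. Under the hypothetical strategy sketched earlier (100 ms initial delay, factor 2), the delay progression is unchanged:

// Illustrative only, using the hypothetical PoolConnectionRetryStrategy sketch from above.
let strategy = PoolConnectionRetryStrategy()
var delay = strategy.initialDelay                        // 100 ms
delay = strategy.determineNewDelay(currentDelay: delay)  // 200 ms
delay = strategy.determineNewDelay(currentDelay: delay)  // 400 ms
precondition(delay == .milliseconds(400))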
@@ -352,7 +346,7 @@ extension ConnectionPool {
 
     /// This is the on-thread implementation for leasing connections out to users. Here we work out how to get a new
     /// connection, and attempt to do so.
-    private func _leaseConnection(_ deadline: NIODeadline, logger: Logger) -> EventLoopFuture<RedisConnection> {
+    private func _leaseConnection(logger: Logger, deadline: NIODeadline) -> EventLoopFuture<RedisConnection> {
         self.loop.assertInEventLoop()
 
         guard case .active = self.state else {
@@ -386,11 +380,22 @@ extension ConnectionPool {
         self.connectionWaiters.append(waiter)
 
         // Ok, we have connection targets. If the number of active connections is
-        // below the max, or the pool is leaky, we can create a new connection. Otherwise, we just have
+        // below the max, or the pool is elastic, we can create a new connection. Otherwise, we just have
         // to wait for a connection to come back.
-        if self.activeConnectionCount < self.maximumConnectionCount || self.leaky {
+
+        let shouldCreateConnection: Bool
+        switch self.maxConnectionCountBehavior {
+        case .elastic: shouldCreateConnection = true
+        case .strict: shouldCreateConnection = false
+        }
+
+        if self.activeConnectionCount < self.maximumConnectionCount || shouldCreateConnection {
             logger.trace("creating new connection")
-            self._createConnection(backoff: self.initialBackoffDelay, startIn: .nanoseconds(0), logger: logger)
+            self._createConnection(
+                retryDelay: self.connectionRetryStrategy.initialDelay,
+                startIn: .nanoseconds(0),
+                logger: logger
+            )
         }
 
         return waiter.futureResult