Skip to content

Commit 98fb2a0

Browse files
Better encapsulation for connection instances (#353)
* Push Connection interface around * Tighter encapsulation of connection interface * Move ssl_context into start_tls method * Pass keepalive_expiry to connection * Push keepalive logic inside connection classes * Fix ._state in HTTP2Connection * may_close -> is_idle * Docstring * Update test cases
1 parent 1e403ba commit 98fb2a0

File tree

14 files changed

+558
-348
lines changed

14 files changed

+558
-348
lines changed

httpcore/_async/connection.py

Lines changed: 64 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,7 @@
55
from .._exceptions import ConnectError, ConnectTimeout
66
from .._types import URL, Headers, Origin, TimeoutDict
77
from .._utils import exponential_backoff, get_logger, url_to_origin
8-
from .base import (
9-
AsyncByteStream,
10-
AsyncHTTPTransport,
11-
ConnectionState,
12-
NewConnectionRequired,
13-
)
8+
from .base import AsyncByteStream, AsyncHTTPTransport, NewConnectionRequired
149
from .http import AsyncBaseHTTPConnection
1510
from .http11 import AsyncHTTP11Connection
1611

@@ -25,6 +20,7 @@ def __init__(
2520
origin: Origin,
2621
http1: bool = True,
2722
http2: bool = False,
23+
keepalive_expiry: float = None,
2824
uds: str = None,
2925
ssl_context: SSLContext = None,
3026
socket: AsyncSocketStream = None,
@@ -35,6 +31,7 @@ def __init__(
3531
self.origin = origin
3632
self.http1 = http1
3733
self.http2 = http2
34+
self.keepalive_expiry = keepalive_expiry
3835
self.uds = uds
3936
self.ssl_context = SSLContext() if ssl_context is None else ssl_context
4037
self.socket = socket
@@ -57,20 +54,58 @@ def __init__(
5754
self.backend = AutoBackend() if backend is None else backend
5855

5956
def __repr__(self) -> str:
60-
http_version = "UNKNOWN"
61-
if self.is_http11:
62-
http_version = "HTTP/1.1"
63-
elif self.is_http2:
64-
http_version = "HTTP/2"
65-
return f"<AsyncHTTPConnection http_version={http_version} state={self.state}>"
57+
return f"<AsyncHTTPConnection [{self.info()}]>"
6658

6759
def info(self) -> str:
6860
if self.connection is None:
69-
return "Not connected"
70-
elif self.state == ConnectionState.PENDING:
71-
return "Connecting"
61+
return "Connection failed" if self.connect_failed else "Connecting"
7262
return self.connection.info()
7363

64+
def should_close(self) -> bool:
65+
"""
66+
Return `True` if the connection is in a state where it should be closed.
67+
This occurs when any of the following occur:
68+
69+
* There are no active requests on an HTTP/1.1 connection, and the underlying
70+
socket is readable. The only valid state the socket can be readable in
71+
if this occurs is when the b"" EOF marker is about to be returned,
72+
indicating a server disconnect.
73+
* There are no active requests being made and the keepalive timeout has passed.
74+
"""
75+
if self.connection is None:
76+
return False
77+
return self.connection.should_close()
78+
79+
def is_idle(self) -> bool:
80+
"""
81+
Return `True` if the connection is currently idle.
82+
"""
83+
if self.connection is None:
84+
return False
85+
return self.connection.is_idle()
86+
87+
def is_closed(self) -> bool:
88+
if self.connection is None:
89+
return self.connect_failed
90+
return self.connection.is_closed()
91+
92+
def is_available(self) -> bool:
93+
"""
94+
Return `True` if the connection is currently able to accept an outgoing request.
95+
This occurs when any of the following occur:
96+
97+
* The connection has not yet been opened, and HTTP/2 support is enabled.
98+
We don't *know* at this point if we'll end up on an HTTP/2 connection or
99+
not, but we *might* do, so we indicate availability.
100+
* The connection has been opened, and is currently idle.
101+
* The connection is open, and is an HTTP/2 connection. The connection must
102+
also not currently be exceeding the maximum number of allowable concurrent
103+
streams and must not have exhausted the maximum total number of stream IDs.
104+
"""
105+
if self.connection is None:
106+
return self.http2 and not self.is_closed
107+
return self.connection.is_available()
108+
74109
@property
75110
def request_lock(self) -> AsyncLock:
76111
# We do this lazily, to make sure backend autodetection always
@@ -91,18 +126,16 @@ async def handle_async_request(
91126
timeout = cast(TimeoutDict, extensions.get("timeout", {}))
92127

93128
async with self.request_lock:
94-
if self.state == ConnectionState.PENDING:
129+
if self.connection is None:
130+
if self.connect_failed:
131+
raise NewConnectionRequired()
95132
if not self.socket:
96133
logger.trace(
97134
"open_socket origin=%r timeout=%r", self.origin, timeout
98135
)
99136
self.socket = await self._open_socket(timeout)
100137
self._create_connection(self.socket)
101-
elif self.state in (ConnectionState.READY, ConnectionState.IDLE):
102-
pass
103-
elif self.state == ConnectionState.ACTIVE and self.is_http2:
104-
pass
105-
else:
138+
elif not self.connection.is_available():
106139
raise NewConnectionRequired()
107140

108141
assert self.connection is not None
@@ -159,33 +192,24 @@ def _create_connection(self, socket: AsyncSocketStream) -> None:
159192

160193
self.is_http2 = True
161194
self.connection = AsyncHTTP2Connection(
162-
socket=socket, backend=self.backend, ssl_context=self.ssl_context
195+
socket=socket,
196+
keepalive_expiry=self.keepalive_expiry,
197+
backend=self.backend,
163198
)
164199
else:
165200
self.is_http11 = True
166201
self.connection = AsyncHTTP11Connection(
167-
socket=socket, ssl_context=self.ssl_context
202+
socket=socket, keepalive_expiry=self.keepalive_expiry
168203
)
169204

170-
@property
171-
def state(self) -> ConnectionState:
172-
if self.connect_failed:
173-
return ConnectionState.CLOSED
174-
elif self.connection is None:
175-
return ConnectionState.PENDING
176-
return self.connection.get_state()
177-
178-
def is_socket_readable(self) -> bool:
179-
return self.connection is not None and self.connection.is_socket_readable()
180-
181-
def mark_as_ready(self) -> None:
182-
if self.connection is not None:
183-
self.connection.mark_as_ready()
184-
185-
async def start_tls(self, hostname: bytes, timeout: TimeoutDict = None) -> None:
205+
async def start_tls(
206+
self, hostname: bytes, ssl_context: SSLContext, timeout: TimeoutDict = None
207+
) -> None:
186208
if self.connection is not None:
187209
logger.trace("start_tls hostname=%r timeout=%r", hostname, timeout)
188-
self.socket = await self.connection.start_tls(hostname, timeout)
210+
self.socket = await self.connection.start_tls(
211+
hostname, ssl_context, timeout
212+
)
189213
logger.trace("start_tls complete hostname=%r timeout=%r", hostname, timeout)
190214

191215
async def aclose(self) -> None:

httpcore/_async/connection_pool.py

Lines changed: 9 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,7 @@
1818
from .._threadlock import ThreadLock
1919
from .._types import URL, Headers, Origin, TimeoutDict
2020
from .._utils import get_logger, origin_to_url_string, url_to_origin
21-
from .base import (
22-
AsyncByteStream,
23-
AsyncHTTPTransport,
24-
ConnectionState,
25-
NewConnectionRequired,
26-
)
21+
from .base import AsyncByteStream, AsyncHTTPTransport, NewConnectionRequired
2722
from .connection import AsyncHTTPConnection
2823

2924
logger = get_logger(__name__)
@@ -184,6 +179,7 @@ def _create_connection(
184179
origin=origin,
185180
http1=self._http1,
186181
http2=self._http2,
182+
keepalive_expiry=self._keepalive_expiry,
187183
uds=self._uds,
188184
ssl_context=self._ssl_context,
189185
local_address=self._local_address,
@@ -260,53 +256,15 @@ async def _get_connection_from_pool(
260256
self, origin: Origin
261257
) -> Optional[AsyncHTTPConnection]:
262258
# Determine expired keep alive connections on this origin.
263-
seen_http11 = False
264-
pending_connection = None
265259
reuse_connection = None
266260
connections_to_close = set()
267261

268262
for connection in self._connections_for_origin(origin):
269-
if connection.is_http11:
270-
seen_http11 = True
271-
272-
if connection.state == ConnectionState.IDLE:
273-
if connection.is_socket_readable():
274-
# If the socket is readable while the connection is idle (meaning
275-
# we don't expect the server to send any data), then the only valid
276-
# reason is that the other end has disconnected, and is readable
277-
# because it is ready to return the b"" disconnect indicator, which
278-
# means we should drop the connection too.
279-
# (For a detailed run-through of what a "readable" socket is, and
280-
# why this is the best thing for us to do here, see:
281-
# https://github.com/encode/httpx/pull/143#issuecomment-515181778)
282-
logger.trace("removing dropped idle connection=%r", connection)
283-
# IDLE connections that have been dropped should be
284-
# removed from the pool.
285-
connections_to_close.add(connection)
286-
await self._remove_from_pool(connection)
287-
else:
288-
# IDLE connections that are still maintained may
289-
# be reused.
290-
logger.trace("reusing idle http11 connection=%r", connection)
291-
reuse_connection = connection
292-
elif connection.state == ConnectionState.ACTIVE and connection.is_http2:
293-
# HTTP/2 connections may be reused.
294-
logger.trace("reusing active http2 connection=%r", connection)
263+
if connection.should_close():
264+
connections_to_close.add(connection)
265+
await self._remove_from_pool(connection)
266+
elif connection.is_available():
295267
reuse_connection = connection
296-
elif connection.state == ConnectionState.PENDING:
297-
# Pending connections may potentially be reused.
298-
pending_connection = connection
299-
300-
if reuse_connection is not None:
301-
# Mark the connection as READY before we return it, to indicate
302-
# that if it is HTTP/1.1 then it should not be re-acquired.
303-
reuse_connection.mark_as_ready()
304-
reuse_connection.expires_at = None
305-
elif self._http2 and pending_connection is not None and not seen_http11:
306-
# If we have a PENDING connection, and no HTTP/1.1 connections
307-
# on this origin, then we can attempt to share the connection.
308-
logger.trace("reusing pending connection=%r", connection)
309-
reuse_connection = pending_connection
310268

311269
# Close any dropped connections.
312270
for connection in connections_to_close:
@@ -318,19 +276,16 @@ async def _response_closed(self, connection: AsyncHTTPConnection) -> None:
318276
remove_from_pool = False
319277
close_connection = False
320278

321-
if connection.state == ConnectionState.CLOSED:
279+
if connection.is_closed():
322280
remove_from_pool = True
323-
elif connection.state == ConnectionState.IDLE:
281+
elif connection.is_idle():
324282
num_connections = len(self._get_all_connections())
325283
if (
326284
self._max_keepalive_connections is not None
327285
and num_connections > self._max_keepalive_connections
328286
):
329287
remove_from_pool = True
330288
close_connection = True
331-
elif self._keepalive_expiry is not None:
332-
now = await self._backend.time()
333-
connection.expires_at = now + self._keepalive_expiry
334289

335290
if remove_from_pool:
336291
await self._remove_from_pool(connection)
@@ -353,11 +308,7 @@ async def _keepalive_sweep(self) -> None:
353308
connections_to_close = set()
354309

355310
for connection in self._get_all_connections():
356-
if (
357-
connection.state == ConnectionState.IDLE
358-
and connection.expires_at is not None
359-
and now >= connection.expires_at
360-
):
311+
if connection.should_close():
361312
connections_to_close.add(connection)
362313
await self._remove_from_pool(connection)
363314

httpcore/_async/http.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,40 @@
1+
from ssl import SSLContext
2+
13
from .._backends.auto import AsyncSocketStream
24
from .._types import TimeoutDict
3-
from .base import AsyncHTTPTransport, ConnectionState
5+
from .base import AsyncHTTPTransport
46

57

68
class AsyncBaseHTTPConnection(AsyncHTTPTransport):
79
def info(self) -> str:
810
raise NotImplementedError() # pragma: nocover
911

10-
def get_state(self) -> ConnectionState:
12+
def should_close(self) -> bool:
13+
"""
14+
Return `True` if the connection is in a state where it should be closed.
15+
"""
16+
raise NotImplementedError() # pragma: nocover
17+
18+
def is_idle(self) -> bool:
1119
"""
12-
Return the current state.
20+
Return `True` if the connection is currently idle.
1321
"""
1422
raise NotImplementedError() # pragma: nocover
1523

16-
def mark_as_ready(self) -> None:
24+
def is_closed(self) -> bool:
1725
"""
18-
The connection has been acquired from the pool, and the state
19-
should reflect that.
26+
Return `True` if the connection has been closed.
2027
"""
2128
raise NotImplementedError() # pragma: nocover
2229

23-
def is_socket_readable(self) -> bool:
30+
def is_available(self) -> bool:
2431
"""
25-
Return 'True' if the underlying network socket is readable.
32+
Return `True` if the connection is currently able to accept an outgoing request.
2633
"""
2734
raise NotImplementedError() # pragma: nocover
2835

2936
async def start_tls(
30-
self, hostname: bytes, timeout: TimeoutDict = None
37+
self, hostname: bytes, ssl_context: SSLContext, timeout: TimeoutDict = None
3138
) -> AsyncSocketStream:
3239
"""
3340
Upgrade the underlying socket to TLS.

0 commit comments

Comments
 (0)