Skip to content

Commit 755c1b5

Browse files
committed
Merge branch 'master' of github.com:mongodb/mongo-python-driver
2 parents 87a6ca6 + 34f7d7e commit 755c1b5

File tree

14 files changed

+246
-27
lines changed

14 files changed

+246
-27
lines changed

doc/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Version 4.12.1 is a bug fix release.
1616
errors such as: "NotImplementedError: Database objects do not implement truth value testing or bool()".
1717
- Removed Eventlet testing against Python versions newer than 3.9 since
1818
Eventlet is actively being sunset by its maintainers and has compatibility issues with PyMongo's dnspython dependency.
19+
- Fixed a bug where MongoDB cluster topology changes could cause asynchronous operations to take much longer to complete
20+
due to holding the Topology lock while closing stale connections.
1921

2022
Issues Resolved
2123
...............

pymongo/asynchronous/pool.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import asyncio
1718
import collections
1819
import contextlib
1920
import logging
@@ -860,8 +861,14 @@ async def _reset(
860861
# PoolClosedEvent but that reset() SHOULD close sockets *after*
861862
# publishing the PoolClearedEvent.
862863
if close:
863-
for conn in sockets:
864-
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
864+
if not _IS_SYNC:
865+
await asyncio.gather(
866+
*[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets],
867+
return_exceptions=True,
868+
)
869+
else:
870+
for conn in sockets:
871+
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
865872
if self.enabled_for_cmap:
866873
assert listeners is not None
867874
listeners.publish_pool_closed(self.address)
@@ -891,8 +898,14 @@ async def _reset(
891898
serverPort=self.address[1],
892899
serviceId=service_id,
893900
)
894-
for conn in sockets:
895-
await conn.close_conn(ConnectionClosedReason.STALE)
901+
if not _IS_SYNC:
902+
await asyncio.gather(
903+
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets],
904+
return_exceptions=True,
905+
)
906+
else:
907+
for conn in sockets:
908+
await conn.close_conn(ConnectionClosedReason.STALE)
896909

897910
async def update_is_writable(self, is_writable: Optional[bool]) -> None:
898911
"""Updates the is_writable attribute on all sockets currently in the
@@ -938,8 +951,14 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
938951
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
939952
):
940953
close_conns.append(self.conns.pop())
941-
for conn in close_conns:
942-
await conn.close_conn(ConnectionClosedReason.IDLE)
954+
if not _IS_SYNC:
955+
await asyncio.gather(
956+
*[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns],
957+
return_exceptions=True,
958+
)
959+
else:
960+
for conn in close_conns:
961+
await conn.close_conn(ConnectionClosedReason.IDLE)
943962

944963
while True:
945964
async with self.size_cond:

pymongo/asynchronous/topology.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -529,12 +529,6 @@ async def _process_change(
529529
if not _IS_SYNC:
530530
self._monitor_tasks.append(self._srv_monitor)
531531

532-
# Clear the pool from a failed heartbeat.
533-
if reset_pool:
534-
server = self._servers.get(server_description.address)
535-
if server:
536-
await server.pool.reset(interrupt_connections=interrupt_connections)
537-
538532
# Wake anything waiting in select_servers().
539533
self._condition.notify_all()
540534

@@ -557,6 +551,11 @@ async def on_change(
557551
# that didn't include this server.
558552
if self._opened and self._description.has_server(server_description.address):
559553
await self._process_change(server_description, reset_pool, interrupt_connections)
554+
# Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close.
555+
if reset_pool:
556+
server = self._servers.get(server_description.address)
557+
if server:
558+
await server.pool.reset(interrupt_connections=interrupt_connections)
560559

561560
async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
562561
"""Process a new seedlist on an opened topology.

pymongo/synchronous/pool.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import asyncio
1718
import collections
1819
import contextlib
1920
import logging
@@ -858,8 +859,14 @@ def _reset(
858859
# PoolClosedEvent but that reset() SHOULD close sockets *after*
859860
# publishing the PoolClearedEvent.
860861
if close:
861-
for conn in sockets:
862-
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
862+
if not _IS_SYNC:
863+
asyncio.gather(
864+
*[conn.close_conn(ConnectionClosedReason.POOL_CLOSED) for conn in sockets],
865+
return_exceptions=True,
866+
)
867+
else:
868+
for conn in sockets:
869+
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
863870
if self.enabled_for_cmap:
864871
assert listeners is not None
865872
listeners.publish_pool_closed(self.address)
@@ -889,8 +896,14 @@ def _reset(
889896
serverPort=self.address[1],
890897
serviceId=service_id,
891898
)
892-
for conn in sockets:
893-
conn.close_conn(ConnectionClosedReason.STALE)
899+
if not _IS_SYNC:
900+
asyncio.gather(
901+
*[conn.close_conn(ConnectionClosedReason.STALE) for conn in sockets],
902+
return_exceptions=True,
903+
)
904+
else:
905+
for conn in sockets:
906+
conn.close_conn(ConnectionClosedReason.STALE)
894907

895908
def update_is_writable(self, is_writable: Optional[bool]) -> None:
896909
"""Updates the is_writable attribute on all sockets currently in the
@@ -934,8 +947,14 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
934947
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
935948
):
936949
close_conns.append(self.conns.pop())
937-
for conn in close_conns:
938-
conn.close_conn(ConnectionClosedReason.IDLE)
950+
if not _IS_SYNC:
951+
asyncio.gather(
952+
*[conn.close_conn(ConnectionClosedReason.IDLE) for conn in close_conns],
953+
return_exceptions=True,
954+
)
955+
else:
956+
for conn in close_conns:
957+
conn.close_conn(ConnectionClosedReason.IDLE)
939958

940959
while True:
941960
with self.size_cond:

pymongo/synchronous/topology.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -529,12 +529,6 @@ def _process_change(
529529
if not _IS_SYNC:
530530
self._monitor_tasks.append(self._srv_monitor)
531531

532-
# Clear the pool from a failed heartbeat.
533-
if reset_pool:
534-
server = self._servers.get(server_description.address)
535-
if server:
536-
server.pool.reset(interrupt_connections=interrupt_connections)
537-
538532
# Wake anything waiting in select_servers().
539533
self._condition.notify_all()
540534

@@ -557,6 +551,11 @@ def on_change(
557551
# that didn't include this server.
558552
if self._opened and self._description.has_server(server_description.address):
559553
self._process_change(server_description, reset_pool, interrupt_connections)
554+
# Clear the pool from a failed heartbeat, done outside the lock to avoid blocking on connection close.
555+
if reset_pool:
556+
server = self._servers.get(server_description.address)
557+
if server:
558+
server.pool.reset(interrupt_connections=interrupt_connections)
560559

561560
def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None:
562561
"""Process a new seedlist on an opened topology.

test/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,14 @@ def require_sync(self, func):
826826
lambda: _IS_SYNC, "This test only works with the synchronous API", func=func
827827
)
828828

829+
def require_async(self, func):
830+
"""Run a test only if using the asynchronous API.""" # unasync: off
831+
return self._require(
832+
lambda: not _IS_SYNC,
833+
"This test only works with the asynchronous API", # unasync: off
834+
func=func,
835+
)
836+
829837
def mongos_seeds(self):
830838
return ",".join("{}:{}".format(*address) for address in self.mongoses)
831839

test/asynchronous/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,14 @@ def require_sync(self, func):
828828
lambda: _IS_SYNC, "This test only works with the synchronous API", func=func
829829
)
830830

831+
def require_async(self, func):
832+
"""Run a test only if using the asynchronous API.""" # unasync: off
833+
return self._require(
834+
lambda: not _IS_SYNC,
835+
"This test only works with the asynchronous API", # unasync: off
836+
func=func,
837+
)
838+
831839
def mongos_seeds(self):
832840
return ",".join("{}:{}".format(*address) for address in self.mongoses)
833841

test/asynchronous/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ async def test_init_disconnected_with_auth(self):
850850
with self.assertRaises(ConnectionFailure):
851851
await c.pymongo_test.test.find_one()
852852

853-
@async_client_context.require_no_standalone
853+
@async_client_context.require_replica_set
854854
@async_client_context.require_no_load_balancer
855855
@async_client_context.require_tls
856856
async def test_init_disconnected_with_srv(self):

test/asynchronous/test_discovery_and_monitoring.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@
2020
import socketserver
2121
import sys
2222
import threading
23+
import time
2324
from asyncio import StreamReader, StreamWriter
2425
from pathlib import Path
2526
from test.asynchronous.helpers import ConcurrentRunner
2627

28+
from pymongo.asynchronous.pool import AsyncConnection
29+
from pymongo.operations import _Op
30+
from pymongo.server_selectors import writable_server_selector
31+
2732
sys.path[0:0] = [""]
2833

2934
from test.asynchronous import (
@@ -370,6 +375,74 @@ async def test_pool_unpause(self):
370375
await listener.async_wait_for_event(monitoring.ServerHeartbeatSucceededEvent, 1)
371376
await listener.async_wait_for_event(monitoring.PoolReadyEvent, 1)
372377

378+
@async_client_context.require_failCommand_appName
379+
@async_client_context.require_test_commands
380+
@async_client_context.require_async
381+
async def test_connection_close_does_not_block_other_operations(self):
382+
listener = CMAPHeartbeatListener()
383+
client = await self.async_single_client(
384+
appName="SDAMConnectionCloseTest",
385+
event_listeners=[listener],
386+
heartbeatFrequencyMS=500,
387+
minPoolSize=10,
388+
)
389+
server = await (await client._get_topology()).select_server(
390+
writable_server_selector, _Op.TEST
391+
)
392+
await async_wait_until(
393+
lambda: len(server._pool.conns) == 10,
394+
"pool initialized with 10 connections",
395+
)
396+
397+
await client.db.test.insert_one({"x": 1})
398+
close_delay = 0.1
399+
latencies = []
400+
should_exit = []
401+
402+
async def run_task():
403+
while True:
404+
start_time = time.monotonic()
405+
await client.db.test.find_one({})
406+
elapsed = time.monotonic() - start_time
407+
latencies.append(elapsed)
408+
if should_exit:
409+
break
410+
await asyncio.sleep(0.001)
411+
412+
task = ConcurrentRunner(target=run_task)
413+
await task.start()
414+
original_close = AsyncConnection.close_conn
415+
try:
416+
# Artificially delay the close operation to simulate a slow close
417+
async def mock_close(self, reason):
418+
await asyncio.sleep(close_delay)
419+
await original_close(self, reason)
420+
421+
AsyncConnection.close_conn = mock_close
422+
423+
fail_hello = {
424+
"mode": {"times": 4},
425+
"data": {
426+
"failCommands": [HelloCompat.LEGACY_CMD, "hello"],
427+
"errorCode": 91,
428+
"appName": "SDAMConnectionCloseTest",
429+
},
430+
}
431+
async with self.fail_point(fail_hello):
432+
# Wait for server heartbeat to fail
433+
await listener.async_wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1)
434+
# Wait until all idle connections are closed to simulate real-world conditions
435+
await listener.async_wait_for_event(monitoring.ConnectionClosedEvent, 10)
436+
# Wait for one more find to complete after the pool has been reset, then shut down the task
437+
n = len(latencies)
438+
await async_wait_until(lambda: len(latencies) >= n + 1, "run one more find")
439+
should_exit.append(True)
440+
await task.join()
441+
# No operation latency should significantly exceed close_delay
442+
self.assertLessEqual(max(latencies), close_delay * 5.0)
443+
finally:
444+
AsyncConnection.close_conn = original_close
445+
373446

374447
class TestServerMonitoringMode(AsyncIntegrationTest):
375448
@async_client_context.require_no_serverless

test/asynchronous/test_ssl.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,14 @@ async def asyncTearDown(self):
166166

167167
@async_client_context.require_tls
168168
async def test_simple_ssl(self):
169+
if "PyPy" in sys.version:
170+
self.skipTest("Test is flaky on PyPy")
169171
# Expects the server to be running with ssl and with
170172
# no --sslPEMKeyFile or with --sslWeakCertificateValidation
171173
await self.assertClientWorks(self.client)
172174

173175
@async_client_context.require_tlsCertificateKeyFile
176+
@async_client_context.require_no_api_version
174177
@ignore_deprecations
175178
async def test_tlsCertificateKeyFilePassword(self):
176179
# Expects the server to be running with server.pem and ca.pem
@@ -376,6 +379,7 @@ async def test_cert_ssl_validation_hostname_matching(self):
376379
)
377380

378381
@async_client_context.require_tlsCertificateKeyFile
382+
@async_client_context.require_no_api_version
379383
@ignore_deprecations
380384
async def test_tlsCRLFile_support(self):
381385
if not hasattr(ssl, "VERIFY_CRL_CHECK_LEAF") or _ssl.IS_PYOPENSSL:
@@ -531,6 +535,7 @@ def test_wincertstore(self):
531535

532536
@async_client_context.require_auth
533537
@async_client_context.require_tlsCertificateKeyFile
538+
@async_client_context.require_no_api_version
534539
@ignore_deprecations
535540
async def test_mongodb_x509_auth(self):
536541
host, port = await async_client_context.host, await async_client_context.port
@@ -640,6 +645,7 @@ async def test_mongodb_x509_auth(self):
640645
self.fail("Invalid certificate accepted.")
641646

642647
@async_client_context.require_tlsCertificateKeyFile
648+
@async_client_context.require_no_api_version
643649
@ignore_deprecations
644650
async def test_connect_with_ca_bundle(self):
645651
def remove(path):

test/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ def test_init_disconnected_with_auth(self):
825825
with self.assertRaises(ConnectionFailure):
826826
c.pymongo_test.test.find_one()
827827

828-
@client_context.require_no_standalone
828+
@client_context.require_replica_set
829829
@client_context.require_no_load_balancer
830830
@client_context.require_tls
831831
def test_init_disconnected_with_srv(self):

0 commit comments

Comments
 (0)