Skip to content

Commit a345ef4

Browse files
committed
PYTHON-2634 Only update pools for data-bearing servers (#590)
Fixes a noisy OperationFailure: Authentication failed error. Do not attempt to create unneeded connections to arbiters, ghosts, hidden members, or unknown members. (cherry picked from commit 4c7718e) Conflicts: pymongo/topology.py test/test_client.py test/test_cmap.py
1 parent 9e01a6b commit a345ef4

File tree

6 files changed

+137
-39
lines changed

6 files changed

+137
-39
lines changed

pymongo/topology.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,15 +430,26 @@ def handle_getlasterror(self, address, error_msg):
430430
ServerDescription(address, error=error), True)
431431
server.request_check()
432432

433+
def data_bearing_servers(self):
434+
"""Return a list of all data-bearing servers.
435+
436+
This includes any server that might be selected for an operation.
437+
"""
438+
if self._description.topology_type == TOPOLOGY_TYPE.Single:
439+
return self._description.known_servers
440+
return self._description.readable_servers
441+
433442
def update_pool(self, all_credentials):
434443
# Remove any stale sockets and add new sockets if pool is too small.
435444
servers = []
436445
with self._lock:
437-
for server in self._servers.values():
438-
servers.append((server, server._pool.generation))
446+
# Only update pools for data-bearing servers.
447+
for sd in self.data_bearing_servers():
448+
server = self._servers[sd.address]
449+
servers.append((server, server.pool.generation))
439450

440451
for server, generation in servers:
441-
server._pool.remove_stale_sockets(generation, all_credentials)
452+
server.pool.remove_stale_sockets(generation, all_credentials)
442453

443454
def close(self):
444455
"""Clear pools and terminate monitors. Topology reopens on demand."""

test/pymongo_mocks.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ def _check_once(self):
8585
class MockClient(MongoClient):
8686
def __init__(
8787
self, standalones, members, mongoses, ismaster_hosts=None,
88-
*args, **kwargs):
88+
arbiters=None, down_hosts=None, *args, **kwargs):
8989
"""A MongoClient connected to the default server, with a mock topology.
9090
91-
standalones, members, mongoses determine the configuration of the
92-
topology. They are formatted like ['a:1', 'b:2']. ismaster_hosts
93-
provides an alternative host list for the server's mocked ismaster
94-
response; see test_connect_with_internal_ips.
91+
standalones, members, mongoses, arbiters, and down_hosts determine the
92+
configuration of the topology. They are formatted like ['a:1', 'b:2'].
93+
ismaster_hosts provides an alternative host list for the server's
94+
mocked ismaster response; see test_connect_with_internal_ips.
9595
"""
9696
self.mock_standalones = standalones[:]
9797
self.mock_members = members[:]
@@ -101,6 +101,9 @@ def __init__(
101101
else:
102102
self.mock_primary = None
103103

104+
# Hosts that should be considered an arbiter.
105+
self.mock_arbiters = arbiters[:] if arbiters else []
106+
104107
if ismaster_hosts is not None:
105108
self.mock_ismaster_hosts = ismaster_hosts
106109
else:
@@ -109,7 +112,7 @@ def __init__(
109112
self.mock_mongoses = mongoses[:]
110113

111114
# Hosts that should raise socket errors.
112-
self.mock_down_hosts = []
115+
self.mock_down_hosts = down_hosts[:] if down_hosts else []
113116

114117
# Hostname -> (min wire version, max wire version)
115118
self.mock_wire_versions = {}
@@ -182,6 +185,10 @@ def mock_is_master(self, host):
182185

183186
if self.mock_primary:
184187
response['primary'] = self.mock_primary
188+
189+
if host in self.mock_arbiters:
190+
response['arbiterOnly'] = True
191+
response['secondary'] = False
185192
elif host in self.mock_mongoses:
186193
response = {
187194
'ok': 1,

test/test_client.py

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from bson.son import SON
3636
from bson.tz_util import utc
3737
import pymongo
38-
from pymongo import auth, message
38+
from pymongo import auth, message, monitoring
3939
from pymongo.common import CONNECT_TIMEOUT, _UUID_REPRESENTATIONS
4040
from pymongo.command_cursor import CommandCursor
4141
from pymongo.compression_support import _HAVE_SNAPPY, _HAVE_ZSTD
@@ -58,7 +58,7 @@
5858
from pymongo.pool import SocketInfo, _METADATA
5959
from pymongo.read_preferences import ReadPreference
6060
from pymongo.server_description import ServerDescription
61-
from pymongo.server_selectors import (any_server_selector,
61+
from pymongo.server_selectors import (readable_server_selector,
6262
writable_server_selector)
6363
from pymongo.server_type import SERVER_TYPE
6464
from pymongo.settings import TOPOLOGY_TYPE
@@ -76,6 +76,7 @@
7676
from test.pymongo_mocks import MockClient
7777
from test.utils import (assertRaisesExactly,
7878
connected,
79+
CMAPListener,
7980
delay,
8081
FunctionCallRecorder,
8182
get_pool,
@@ -452,21 +453,25 @@ def test_uri_security_options(self):
452453

453454
class TestClient(IntegrationTest):
454455

455-
def test_max_idle_time_reaper(self):
456+
def test_max_idle_time_reaper_default(self):
456457
with client_knobs(kill_cursor_frequency=0.1):
457458
# Assert reaper doesn't remove sockets when maxIdleTimeMS not set
458459
client = rs_or_single_client()
459-
server = client._get_topology().select_server(any_server_selector)
460+
server = client._get_topology().select_server(
461+
readable_server_selector)
460462
with server._pool.get_socket({}) as sock_info:
461463
pass
462464
self.assertEqual(1, len(server._pool.sockets))
463465
self.assertTrue(sock_info in server._pool.sockets)
464466
client.close()
465467

468+
def test_max_idle_time_reaper_removes_stale_minPoolSize(self):
469+
with client_knobs(kill_cursor_frequency=0.1):
466470
# Assert reaper removes idle socket and replaces it with a new one
467471
client = rs_or_single_client(maxIdleTimeMS=500,
468472
minPoolSize=1)
469-
server = client._get_topology().select_server(any_server_selector)
473+
server = client._get_topology().select_server(
474+
readable_server_selector)
470475
with server._pool.get_socket({}) as sock_info:
471476
pass
472477
# When the reaper runs at the same time as the get_socket, two
@@ -478,11 +483,14 @@ def test_max_idle_time_reaper(self):
478483
"replace stale socket")
479484
client.close()
480485

486+
def test_max_idle_time_reaper_does_not_exceed_maxPoolSize(self):
487+
with client_knobs(kill_cursor_frequency=0.1):
481488
# Assert reaper respects maxPoolSize when adding new sockets.
482489
client = rs_or_single_client(maxIdleTimeMS=500,
483490
minPoolSize=1,
484491
maxPoolSize=1)
485-
server = client._get_topology().select_server(any_server_selector)
492+
server = client._get_topology().select_server(
493+
readable_server_selector)
486494
with server._pool.get_socket({}) as sock_info:
487495
pass
488496
# When the reaper runs at the same time as the get_socket,
@@ -494,9 +502,12 @@ def test_max_idle_time_reaper(self):
494502
"replace stale socket")
495503
client.close()
496504

505+
def test_max_idle_time_reaper_removes_stale(self):
506+
with client_knobs(kill_cursor_frequency=0.1):
497507
# Assert reaper has removed idle socket and NOT replaced it
498508
client = rs_or_single_client(maxIdleTimeMS=500)
499-
server = client._get_topology().select_server(any_server_selector)
509+
server = client._get_topology().select_server(
510+
readable_server_selector)
500511
with server._pool.get_socket({}) as sock_info_one:
501512
pass
502513
# Assert that the pool does not close sockets prematurely.
@@ -512,12 +523,14 @@ def test_max_idle_time_reaper(self):
512523
def test_min_pool_size(self):
513524
with client_knobs(kill_cursor_frequency=.1):
514525
client = rs_or_single_client()
515-
server = client._get_topology().select_server(any_server_selector)
526+
server = client._get_topology().select_server(
527+
readable_server_selector)
516528
self.assertEqual(0, len(server._pool.sockets))
517529

518530
# Assert that pool started up at minPoolSize
519531
client = rs_or_single_client(minPoolSize=10)
520-
server = client._get_topology().select_server(any_server_selector)
532+
server = client._get_topology().select_server(
533+
readable_server_selector)
521534
wait_until(lambda: 10 == len(server._pool.sockets),
522535
"pool initialized with 10 sockets")
523536

@@ -532,7 +545,8 @@ def test_max_idle_time_checkout(self):
532545
# Use high frequency to test _get_socket_no_auth.
533546
with client_knobs(kill_cursor_frequency=99999999):
534547
client = rs_or_single_client(maxIdleTimeMS=500)
535-
server = client._get_topology().select_server(any_server_selector)
548+
server = client._get_topology().select_server(
549+
readable_server_selector)
536550
with server._pool.get_socket({}) as sock_info:
537551
pass
538552
self.assertEqual(1, len(server._pool.sockets))
@@ -546,7 +560,8 @@ def test_max_idle_time_checkout(self):
546560

547561
# Test that sockets are reused if maxIdleTimeMS is not set.
548562
client = rs_or_single_client()
549-
server = client._get_topology().select_server(any_server_selector)
563+
server = client._get_topology().select_server(
564+
readable_server_selector)
550565
with server._pool.get_socket({}) as sock_info:
551566
pass
552567
self.assertEqual(1, len(server._pool.sockets))
@@ -2008,5 +2023,60 @@ def timeout_task():
20082023
self.assertIsNone(ct.get())
20092024

20102025

2026+
class TestClientPool(MockClientTest):
2027+
2028+
def test_rs_client_does_not_maintain_pool_to_arbiters(self):
2029+
listener = CMAPListener()
2030+
c = MockClient(
2031+
standalones=[],
2032+
members=['a:1', 'b:2', 'c:3', 'd:4'],
2033+
mongoses=[],
2034+
arbiters=['c:3'], # c:3 is an arbiter.
2035+
down_hosts=['d:4'], # d:4 is unreachable.
2036+
host=['a:1', 'b:2', 'c:3', 'd:4'],
2037+
replicaSet='rs',
2038+
minPoolSize=1, # minPoolSize
2039+
event_listeners=[listener],
2040+
)
2041+
self.addCleanup(c.close)
2042+
2043+
wait_until(lambda: len(c.nodes) == 3, 'connect')
2044+
self.assertEqual(c.address, ('a', 1))
2045+
self.assertEqual(c.arbiters, set([('c', 3)]))
2046+
# Assert that we create 2 and only 2 pooled connections.
2047+
listener.wait_for_event(monitoring.ConnectionReadyEvent, 2)
2048+
self.assertEqual(
2049+
listener.event_count(monitoring.ConnectionCreatedEvent), 2)
2050+
# Assert that we do not create connections to arbiters.
2051+
arbiter = c._topology.get_server_by_address(('c', 3))
2052+
self.assertFalse(arbiter.pool.sockets)
2053+
# Assert that we do not create connections to unknown servers.
2054+
arbiter = c._topology.get_server_by_address(('d', 4))
2055+
self.assertFalse(arbiter.pool.sockets)
2056+
2057+
def test_direct_client_maintains_pool_to_arbiter(self):
2058+
listener = CMAPListener()
2059+
c = MockClient(
2060+
standalones=[],
2061+
members=['a:1', 'b:2', 'c:3'],
2062+
mongoses=[],
2063+
arbiters=['c:3'], # c:3 is an arbiter.
2064+
host='c:3',
2065+
directConnection=True,
2066+
minPoolSize=1, # minPoolSize
2067+
event_listeners=[listener],
2068+
)
2069+
self.addCleanup(c.close)
2070+
2071+
wait_until(lambda: len(c.nodes) == 1, 'connect')
2072+
self.assertEqual(c.address, ('c', 3))
2073+
# Assert that we create 1 pooled connection.
2074+
listener.wait_for_event(monitoring.ConnectionReadyEvent, 1)
2075+
self.assertEqual(
2076+
listener.event_count(monitoring.ConnectionCreatedEvent), 1)
2077+
arbiter = c._topology.get_server_by_address(('c', 3))
2078+
self.assertEqual(len(arbiter.pool.sockets), 1)
2079+
2080+
20112081
if __name__ == "__main__":
20122082
unittest.main()

test/test_heartbeat_monitoring.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ def _check_with_socket(self, *args, **kwargs):
5151
# monitor thread may run multiple times during the execution
5252
# of this test.
5353
wait_until(
54-
lambda: len(listener.results) >= expected_len,
54+
lambda: len(listener.events) >= expected_len,
5555
"publish all events")
5656

5757
try:
5858
# zip gives us len(expected_results) pairs.
59-
for expected, actual in zip(expected_results, listener.results):
59+
for expected, actual in zip(expected_results, listener.events):
6060
self.assertEqual(expected,
6161
actual.__class__.__name__)
6262
self.assertEqual(actual.connection_id,

test/test_streaming_protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def hb_failed(event):
215215
self.assertTrue(hb_failed_events[0].awaited)
216216
# Depending on thread scheduling, the failed heartbeat could occur on
217217
# the second or third check.
218-
events = [type(e) for e in hb_listener.results[:4]]
218+
events = [type(e) for e in hb_listener.events[:4]]
219219
if events == [monitoring.ServerHeartbeatStartedEvent,
220220
monitoring.ServerHeartbeatSucceededEvent,
221221
monitoring.ServerHeartbeatStartedEvent,

test/utils.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
from pymongo import (MongoClient,
3737
monitoring, read_preferences)
3838
from pymongo.errors import ConfigurationError, OperationFailure
39-
from pymongo.monitoring import _SENSITIVE_COMMANDS, ConnectionPoolListener
39+
from pymongo.monitoring import _SENSITIVE_COMMANDS
4040
from pymongo.pool import (_CancellationContext,
4141
PoolOptions)
4242
from pymongo.read_concern import ReadConcern
@@ -60,7 +60,7 @@
6060
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=50)
6161

6262

63-
class CMAPListener(ConnectionPoolListener):
63+
class BaseListener(object):
6464
def __init__(self):
6565
self.events = []
6666

@@ -71,8 +71,26 @@ def add_event(self, event):
7171
self.events.append(event)
7272

7373
def event_count(self, event_type):
74-
return len([event for event in self.events[:]
75-
if isinstance(event, event_type)])
74+
return len(self.events_by_type(event_type))
75+
76+
def events_by_type(self, event_type):
77+
"""Return the matching events by event class.
78+
79+
event_type can be a single class or a tuple of classes.
80+
"""
81+
return self.matching(lambda e: isinstance(e, event_type))
82+
83+
def matching(self, matcher):
84+
"""Return the matching events."""
85+
return [event for event in self.events[:] if matcher(event)]
86+
87+
def wait_for_event(self, event, count):
88+
"""Wait for a number of events to be published, or fail."""
89+
wait_until(lambda: self.event_count(event) >= count,
90+
'find %s %s event(s)' % (count, event))
91+
92+
93+
class CMAPListener(BaseListener, monitoring.ConnectionPoolListener):
7694

7795
def connection_created(self, event):
7896
self.add_event(event)
@@ -196,25 +214,17 @@ class ServerAndTopologyEventListener(ServerEventListener,
196214
"""Listens to Server and Topology events."""
197215

198216

199-
class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
217+
class HeartbeatEventListener(BaseListener, monitoring.ServerHeartbeatListener):
200218
"""Listens to only server heartbeat events."""
201219

202-
def __init__(self):
203-
self.results = []
204-
205220
def started(self, event):
206-
self.results.append(event)
221+
self.add_event(event)
207222

208223
def succeeded(self, event):
209-
self.results.append(event)
224+
self.add_event(event)
210225

211226
def failed(self, event):
212-
self.results.append(event)
213-
214-
def matching(self, matcher):
215-
"""Return the matching events."""
216-
results = self.results[:]
217-
return [event for event in results if matcher(event)]
227+
self.add_event(event)
218228

219229

220230
class MockSocketInfo(object):

0 commit comments

Comments
 (0)