Skip to content

Commit 935f926

Browse files
authored
PYTHON-3362 Ignore wtimeout when timeoutMS or timeout() is configured (#1013)
Apply client timeoutMS to gridfs operations.
1 parent db3f2dc commit 935f926

12 files changed

+133
-87
lines changed

gridfs/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
_clear_entity_type_registry,
3434
_disallow_transactions,
3535
)
36-
from pymongo import ASCENDING, DESCENDING
36+
from pymongo import ASCENDING, DESCENDING, _csot
3737
from pymongo.client_session import ClientSession
3838
from pymongo.collection import Collection
3939
from pymongo.common import validate_string
@@ -514,6 +514,7 @@ def __init__(
514514
)
515515

516516
self._chunk_size_bytes = chunk_size_bytes
517+
self._timeout = db.client.options.timeout
517518

518519
def open_upload_stream(
519520
self,
@@ -631,6 +632,7 @@ def open_upload_stream_with_id(
631632

632633
return GridIn(self._collection, session=session, **opts)
633634

635+
@_csot.apply
634636
def upload_from_stream(
635637
self,
636638
filename: str,
@@ -679,6 +681,7 @@ def upload_from_stream(
679681

680682
return cast(ObjectId, gin._id)
681683

684+
@_csot.apply
682685
def upload_from_stream_with_id(
683686
self,
684687
file_id: Any,
@@ -762,6 +765,7 @@ def open_download_stream(
762765
gout._ensure_file()
763766
return gout
764767

768+
@_csot.apply
765769
def download_to_stream(
766770
self, file_id: Any, destination: Any, session: Optional[ClientSession] = None
767771
) -> None:
@@ -795,6 +799,7 @@ def download_to_stream(
795799
for chunk in gout:
796800
destination.write(chunk)
797801

802+
@_csot.apply
798803
def delete(self, file_id: Any, session: Optional[ClientSession] = None) -> None:
799804
"""Given an file_id, delete this stored file's files collection document
800805
and associated chunks from a GridFS bucket.
@@ -926,6 +931,7 @@ def open_download_stream_by_name(
926931
except StopIteration:
927932
raise NoFile("no version %d for filename %r" % (revision, filename))
928933

934+
@_csot.apply
929935
def download_to_stream_by_name(
930936
self,
931937
filename: str,

pymongo/_csot.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import functools
1818
import time
1919
from contextvars import ContextVar, Token
20-
from typing import Any, Callable, Optional, Tuple, TypeVar, cast
20+
from typing import Any, Callable, MutableMapping, Optional, Tuple, TypeVar, cast
21+
22+
from pymongo.write_concern import WriteConcern
2123

2224
TIMEOUT: ContextVar[Optional[float]] = ContextVar("TIMEOUT", default=None)
2325
RTT: ContextVar[float] = ContextVar("RTT", default=0.0)
@@ -103,3 +105,14 @@ def csot_wrapper(self, *args, **kwargs):
103105
return func(self, *args, **kwargs)
104106

105107
return cast(F, csot_wrapper)
108+
109+
110+
def apply_write_concern(cmd: MutableMapping, write_concern: Optional[WriteConcern]) -> None:
111+
"""Apply the given write concern to a command."""
112+
if not write_concern or write_concern.is_server_default:
113+
return
114+
wc = write_concern.document
115+
if get_timeout() is not None:
116+
wc.pop("wtimeout", None)
117+
if wc:
118+
cmd["writeConcern"] = wc

pymongo/bulk.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from bson.objectid import ObjectId
2424
from bson.raw_bson import RawBSONDocument
2525
from bson.son import SON
26-
from pymongo import common
26+
from pymongo import _csot, common
2727
from pymongo.client_session import _validate_session_write_concern
2828
from pymongo.collation import validate_collation_or_none
2929
from pymongo.common import (
@@ -315,8 +315,7 @@ def _execute_command(
315315
cmd = SON([(cmd_name, self.collection.name), ("ordered", self.ordered)])
316316
if self.comment:
317317
cmd["comment"] = self.comment
318-
if not write_concern.is_server_default:
319-
cmd["writeConcern"] = write_concern.document
318+
_csot.apply_write_concern(cmd, write_concern)
320319
if self.bypass_doc_val:
321320
cmd["bypassDocumentValidation"] = True
322321
if self.let is not None and run.op_type in (_DELETE, _UPDATE):

pymongo/collection.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -542,8 +542,6 @@ def _insert_one(
542542
command = SON([("insert", self.name), ("ordered", ordered), ("documents", [doc])])
543543
if comment is not None:
544544
command["comment"] = comment
545-
if not write_concern.is_server_default:
546-
command["writeConcern"] = write_concern.document
547545

548546
def _insert_command(session, sock_info, retryable_write):
549547
if bypass_doc_val:
@@ -756,8 +754,6 @@ def _update(
756754
if let is not None:
757755
common.validate_is_mapping("let", let)
758756
command["let"] = let
759-
if not write_concern.is_server_default:
760-
command["writeConcern"] = write_concern.document
761757

762758
if comment is not None:
763759
command["comment"] = comment
@@ -1232,8 +1228,6 @@ def _delete(
12321228
hint = helpers._index_document(hint)
12331229
delete_doc["hint"] = hint
12341230
command = SON([("delete", self.name), ("ordered", ordered), ("deletes", [delete_doc])])
1235-
if not write_concern.is_server_default:
1236-
command["writeConcern"] = write_concern.document
12371231

12381232
if let is not None:
12391233
common.validate_is_document_type("let", let)
@@ -2820,8 +2814,6 @@ def _find_and_modify(session, sock_info, retryable_write):
28202814
"Must be connected to MongoDB 4.4+ to use hint on unacknowledged find and modify commands."
28212815
)
28222816
cmd["hint"] = hint
2823-
if not write_concern.is_server_default:
2824-
cmd["writeConcern"] = write_concern.document
28252817
out = self._command(
28262818
sock_info,
28272819
cmd,

pymongo/network.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,8 @@ def command(
118118

119119
# Support CSOT
120120
if client:
121-
sock_info.apply_timeout(client, spec, write_concern)
122-
elif write_concern and not write_concern.is_server_default:
123-
spec["writeConcern"] = write_concern.document
121+
sock_info.apply_timeout(client, spec)
122+
_csot.apply_write_concern(spec, write_concern)
124123

125124
if use_op_msg:
126125
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0

pymongo/pool.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -569,16 +569,13 @@ def set_socket_timeout(self, timeout):
569569
self.last_timeout = timeout
570570
self.sock.settimeout(timeout)
571571

572-
def apply_timeout(self, client, cmd, write_concern=None):
572+
def apply_timeout(self, client, cmd):
573573
# CSOT: use remaining timeout when set.
574574
timeout = _csot.remaining()
575575
if timeout is None:
576576
# Reset the socket timeout unless we're performing a streaming monitor check.
577577
if not self.more_to_come:
578578
self.set_socket_timeout(self.opts.socket_timeout)
579-
580-
if cmd and write_concern and not write_concern.is_server_default:
581-
cmd["writeConcern"] = write_concern.document
582579
return None
583580
# RTT validation.
584581
rtt = _csot.get_rtt()
@@ -593,10 +590,6 @@ def apply_timeout(self, client, cmd, write_concern=None):
593590
)
594591
if cmd is not None:
595592
cmd["maxTimeMS"] = int(max_time_ms * 1000)
596-
wc = write_concern.document if write_concern else {}
597-
wc.pop("wtimeout", None)
598-
if wc:
599-
cmd["writeConcern"] = wc
600593
self.set_socket_timeout(timeout)
601594
return timeout
602595

test/csot/gridfs-advanced.json

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"client": {
1818
"id": "client",
1919
"uriOptions": {
20-
"timeoutMS": 50
20+
"timeoutMS": 75
2121
},
2222
"useMultipleMongoses": false,
2323
"observeEvents": [
@@ -62,13 +62,12 @@
6262
"_id": {
6363
"$oid": "000000000000000000000005"
6464
},
65-
"length": 10,
65+
"length": 8,
6666
"chunkSize": 4,
6767
"uploadDate": {
6868
"$date": "1970-01-01T00:00:00.000Z"
6969
},
70-
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
71-
"filename": "length-10",
70+
"filename": "length-8",
7271
"contentType": "application/octet-stream",
7372
"aliases": [],
7473
"metadata": {}
@@ -93,6 +92,21 @@
9392
"subType": "00"
9493
}
9594
}
95+
},
96+
{
97+
"_id": {
98+
"$oid": "000000000000000000000006"
99+
},
100+
"files_id": {
101+
"$oid": "000000000000000000000005"
102+
},
103+
"n": 1,
104+
"data": {
105+
"$binary": {
106+
"base64": "ESIzRA==",
107+
"subType": "00"
108+
}
109+
}
96110
}
97111
]
98112
}
@@ -116,7 +130,7 @@
116130
"update"
117131
],
118132
"blockConnection": true,
119-
"blockTimeMS": 55
133+
"blockTimeMS": 100
120134
}
121135
}
122136
}
@@ -129,7 +143,7 @@
129143
"$oid": "000000000000000000000005"
130144
},
131145
"newFilename": "foo",
132-
"timeoutMS": 100
146+
"timeoutMS": 2000
133147
}
134148
}
135149
],
@@ -174,7 +188,7 @@
174188
"update"
175189
],
176190
"blockConnection": true,
177-
"blockTimeMS": 55
191+
"blockTimeMS": 100
178192
}
179193
}
180194
}
@@ -234,7 +248,7 @@
234248
"drop"
235249
],
236250
"blockConnection": true,
237-
"blockTimeMS": 55
251+
"blockTimeMS": 100
238252
}
239253
}
240254
}
@@ -243,7 +257,7 @@
243257
"name": "drop",
244258
"object": "bucket",
245259
"arguments": {
246-
"timeoutMS": 100
260+
"timeoutMS": 2000
247261
}
248262
}
249263
]
@@ -266,7 +280,7 @@
266280
"drop"
267281
],
268282
"blockConnection": true,
269-
"blockTimeMS": 55
283+
"blockTimeMS": 100
270284
}
271285
}
272286
}
@@ -320,7 +334,7 @@
320334
"drop"
321335
],
322336
"blockConnection": true,
323-
"blockTimeMS": 55
337+
"blockTimeMS": 100
324338
}
325339
}
326340
}

test/csot/gridfs-delete.json

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"client": {
1818
"id": "client",
1919
"uriOptions": {
20-
"timeoutMS": 50
20+
"timeoutMS": 75
2121
},
2222
"useMultipleMongoses": false,
2323
"observeEvents": [
@@ -62,13 +62,12 @@
6262
"_id": {
6363
"$oid": "000000000000000000000005"
6464
},
65-
"length": 10,
65+
"length": 8,
6666
"chunkSize": 4,
6767
"uploadDate": {
6868
"$date": "1970-01-01T00:00:00.000Z"
6969
},
70-
"md5": "57d83cd477bfb1ccd975ab33d827a92b",
71-
"filename": "length-10",
70+
"filename": "length-8",
7271
"contentType": "application/octet-stream",
7372
"aliases": [],
7473
"metadata": {}
@@ -93,6 +92,21 @@
9392
"subType": "00"
9493
}
9594
}
95+
},
96+
{
97+
"_id": {
98+
"$oid": "000000000000000000000006"
99+
},
100+
"files_id": {
101+
"$oid": "000000000000000000000005"
102+
},
103+
"n": 1,
104+
"data": {
105+
"$binary": {
106+
"base64": "ESIzRA==",
107+
"subType": "00"
108+
}
109+
}
96110
}
97111
]
98112
}
@@ -116,7 +130,7 @@
116130
"delete"
117131
],
118132
"blockConnection": true,
119-
"blockTimeMS": 55
133+
"blockTimeMS": 100
120134
}
121135
}
122136
}
@@ -128,7 +142,7 @@
128142
"id": {
129143
"$oid": "000000000000000000000005"
130144
},
131-
"timeoutMS": 100
145+
"timeoutMS": 1000
132146
}
133147
}
134148
]
@@ -151,7 +165,7 @@
151165
"delete"
152166
],
153167
"blockConnection": true,
154-
"blockTimeMS": 55
168+
"blockTimeMS": 100
155169
}
156170
}
157171
}
@@ -210,7 +224,7 @@
210224
"delete"
211225
],
212226
"blockConnection": true,
213-
"blockTimeMS": 55
227+
"blockTimeMS": 100
214228
}
215229
}
216230
}
@@ -247,7 +261,7 @@
247261
"delete"
248262
],
249263
"blockConnection": true,
250-
"blockTimeMS": 30
264+
"blockTimeMS": 50
251265
}
252266
}
253267
}

0 commit comments

Comments
 (0)