@@ -125,6 +125,8 @@ def __init__(
125
125
self .got_valid_response : bool = False
126
126
self .response_fn = response_handler
127
127
self .waiting : bool = False
128
+ self .cleanup_req = False
129
+ """When this flag is set to True, mark this request for GC deletion."""
128
130
129
131
def __init__ (
130
132
self ,
@@ -507,14 +509,6 @@ def create_external_request(
507
509
self ._external_requests_lock .release_lock ()
508
510
return request_uuid
509
511
510
- def _delete_external_request (self , req_id : UUID ) -> None :
511
- req_id_str = str (req_id )
512
- if req_id_str in self ._external_requests :
513
- self ._external_requests_lock .acquire_lock (blocking = True )
514
- req : IntersectService ._ExternalRequest = self ._external_requests .pop (req_id_str )
515
- del req
516
- self ._external_requests_lock .release_lock ()
517
-
518
512
def _get_external_request (self , req_id : UUID ) -> IntersectService ._ExternalRequest | None :
519
513
req_id_str = str (req_id )
520
514
if req_id_str in self ._external_requests :
@@ -524,14 +518,25 @@ def _get_external_request(self, req_id: UUID) -> IntersectService._ExternalReque
524
518
525
519
def _process_external_requests (self ) -> None :
526
520
self ._external_requests_lock .acquire_lock (blocking = True )
521
+
522
+ # process requests
527
523
for extreq in self ._external_requests .values ():
528
524
if not extreq .processed :
529
525
self ._process_external_request (extreq )
526
+ # delete requests
527
+ cleanup_list = [
528
+ str (extreq .request_id )
529
+ for extreq in self ._external_requests .values ()
530
+ if extreq .cleanup_req
531
+ ]
532
+ for extreq_id in cleanup_list :
533
+ extreq = self ._external_requests .pop (extreq_id )
534
+ del extreq
535
+
530
536
self ._external_requests_lock .release_lock ()
531
537
532
538
def _process_external_request (self , extreq : IntersectService ._ExternalRequest ) -> None :
533
539
response = None
534
- cleanup_req = False
535
540
536
541
now = datetime .now (timezone .utc )
537
542
logger .debug (f'Processing external request { extreq .request_id } @ { now } ' )
@@ -555,7 +560,7 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) -
555
560
logger .warning (
556
561
f'External service request encountered an error: { error_msg } '
557
562
)
558
- cleanup_req = True
563
+ extreq . cleanup_req = True
559
564
else :
560
565
logger .debug ('Request wait timed-out!' )
561
566
extreq .waiting = False
@@ -570,9 +575,6 @@ def _process_external_request(self, extreq: IntersectService._ExternalRequest) -
570
575
):
571
576
extreq .response_fn (response )
572
577
573
- if cleanup_req :
574
- self ._delete_external_request (extreq .request_id )
575
-
576
578
def _handle_service_message_raw (self , raw : bytes ) -> None :
577
579
"""Main broker callback function.
578
580
0 commit comments