@@ -91,6 +91,7 @@ class IntersectService(IntersectEventObserver):
91
91
- shutdown()
92
92
- add_shutdown_messages()
93
93
- is_connected()
94
+ - considered_unrecoverable()
94
95
- forbid_keys()
95
96
- allow_keys()
96
97
- allow_all_functions()
@@ -218,13 +219,13 @@ def __init__(
218
219
self ._external_request_ctr = 0
219
220
220
221
self ._startup_messages : list [
221
- tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE ]
222
+ tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE | None ]
222
223
] = []
223
224
self ._resend_startup_messages = True
224
225
self ._sent_startup_messages = False
225
226
226
227
self ._shutdown_messages : list [
227
- tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE ]
228
+ tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE | None ]
228
229
] = []
229
230
230
231
self ._data_plane_manager = DataPlaneManager (self ._hierarchy , config .data_stores )
@@ -443,7 +444,7 @@ def get_blocked_keys(self) -> set[str]:
443
444
return self ._function_keys .copy ()
444
445
445
446
def add_startup_messages (
446
- self , messages : list [tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE ]]
447
+ self , messages : list [tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE | None ]]
447
448
) -> None :
448
449
"""Add request messages to send out to various microservices when this service starts.
449
450
@@ -454,7 +455,7 @@ def add_startup_messages(
454
455
self ._startup_messages .extend (messages )
455
456
456
457
def add_shutdown_messages (
457
- self , messages : list [tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE ]]
458
+ self , messages : list [tuple [IntersectClientMessageParams , RESPONSE_CALLBACK_TYPE | None ]]
458
459
) -> None :
459
460
"""Add request messages to send out to various microservices on shutdown.
460
461
@@ -519,8 +520,6 @@ def _process_external_requests(self) -> None:
519
520
self ._external_requests_lock .release_lock ()
520
521
521
522
def _process_external_request (self , extreq : IntersectService ._ExternalRequest ) -> None :
522
- if extreq .request is None :
523
- return
524
523
response = None
525
524
cleanup_req = False
526
525
@@ -707,12 +706,7 @@ def _handle_client_message(self, message: UserspaceMessage) -> None:
707
706
708
707
def _send_client_message (self , request_id : UUID , params : IntersectClientMessageParams ) -> bool :
709
708
"""Send a userspace message."""
710
- # ONE: VALIDATE AND SERIALIZE FUNCTION RESULTS
711
- try :
712
- params = IntersectClientMessageParams .model_validate (params )
713
- except ValidationError as e :
714
- logger .error (f'Invalid message parameters:\n { e } ' )
715
- return False
709
+ # "params" should already be validated at this stage.
716
710
request = GENERIC_MESSAGE_SERIALIZER .dump_json (params .payload , warnings = False )
717
711
718
712
# TWO: SEND DATA TO APPROPRIATE DATA STORE
0 commit comments