-
Notifications
You must be signed in to change notification settings - Fork 139
Wait for the Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking, ensuring the asyncio gRPC stream can close properly. #839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
dded008
ef96101
a91578d
afe00ee
d9cfc36
dde4988
453afd1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| # -*- coding: utf-8 -*- | ||
|
|
||
| """ | ||
| Copyright 2024 The Dapr Authors | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| """ | ||
| import asyncio | ||
| import urllib.request | ||
| import urllib.error | ||
| import time | ||
|
|
||
| from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT | ||
| from dapr.clients.http.helpers import get_api_url | ||
| from dapr.conf import settings | ||
|
|
||
|
|
||
| class DaprHealth: | ||
| @staticmethod | ||
| async def wait_until_ready(): | ||
| health_url = f'{get_api_url()}/healthz/outbound' | ||
| headers = {USER_AGENT_HEADER: DAPR_USER_AGENT} | ||
| if settings.DAPR_API_TOKEN is not None: | ||
| headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN | ||
| timeout = float(settings.DAPR_HEALTH_TIMEOUT) | ||
|
|
||
| start = time.time() | ||
| while True: | ||
| try: | ||
| req = urllib.request.Request(health_url, headers=headers) | ||
| with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it be better to use an async http lib like |
||
| if 200 <= response.status < 300: | ||
| break | ||
| except urllib.error.URLError as e: | ||
| print(f'Health check on {health_url} failed: {e.reason}') | ||
| except Exception as e: | ||
| print(f'Unexpected error during health check: {e}') | ||
|
|
||
| remaining = (start + timeout) - time.time() | ||
| if remaining <= 0: | ||
| raise TimeoutError(f'Dapr health check timed out, after {timeout}.') | ||
| await asyncio.sleep(min(1.0, remaining)) | ||
|
|
||
| @staticmethod | ||
| def get_ssl_context(): | ||
| # This method is used (overwritten) from tests | ||
| # to return context for self-signed certificates | ||
| return None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,14 +18,16 @@ | |
| from unittest.mock import patch | ||
|
|
||
| from dapr.aio.clients.grpc.client import DaprGrpcClientAsync | ||
| from dapr.clients.health import DaprHealth | ||
| from dapr.aio.clients.health import DaprHealth as DaprHealthAsync | ||
| from dapr.clients.health import DaprHealth as DaprHealth | ||
| from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context | ||
| from tests.clients.test_dapr_grpc_client_async import DaprGrpcClientAsyncTests | ||
| from .fake_dapr_server import FakeDaprSidecar | ||
| from dapr.conf import settings | ||
|
|
||
|
|
||
| DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func | ||
| DaprHealthAsync.get_ssl_context = replacement_get_health_context | ||
| DaprHealth.get_ssl_context = replacement_get_health_context | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we still need the sync version?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
See dapr/clients/grpc/subscription.py: the init function of DaprGrpcClientAsync invokes the synchronous |
||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| # -*- coding: utf-8 -*- | ||
|
|
||
| """ | ||
| Copyright 2025 The Dapr Authors | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| """ | ||
| import asyncio | ||
| import time | ||
| import unittest | ||
| from unittest.mock import patch, MagicMock | ||
|
|
||
| from dapr.aio.clients.health import DaprHealth | ||
| from dapr.conf import settings | ||
| from dapr.version import __version__ | ||
|
|
||
|
|
||
| class DaprHealthCheckAsyncTests(unittest.IsolatedAsyncioTestCase): | ||
| @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
| @patch('urllib.request.urlopen') | ||
| async def test_wait_until_ready_success(self, mock_urlopen): | ||
| mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) | ||
|
|
||
| try: | ||
| await DaprHealth.wait_until_ready() | ||
| except Exception as e: | ||
| self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') | ||
|
|
||
| mock_urlopen.assert_called_once() | ||
|
|
||
| called_url = mock_urlopen.call_args[0][0].full_url | ||
| self.assertEqual(called_url, 'http://domain.com:3500/v1.0/healthz/outbound') | ||
|
|
||
| # Check headers are properly set | ||
| headers = mock_urlopen.call_args[0][0].headers | ||
| self.assertIn('User-agent', headers) | ||
| self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') | ||
|
|
||
| @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
| @patch.object(settings, 'DAPR_API_TOKEN', 'mytoken') | ||
| @patch('urllib.request.urlopen') | ||
| async def test_wait_until_ready_success_with_api_token(self, mock_urlopen): | ||
| mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) | ||
|
|
||
| try: | ||
| await DaprHealth.wait_until_ready() | ||
| except Exception as e: | ||
| self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') | ||
|
|
||
| mock_urlopen.assert_called_once() | ||
|
|
||
| # Check headers are properly set | ||
| headers = mock_urlopen.call_args[0][0].headers | ||
| self.assertIn('User-agent', headers) | ||
| self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') | ||
| self.assertIn('Dapr-api-token', headers) | ||
| self.assertEqual(headers['Dapr-api-token'], 'mytoken') | ||
|
|
||
| @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5') | ||
| @patch('urllib.request.urlopen') | ||
| async def test_wait_until_ready_timeout(self, mock_urlopen): | ||
| mock_urlopen.return_value.__enter__.return_value = MagicMock(status=500) | ||
|
|
||
| start = time.time() | ||
|
|
||
| with self.assertRaises(TimeoutError): | ||
| await DaprHealth.wait_until_ready() | ||
|
|
||
| self.assertGreaterEqual(time.time() - start, 2.5) | ||
| self.assertGreater(mock_urlopen.call_count, 1) | ||
|
|
||
| @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
| @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '5.0') | ||
| @patch('urllib.request.urlopen') | ||
| async def test_health_check_does_not_block(self, mock_urlopen): | ||
| """Test that health check doesn't block other async tasks from running""" | ||
| # Mock health check to retry several times before succeeding | ||
| call_count = [0] # Use list to allow modification in nested function | ||
|
|
||
| class MockResponse: | ||
| def __init__(self, status): | ||
| self.status = status | ||
|
|
||
| def __enter__(self): | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| return None | ||
|
|
||
| def side_effect(*args, **kwargs): | ||
| call_count[0] += 1 | ||
| # First 2 calls fail with URLError, then succeed | ||
| # This will cause ~2 seconds of retries (1 second sleep after each failure) | ||
| if call_count[0] <= 2: | ||
| import urllib.error | ||
|
|
||
| raise urllib.error.URLError('Connection refused') | ||
| else: | ||
| return MockResponse(status=200) | ||
|
|
||
| mock_urlopen.side_effect = side_effect | ||
|
|
||
| # Counter that will be incremented by background task | ||
| counter = [0] # Use list to allow modification in nested function | ||
| is_running = [True] | ||
|
|
||
| async def increment_counter(): | ||
| """Background task that increments counter every 0.5 seconds""" | ||
| while is_running[0]: | ||
| await asyncio.sleep(0.5) | ||
| counter[0] += 1 | ||
|
|
||
| # Start the background task | ||
| counter_task = asyncio.create_task(increment_counter()) | ||
|
|
||
| try: | ||
| # Run health check (will take ~2 seconds with retries) | ||
| await DaprHealth.wait_until_ready() | ||
|
|
||
| # Stop the background task | ||
| is_running[0] = False | ||
| await asyncio.sleep(0.1) # Give it time to finish current iteration | ||
|
|
||
| # Verify the counter was incremented during health check | ||
| # In 2 seconds with 0.5s intervals, we expect at least 3 increments | ||
| self.assertGreaterEqual( | ||
| counter[0], | ||
| 3, | ||
| f'Expected counter to increment at least 3 times during health check, ' | ||
| f'but got {counter[0]}. This indicates health check may be blocking.', | ||
| ) | ||
|
|
||
| # Verify health check made multiple attempts | ||
| self.assertGreaterEqual(call_count[0], 2) | ||
|
|
||
| finally: | ||
| # Clean up | ||
| is_running[0] = False | ||
| counter_task.cancel() | ||
| try: | ||
| await counter_task | ||
| except asyncio.CancelledError: | ||
| pass | ||
|
|
||
| @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
| @patch('urllib.request.urlopen') | ||
| async def test_multiple_health_checks_concurrent(self, mock_urlopen): | ||
| """Test that multiple health check calls can run concurrently""" | ||
| mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) | ||
|
|
||
| # Run multiple health checks concurrently | ||
| start_time = time.time() | ||
| results = await asyncio.gather( | ||
| DaprHealth.wait_until_ready(), | ||
| DaprHealth.wait_until_ready(), | ||
| DaprHealth.wait_until_ready(), | ||
| ) | ||
| elapsed = time.time() - start_time | ||
|
|
||
| # All should complete successfully | ||
| self.assertEqual(len(results), 3) | ||
| self.assertIsNone(results[0]) | ||
| self.assertIsNone(results[1]) | ||
| self.assertIsNone(results[2]) | ||
|
|
||
| # Should complete quickly since they run concurrently | ||
| self.assertLess(elapsed, 1.0) | ||
|
|
||
| # Verify multiple calls were made | ||
| self.assertGreaterEqual(mock_urlopen.call_count, 3) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StatusCode.UNKNOWNshouldn't be retriable, no?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, the exception code is not always recognized as StatusCode.UNAVAILABLE when Dapr shuts down immediately; instead, it may appear as StatusCode.UNKNOWN, as seen in the error message previously encountered:
gRPC error while reading from stream: api server closed, Status Code: StatusCode.UNKNOWN. Attempting to reconnect....The synchronous subscription implementation also retries upon encountering StatusCode.UNKNOWN, see: e7c85ce#diff-c7dbe5c0c85056e25bd5d884ad10147a7d12f4b8e9aac52513504a171404093e (dapr/clients/grpc/subscription.py)