Wait for the Dapr health check asynchronously in aio/clients/grpc/subscription.py to avoid blocking, ensuring the asyncio gRPC stream can close properly. #839
Open
StarJourneyMingsing wants to merge 7 commits into dapr:main from StarJourneyMingsing:fix/pubsub_re_sub
+243 −5
Changes from 4 commits
Commits (7):
dded008 wait for Dapr health check asynchronously
ef96101 add StatusCode.UNKNOWN branch
a91578d aio dapr health
afe00ee add healthcheck test
d9cfc36 ruff pass
dde4988 fix async health check
453afd1 Merge branch 'main' into fix/pubsub_re_sub
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| # -*- coding: utf-8 -*- | ||
| | ||
| """ | ||
| Copyright 2024 The Dapr Authors | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| """ | ||
| import asyncio | ||
| import urllib.request | ||
| import urllib.error | ||
| import time | ||
| | ||
| from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT | ||
| from dapr.clients.http.helpers import get_api_url | ||
| from dapr.conf import settings | ||
| | ||
| | ||
| class DaprHealth: | ||
|     @staticmethod | ||
|     async def wait_until_ready(): | ||
|         health_url = f'{get_api_url()}/healthz/outbound' | ||
|         headers = {USER_AGENT_HEADER: DAPR_USER_AGENT} | ||
|         if settings.DAPR_API_TOKEN is not None: | ||
|             headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN | ||
|         timeout = float(settings.DAPR_HEALTH_TIMEOUT) | ||
| | ||
|         start = time.time() | ||
|         while True: | ||
|             try: | ||
|                 req = urllib.request.Request(health_url, headers=headers) | ||
|                 with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response: | ||
|                     if 200 <= response.status < 300: | ||
|                         break | ||
|             except urllib.error.URLError as e: | ||
|                 print(f'Health check on {health_url} failed: {e.reason}') | ||
|             except Exception as e: | ||
|                 print(f'Unexpected error during health check: {e}') | ||
| | ||
|             remaining = (start + timeout) - time.time() | ||
|             if remaining <= 0: | ||
|                 raise TimeoutError(f'Dapr health check timed out, after {timeout}.') | ||
|             await asyncio.sleep(min(1.0, remaining)) | ||
| | ||
|     @staticmethod | ||
|     def get_ssl_context(): | ||
|         # This method is used (overwritten) from tests | ||
|         # to return context for self-signed certificates | ||
|         return None | ||

Member (review comment on the `urllib.request.urlopen` line):
Wouldn't it be better to use an async http lib like
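
Note on the blocking call: in the revision shown above, `urllib.request.urlopen` runs directly inside the coroutine, so each request occupies the event-loop thread until it returns. One stdlib-only alternative, sketched here under assumptions and not necessarily what the PR finally adopted, is to push the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). The `_probe` helper and the `opener` and `poll_interval` parameters are hypothetical names introduced only so the loop can be exercised without a running sidecar; the API token and SSL context handling from the diff are left out for brevity.

```python
import asyncio
import time
import urllib.error
import urllib.request


def _probe(opener, request):
    """Blocking probe: True when the endpoint answers with a 2xx status."""
    try:
        with opener(request) as response:
            return 200 <= response.status < 300
    except urllib.error.URLError:
        # HTTPError is a subclass of URLError, so non-2xx responses raised
        # by urlopen land here as well.
        return False


async def wait_until_ready(health_url, headers, timeout,
                           opener=urllib.request.urlopen, poll_interval=1.0):
    """Poll health_url until it is healthy or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    request = urllib.request.Request(health_url, headers=headers)
    while True:
        # The blocking HTTP call runs in a worker thread, so other tasks
        # on the event loop keep running while it is in flight.
        if await asyncio.to_thread(_probe, opener, request):
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f'Dapr health check timed out after {timeout} seconds.')
        await asyncio.sleep(min(poll_interval, remaining))
```

An async HTTP client, as the reviewer suggests, would avoid the thread hop entirely; the thread-based version only has the advantage of adding no dependency.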
File renamed without changes.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| # -*- coding: utf-8 -*- | ||
| | ||
| """ | ||
| Copyright 2025 The Dapr Authors | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| """ | ||
| import asyncio | ||
| import time | ||
| import unittest | ||
| from unittest.mock import patch, MagicMock | ||
| | ||
| from dapr.aio.clients.health import DaprHealth | ||
| from dapr.conf import settings | ||
| from dapr.version import __version__ | ||
| | ||
| | ||
| class DaprHealthCheckAsyncTests(unittest.IsolatedAsyncioTestCase): | ||
|     @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
|     @patch('urllib.request.urlopen') | ||
|     async def test_wait_until_ready_success(self, mock_urlopen): | ||
|         mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) | ||
| | ||
|         try: | ||
|             await DaprHealth.wait_until_ready() | ||
|         except Exception as e: | ||
|             self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') | ||
| | ||
|         mock_urlopen.assert_called_once() | ||
| | ||
|         called_url = mock_urlopen.call_args[0][0].full_url | ||
|         self.assertEqual(called_url, 'http://domain.com:3500/v1.0/healthz/outbound') | ||
| | ||
|         # Check headers are properly set | ||
|         headers = mock_urlopen.call_args[0][0].headers | ||
|         self.assertIn('User-agent', headers) | ||
|         self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') | ||
| | ||
|     @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
|     @patch.object(settings, 'DAPR_API_TOKEN', 'mytoken') | ||
|     @patch('urllib.request.urlopen') | ||
|     async def test_wait_until_ready_success_with_api_token(self, mock_urlopen): | ||
|         mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) | ||
| | ||
|         try: | ||
|             await DaprHealth.wait_until_ready() | ||
|         except Exception as e: | ||
|             self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') | ||
| | ||
|         mock_urlopen.assert_called_once() | ||
| | ||
|         # Check headers are properly set | ||
|         headers = mock_urlopen.call_args[0][0].headers | ||
|         self.assertIn('User-agent', headers) | ||
|         self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') | ||
|         self.assertIn('Dapr-api-token', headers) | ||
|         self.assertEqual(headers['Dapr-api-token'], 'mytoken') | ||
| | ||
|     @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5') | ||
|     @patch('urllib.request.urlopen') | ||
|     async def test_wait_until_ready_timeout(self, mock_urlopen): | ||
|         mock_urlopen.return_value.__enter__.return_value = MagicMock(status=500) | ||
| | ||
|         start = time.time() | ||
| | ||
|         with self.assertRaises(TimeoutError): | ||
|             await DaprHealth.wait_until_ready() | ||
| | ||
|         self.assertGreaterEqual(time.time() - start, 2.5) | ||
|         self.assertGreater(mock_urlopen.call_count, 1) | ||
| | ||
|     @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
|     @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '5.0') | ||
|     @patch('urllib.request.urlopen') | ||
|     async def test_health_check_does_not_block(self, mock_urlopen): | ||
|         """Test that health check doesn't block other async tasks from running""" | ||
|         # Mock health check to retry several times before succeeding | ||
|         call_count = [0]  # Use list to allow modification in nested function | ||
| | ||
|         class MockResponse: | ||
|             def __init__(self, status): | ||
|                 self.status = status | ||
| | ||
|             def __enter__(self): | ||
|                 return self | ||
| | ||
|             def __exit__(self, exc_type, exc_val, exc_tb): | ||
|                 return None | ||
| | ||
|         def side_effect(*args, **kwargs): | ||
|             call_count[0] += 1 | ||
|             # First 2 calls fail with URLError, then succeed | ||
|             # This will cause ~2 seconds of retries (1 second sleep after each failure) | ||
|             if call_count[0] <= 2: | ||
|                 import urllib.error | ||
|                 raise urllib.error.URLError('Connection refused') | ||
|             else: | ||
|                 return MockResponse(status=200) | ||
| | ||
|         mock_urlopen.side_effect = side_effect | ||
| | ||
|         # Counter that will be incremented by background task | ||
|         counter = [0]  # Use list to allow modification in nested function | ||
|         is_running = [True] | ||
| | ||
|         async def increment_counter(): | ||
|             """Background task that increments counter every 0.5 seconds""" | ||
|             while is_running[0]: | ||
|                 await asyncio.sleep(0.5) | ||
|                 counter[0] += 1 | ||
| | ||
|         # Start the background task | ||
|         counter_task = asyncio.create_task(increment_counter()) | ||
| | ||
|         try: | ||
|             # Run health check (will take ~2 seconds with retries) | ||
|             await DaprHealth.wait_until_ready() | ||
| | ||
|             # Stop the background task | ||
|             is_running[0] = False | ||
|             await asyncio.sleep(0.1)  # Give it time to finish current iteration | ||
| | ||
|             # Verify the counter was incremented during health check | ||
|             # In 2 seconds with 0.5s intervals, we expect at least 3 increments | ||
|             self.assertGreaterEqual( | ||
|                 counter[0], | ||
|                 3, | ||
|                 f'Expected counter to increment at least 3 times during health check, ' | ||
|                 f'but got {counter[0]}. This indicates health check may be blocking.', | ||
|             ) | ||
| | ||
|             # Verify health check made multiple attempts | ||
|             self.assertGreaterEqual(call_count[0], 2) | ||
| | ||
|         finally: | ||
|             # Clean up | ||
|             is_running[0] = False | ||
|             counter_task.cancel() | ||
|             try: | ||
|                 await counter_task | ||
|             except asyncio.CancelledError: | ||
|                 pass | ||
| | ||
|     @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') | ||
|     @patch('urllib.request.urlopen') | ||
|     async def test_multiple_health_checks_concurrent(self, mock_urlopen): | ||
|         """Test that multiple health check calls can run concurrently""" | ||
|         mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) | ||
| | ||
|         # Run multiple health checks concurrently | ||
|         start_time = time.time() | ||
|         results = await asyncio.gather( | ||
|             DaprHealth.wait_until_ready(), | ||
|             DaprHealth.wait_until_ready(), | ||
|             DaprHealth.wait_until_ready(), | ||
|         ) | ||
|         elapsed = time.time() - start_time | ||
| | ||
|         # All should complete successfully | ||
|         self.assertEqual(len(results), 3) | ||
|         self.assertIsNone(results[0]) | ||
|         self.assertIsNone(results[1]) | ||
|         self.assertIsNone(results[2]) | ||
| | ||
|         # Should complete quickly since they run concurrently | ||
|         self.assertLess(elapsed, 1.0) | ||
| | ||
|         # Verify multiple calls were made | ||
|         self.assertGreaterEqual(mock_urlopen.call_count, 3) | ||
| | ||
| | ||
| if __name__ == '__main__': | ||
|     unittest.main() | ||
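
A caveat about `test_health_check_does_not_block`: the mocked `urlopen` returns or raises immediately, so nearly all of the elapsed time is spent in `asyncio.sleep`, and the counter would keep advancing even if the real HTTP request blocked the loop. The sketch below shows one way a test could tell the two cases apart. It is illustrative only: `count_ticks`, `slow_probe`, `blocking_check` and `threaded_check` are hypothetical names, and `time.sleep` stands in for a slow request.

```python
import asyncio
import time


async def count_ticks(work, interval=0.02):
    """Run `work()` and return how often a background ticker ran meanwhile."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(interval)
            ticks += 1

    task = asyncio.create_task(ticker())
    try:
        await work()
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
    return ticks


def slow_probe():
    # Stand-in for a slow, blocking HTTP request.
    time.sleep(0.2)


async def blocking_check():
    # Runs on the event-loop thread: nothing else is scheduled meanwhile.
    slow_probe()


async def threaded_check():
    # Runs in a worker thread: the ticker keeps running on the loop.
    await asyncio.to_thread(slow_probe)
```

With a delay inside the mocked call, a counter-based assertion like the one in the test above would fail for the blocking variant and pass for the threaded one.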
StatusCode.UNKNOWN shouldn't be retriable, no?
Actually, the exception code is not always reported as StatusCode.UNAVAILABLE when Dapr shuts down immediately; it may instead appear as StatusCode.UNKNOWN, as in this error message encountered previously:
gRPC error while reading from stream: api server closed, Status Code: StatusCode.UNKNOWN. Attempting to reconnect...
The synchronous subscription implementation also retries on StatusCode.UNKNOWN; see e7c85ce#diff-c7dbe5c0c85056e25bd5d884ad10147a7d12f4b8e9aac52513504a171404093e (dapr/clients/grpc/subscription.py).
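
The retry policy discussed in this thread can be sketched roughly as follows. This is a simplified illustration, not the SDK's subscription code: `StatusCode` and `StreamError` are local stand-ins for `grpc.StatusCode` and the gRPC error type so the example runs without grpcio, and `read_with_reconnect`, `read_once` and `max_attempts` are hypothetical names.

```python
import asyncio
from enum import Enum


class StatusCode(Enum):
    """Stand-in for grpc.StatusCode (only the members used here)."""
    CANCELLED = 'cancelled'
    UNKNOWN = 'unknown'
    UNAVAILABLE = 'unavailable'


class StreamError(Exception):
    """Hypothetical stand-in for a gRPC stream error exposing code()."""

    def __init__(self, code, details):
        super().__init__(details)
        self._code = code

    def code(self):
        return self._code


# Codes treated as "sidecar went away, try again", per the thread above.
RETRIABLE = {StatusCode.UNAVAILABLE, StatusCode.UNKNOWN}


async def read_with_reconnect(read_once, wait_until_ready, max_attempts=3):
    """Call read_once(); on a retriable error, await the health check and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await read_once()
        except StreamError as err:
            if err.code() not in RETRIABLE or attempt == max_attempts:
                raise
            # Awaiting the health check keeps the event loop free, which is
            # what lets the asyncio gRPC stream close properly.
            await wait_until_ready()
```

Treating UNKNOWN as retriable is broader than retrying only on UNAVAILABLE, so the attempt limit here bounds how long an unrelated failure can be masked.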