Skip to content

Commit a91578d

Browse files
author
mingsing
committed
aio dapr health
Signed-off-by: mingsing <[email protected]>
1 parent ef96101 commit a91578d

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

dapr/aio/clients/grpc/subscription.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from grpc.aio import AioRpcError
44

55
from dapr.clients.grpc._response import TopicEventResponse
6-
from dapr.clients.health import DaprHealth
6+
from dapr.aio.clients.health import DaprHealth
77
from dapr.common.pubsub.subscription import (
88
StreamInactiveError,
99
SubscriptionMessage,
@@ -51,8 +51,7 @@ async def outgoing_request_iterator():
5151

5252
async def reconnect_stream(self):
5353
await self.close()
54-
loop = asyncio.get_event_loop()
55-
await loop.run_in_executor(None, DaprHealth.wait_until_ready)
54+
await DaprHealth.wait_until_ready()
5655
print('Attempting to reconnect...')
5756
await self.start()
5857

dapr/aio/clients/health.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2024 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
import asyncio
16+
import urllib.request
17+
import urllib.error
18+
import time
19+
20+
from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT
21+
from dapr.clients.http.helpers import get_api_url
22+
from dapr.conf import settings
23+
24+
25+
class DaprHealth:
26+
@staticmethod
27+
async def wait_until_ready():
28+
health_url = f'{get_api_url()}/healthz/outbound'
29+
headers = {USER_AGENT_HEADER: DAPR_USER_AGENT}
30+
if settings.DAPR_API_TOKEN is not None:
31+
headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
32+
timeout = float(settings.DAPR_HEALTH_TIMEOUT)
33+
34+
start = time.time()
35+
while True:
36+
try:
37+
req = urllib.request.Request(health_url, headers=headers)
38+
with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response:
39+
if 200 <= response.status < 300:
40+
break
41+
except urllib.error.URLError as e:
42+
print(f'Health check on {health_url} failed: {e.reason}')
43+
except Exception as e:
44+
print(f'Unexpected error during health check: {e}')
45+
46+
remaining = (start + timeout) - time.time()
47+
if remaining <= 0:
48+
raise TimeoutError(f'Dapr health check timed out, after {timeout}.')
49+
await asyncio.sleep(min(1.0, remaining))
50+
51+
@staticmethod
52+
def get_ssl_context():
53+
# This method is used (overwritten) from tests
54+
# to return context for self-signed certificates
55+
return None

0 commit comments

Comments
 (0)