Skip to content

Commit 30a2a2e

Browse files
authored
Feature: ipv4 and missing service (#221)
* Feature: base client service * Feature: CRN service for client * Feature: DNS service for client * Feature: Scheduler service for client * Feature: Port_forwarder service for client * Feature: new settings `DNS_API` `CRN_URL_UPDATE` `CRN_LIST_URL` `CRN_VERSION` `SCHEDULER_URL` * feature: new Exception for custom service * fix: parse_obj is deprecated need to use model_validate * feat: utils func sanitize_url * feature: Utils client service * fix: lint issue http_port_forwarder.py * feature: Services types * Feature: AlephHttpClient load default service and allow new method `register_service` to add any service on top * Feature: AuthenticatedAlephHttpClient load default service and allow new method `register_authenticated_service` to add any service on top * feat: new unit test for client services * fix: import * fix: domains service not existing yet * fix: unit test * fix: remove domains for units test service * refactor: No __init__ needed * Refactor: renaming class / change folder struct * fix: port forwarder import * fix: linting format import * Feature: client.dns.get_public_dns_by_host * fix: this functions not used / wrong place * fix: linting FMT issue * feat: use new filter on dns api ?item_hash= * fix: get_scheduler_node become get_nodes since we already on client.scheduler * fix: rename get_ports to get_address_ports, get_port to get_ports * fix: we should also ensure that the mlessage is not being removed when getting instance allocations * fix: new unit test, some name change * fix: remove unit test for now will fix them
1 parent cb15f7e commit 30a2a2e

20 files changed

+1822
-7
lines changed

src/aleph/sdk/chains/remote.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async def from_crypto_host(
5252
session = aiohttp.ClientSession(connector=connector)
5353

5454
async with session.get(f"{host}/properties") as response:
55-
await response.raise_for_status()
55+
response.raise_for_status()
5656
data = await response.json()
5757
properties = AccountProperties(**data)
5858

@@ -75,7 +75,7 @@ def private_key(self):
7575
async def sign_message(self, message: Dict) -> Dict:
7676
"""Sign a message inplace."""
7777
async with self._session.post(f"{self._host}/sign", json=message) as response:
78-
await response.raise_for_status()
78+
response.raise_for_status()
7979
return await response.json()
8080

8181
async def sign_raw(self, buffer: bytes) -> bytes:

src/aleph/sdk/client/authenticated_http.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from ..utils import extended_json_encoder, make_instance_content, make_program_content
3939
from .abstract import AuthenticatedAlephClient
4040
from .http import AlephHttpClient
41+
from .services.authenticated_port_forwarder import AuthenticatedPortForwarder
4142

4243
logger = logging.getLogger(__name__)
4344

@@ -81,6 +82,13 @@ def __init__(
8182
)
8283
self.account = account
8384

85+
async def __aenter__(self):
86+
await super().__aenter__()
87+
# Override services with authenticated versions
88+
self.port_forwarder = AuthenticatedPortForwarder(self)
89+
90+
return self
91+
8492
async def ipfs_push(self, content: Mapping) -> str:
8593
"""
8694
Push arbitrary content as JSON to the IPFS service.
@@ -392,7 +400,7 @@ async def create_store(
392400
if extra_fields is not None:
393401
values.update(extra_fields)
394402

395-
content = StoreContent.parse_obj(values)
403+
content = StoreContent.model_validate(values)
396404

397405
message, status, _ = await self.submit(
398406
content=content.model_dump(exclude_none=True),

src/aleph/sdk/client/http.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@
3333
from aleph_message.status import MessageStatus
3434
from pydantic import ValidationError
3535

36+
from aleph.sdk.client.services.crn import Crn
37+
from aleph.sdk.client.services.dns import DNS
38+
from aleph.sdk.client.services.instance import Instance
39+
from aleph.sdk.client.services.port_forwarder import PortForwarder
40+
from aleph.sdk.client.services.scheduler import Scheduler
41+
3642
from ..conf import settings
3743
from ..exceptions import (
3844
FileTooLarge,
@@ -123,6 +129,13 @@ async def __aenter__(self):
123129
)
124130
)
125131

132+
# Initialize default services
133+
self.dns = DNS(self)
134+
self.port_forwarder = PortForwarder(self)
135+
self.crn = Crn(self)
136+
self.scheduler = Scheduler(self)
137+
self.instance = Instance(self)
138+
126139
return self
127140

128141
async def __aexit__(self, exc_type, exc_val, exc_tb):
@@ -139,7 +152,8 @@ async def fetch_aggregate(self, address: str, key: str) -> Dict[str, Dict]:
139152
resp.raise_for_status()
140153
result = await resp.json()
141154
data = result.get("data", dict())
142-
return data.get(key)
155+
final_result = data.get(key)
156+
return final_result
143157

144158
async def fetch_aggregates(
145159
self, address: str, keys: Optional[Iterable[str]] = None

src/aleph/sdk/client/services/__init__.py

Whitespace-only changes.
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
from typing import TYPE_CHECKING, Optional, Tuple
2+
3+
from aleph_message.models import AggregateMessage, ItemHash
4+
from aleph_message.status import MessageStatus
5+
6+
from aleph.sdk.client.services.base import AggregateConfig
7+
from aleph.sdk.client.services.port_forwarder import PortForwarder
8+
from aleph.sdk.exceptions import MessageNotProcessed, NotAuthorize
9+
from aleph.sdk.types import AllForwarders, Ports
10+
from aleph.sdk.utils import safe_getattr
11+
12+
if TYPE_CHECKING:
13+
from aleph.sdk.client.abstract import AuthenticatedAlephClient
14+
15+
16+
class AuthenticatedPortForwarder(PortForwarder):
17+
"""
18+
Authenticated Port Forwarder services with create and update capabilities
19+
"""
20+
21+
def __init__(self, client: "AuthenticatedAlephClient"):
22+
super().__init__(client)
23+
24+
async def _verify_status_processed_and_ownership(
25+
self, item_hash: ItemHash
26+
) -> Tuple[AggregateMessage, MessageStatus]:
27+
"""
28+
Verify that the message is well processed (and not rejected / pending),
29+
This also verify the ownership of the message
30+
"""
31+
message: AggregateMessage
32+
status: MessageStatus
33+
message, status = await self._client.get_message(
34+
item_hash=item_hash,
35+
with_status=True,
36+
)
37+
38+
# We ensure message is not Rejected (Might not be processed yet)
39+
if status not in [MessageStatus.PROCESSED, MessageStatus.PENDING]:
40+
raise MessageNotProcessed(item_hash=item_hash, status=status)
41+
42+
message_content = safe_getattr(message, "content")
43+
address = safe_getattr(message_content, "address")
44+
45+
if (
46+
not hasattr(self._client, "account")
47+
or address != self._client.account.get_address()
48+
):
49+
current_address = (
50+
self._client.account.get_address()
51+
if hasattr(self._client, "account")
52+
else "unknown"
53+
)
54+
raise NotAuthorize(
55+
item_hash=item_hash,
56+
target_address=address,
57+
current_address=current_address,
58+
)
59+
return message, status
60+
61+
async def get_address_ports(
62+
self, address: Optional[str] = None
63+
) -> AggregateConfig[AllForwarders]:
64+
"""
65+
Get all port forwarding configurations for an address
66+
67+
Args:
68+
address: The address to fetch configurations for.
69+
If None, uses the authenticated client's account address.
70+
71+
Returns:
72+
Port forwarding configurations
73+
"""
74+
if address is None:
75+
if not hasattr(self._client, "account") or not self._client.account:
76+
raise ValueError("No account provided and client is not authenticated")
77+
address = self._client.account.get_address()
78+
79+
return await super().get_address_ports(address=address)
80+
81+
async def get_ports(
82+
self, item_hash: ItemHash = None, address: Optional[str] = None
83+
) -> Optional[Ports]:
84+
"""
85+
Get port forwarding configuration for a specific item hash
86+
87+
Args:
88+
address: The address to fetch configurations for.
89+
If None, uses the authenticated client's account address.
90+
item_hash: The hash of the item to get configuration for
91+
92+
Returns:
93+
Port configuration if found, otherwise empty Ports object
94+
"""
95+
if address is None:
96+
if not hasattr(self._client, "account") or not self._client.account:
97+
raise ValueError("No account provided and client is not authenticated")
98+
address = self._client.account.get_address()
99+
100+
if item_hash is None:
101+
raise ValueError("item_hash must be provided")
102+
103+
return await super().get_ports(address=address, item_hash=item_hash)
104+
105+
async def create_ports(
106+
self, item_hash: ItemHash, ports: Ports
107+
) -> Tuple[AggregateMessage, MessageStatus]:
108+
"""
109+
Create a new port forwarding configuration for an item hash
110+
111+
Args:
112+
item_hash: The hash of the item (instance/program/IPFS website)
113+
ports: Dictionary mapping port numbers to PortFlags
114+
115+
Returns:
116+
Dictionary with the result of the operation
117+
"""
118+
if not hasattr(self._client, "account") or not self._client.account:
119+
raise ValueError("An account is required for this operation")
120+
121+
# Pre Check
122+
# _, _ = await self._verify_status_processed_and_ownership(item_hash=item_hash)
123+
124+
content = {str(item_hash): ports.model_dump()}
125+
126+
# Check if create_aggregate exists on the client
127+
return await self._client.create_aggregate( # type: ignore
128+
key=self.aggregate_key, content=content
129+
)
130+
131+
async def update_ports(
132+
self, item_hash: ItemHash, ports: Ports
133+
) -> Tuple[AggregateMessage, MessageStatus]:
134+
"""
135+
Update an existing port forwarding configuration for an item hash
136+
137+
Args:
138+
item_hash: The hash of the item (instance/program/IPFS website)
139+
ports: Dictionary mapping port numbers to PortFlags
140+
141+
Returns:
142+
Dictionary with the result of the operation
143+
"""
144+
if not hasattr(self._client, "account") or not self._client.account:
145+
raise ValueError("An account is required for this operation")
146+
147+
# Pre Check
148+
# _, _ = await self._verify_status_processed_and_ownership(item_hash=item_hash)
149+
150+
content = {}
151+
152+
content[str(item_hash)] = ports.model_dump()
153+
154+
message, status = await self._client.create_aggregate( # type: ignore
155+
key=self.aggregate_key, content=content
156+
)
157+
158+
return message, status
159+
160+
async def delete_ports(
161+
self, item_hash: ItemHash
162+
) -> Tuple[AggregateMessage, MessageStatus]:
163+
"""
164+
Delete port forwarding configuration for an item hash
165+
166+
Args:
167+
item_hash: The hash of the item (instance/program/IPFS website) to delete configuration for
168+
169+
Returns:
170+
Dictionary with the result of the operation
171+
"""
172+
if not hasattr(self._client, "account") or not self._client.account:
173+
raise ValueError("An account is required for this operation")
174+
175+
# Pre Check
176+
# _, _ = await self._verify_status_processed_and_ownership(item_hash=item_hash)
177+
178+
# Get the Port Config of the item_hash
179+
port: Optional[Ports] = await self.get_ports(item_hash=item_hash)
180+
if not port:
181+
raise
182+
183+
content = {}
184+
content[str(item_hash)] = port.model_dump()
185+
186+
# Create a new aggregate with the updated content
187+
message, status = await self._client.create_aggregate( # type: ignore
188+
key=self.aggregate_key, content=content
189+
)
190+
return message, status

src/aleph/sdk/client/services/base.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from abc import ABC
2+
from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar
3+
4+
from pydantic import BaseModel
5+
6+
if TYPE_CHECKING:
7+
from aleph.sdk.client.http import AlephHttpClient
8+
9+
10+
T = TypeVar("T", bound=BaseModel)
11+
12+
13+
class AggregateConfig(BaseModel, Generic[T]):
14+
"""
15+
A generic container for "aggregate" data of type T.
16+
- `data` will be either None or a list of T-instances.
17+
"""
18+
19+
data: Optional[List[T]] = None
20+
21+
22+
class BaseService(ABC, Generic[T]):
23+
aggregate_key: str
24+
model_cls: Type[T]
25+
26+
def __init__(self, client: "AlephHttpClient"):
27+
self._client = client
28+
self.model_cls: Type[T]
29+
30+
async def get_config(self, address: str):
31+
32+
aggregate_data = await self._client.fetch_aggregate(
33+
address=address, key=self.aggregate_key
34+
)
35+
36+
if aggregate_data:
37+
model_instance = self.model_cls.model_validate(aggregate_data)
38+
config = AggregateConfig[T](data=[model_instance])
39+
else:
40+
config = AggregateConfig[T](data=None)
41+
42+
return config

0 commit comments

Comments
 (0)