Skip to content

Commit

Permalink
Merge pull request #299 from lidofinance/develop
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
hweawer authored Dec 16, 2024
2 parents d2268ba + f250853 commit 28d4093
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 16 deletions.
13 changes: 7 additions & 6 deletions src/blockchain/web3_extentions/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from typing import Any, Callable, Set, cast
from urllib.parse import urlparse

from metrics.metrics import ETH_RPC_REQUESTS, ETH_RPC_REQUESTS_DURATION
from metrics.metrics import ETH_RPC_REQUESTS_DURATION
from prometheus_client import Counter
from requests import HTTPError, Response
from web3 import Web3
from web3.middleware import construct_simple_cache_middleware
Expand All @@ -11,7 +12,7 @@
logger = logging.getLogger(__name__)


def add_requests_metric_middleware(web3: Web3) -> Web3:
def add_requests_metric_middleware(web3: Web3, rpc_metric: Counter) -> Web3:
"""
Works correctly with MultiProvider and vanilla Providers.
Expand All @@ -28,7 +29,7 @@ def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
response = make_request(method, params)
except HTTPError as ex:
failed: Response = ex.response
ETH_RPC_REQUESTS.labels(
rpc_metric.labels(
method=method,
code=failed.status_code,
domain=urlparse(web3.provider.endpoint_uri).netloc, # pyright: ignore
Expand All @@ -42,7 +43,7 @@ def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
if isinstance(error, dict):
code = error.get('code') or code

ETH_RPC_REQUESTS.labels(
rpc_metric.labels(
method=method,
code=code,
domain=urlparse(web3.provider.endpoint_uri).netloc, # pyright: ignore
Expand Down Expand Up @@ -70,12 +71,12 @@ def add_cache_middleware(web3: Web3) -> Web3:
return web3


def add_middlewares(web3: Web3) -> Web3:
def add_middlewares(web3: Web3, rpc_metric: Counter) -> Web3:
"""
Cache middleware should go first to avoid rewriting metrics for cached requests.
If middleware has level = 0, the middleware will be appended to the end of the middleware list.
So we need [..., cache, other middlewares]
"""
add_cache_middleware(web3)
add_requests_metric_middleware(web3)
add_requests_metric_middleware(web3, rpc_metric)
return web3
2 changes: 1 addition & 1 deletion src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(

self._onchain_transport_w3 = None
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
self._onchain_transport_w3 = OnchainTransportProvider.create_ochain_transport_w3()
self._onchain_transport_w3 = OnchainTransportProvider.create_onchain_transport_w3()
transports.append(
OnchainTransportProvider(
w3=self._onchain_transport_w3,
Expand Down
2 changes: 1 addition & 1 deletion src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, w3: Web3):
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
OnchainTransportProvider(
w3=OnchainTransportProvider.create_ochain_transport_w3(),
w3=OnchainTransportProvider.create_onchain_transport_w3(),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
parsers_providers=[PauseV2Parser, PauseV3Parser, PingParser],
Expand Down
2 changes: 1 addition & 1 deletion src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def prepare_transport_bus(self):
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
OnchainTransportProvider(
w3=OnchainTransportProvider.create_ochain_transport_w3(),
w3=OnchainTransportProvider.create_onchain_transport_w3(),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
parsers_providers=[UnvetParser, PingParser],
Expand Down
3 changes: 2 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from bots.unvetter import run_unvetter
from metrics.healthcheck_pulse import start_pulse_server
from metrics.logging import logging
from metrics.metrics import ETH_RPC_REQUESTS
from prometheus_client import start_http_server
from web3_multi_provider import FallbackProvider

Expand Down Expand Up @@ -49,7 +50,7 @@ def main(bot_name: str):
)

logger.info({'msg': 'Add metrics to web3 requests.'})
add_middlewares(w3)
add_middlewares(w3, ETH_RPC_REQUESTS)

if bot_name == BotModule.DEPOSITOR:
run_depositor(w3)
Expand Down
7 changes: 7 additions & 0 deletions src/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@
namespace=PROMETHEUS_PREFIX,
)

ONCHAIN_TRANSPORT_ETH_RPC_REQUESTS = Counter(
'onchain_transport_rpc_requests',
'Total count of requests to onchain transport RPC',
['method', 'code', 'domain'],
namespace=PROMETHEUS_PREFIX,
)

UNEXPECTED_EXCEPTIONS = Counter(
'unexpected_exceptions',
'Total count of unexpected exceptions',
Expand Down
8 changes: 5 additions & 3 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from typing import Callable, List, Optional

import variables
from blockchain.web3_extentions.middleware import add_cache_middleware
from blockchain.web3_extentions.middleware import add_middlewares
from eth_typing import ChecksumAddress
from eth_utils import to_bytes
from metrics.metrics import ONCHAIN_TRANSPORT_ETH_RPC_REQUESTS
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_providers.rabbit import MessageType
Expand Down Expand Up @@ -325,5 +326,6 @@ def _parse_log(self, log: LogReceipt) -> Optional[dict]:
return None

@staticmethod
def create_ochain_transport_w3() -> Web3:
return add_cache_middleware(Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)))
def create_onchain_transport_w3() -> Web3:
w3 = Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS))
return add_middlewares(w3, ONCHAIN_TRANSPORT_ETH_RPC_REQUESTS)
5 changes: 3 additions & 2 deletions src/transport/msg_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ def get_messages_and_actualize(self, actualize_filter: Callable[[BotMessage], bo
"""
messages = self.messages
for transport in self._transports:
messages.extend(transport.get_messages())
self.messages = list(filter(lambda x: self._filter(x) and actualize_filter(x), messages))
filtered_messages = list(filter(lambda x: self._filter(x), transport.get_messages()))
messages.extend(filtered_messages)
self.messages = list(filter(lambda x: actualize_filter(x), messages))
return self.messages

def clear(self):
Expand Down
3 changes: 2 additions & 1 deletion tests/fixtures/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from blockchain.web3_extentions.lido_contracts import LidoContracts
from blockchain.web3_extentions.middleware import add_middlewares
from blockchain.web3_extentions.transaction import TransactionUtils
from metrics.metrics import ETH_RPC_REQUESTS
from web3 import HTTPProvider, Web3

from tests.fork import anvil_fork
Expand Down Expand Up @@ -35,7 +36,7 @@ def web3_provider_integration(request) -> Web3:

with anvil_fork(anvil_path, rpc_endpoint, block_num):
w3 = Web3(HTTPProvider('http://127.0.0.1:8545', request_kwargs={'timeout': 3600}))
add_middlewares(w3)
add_middlewares(w3, ETH_RPC_REQUESTS)
assert w3.is_connected(), 'Failed to connect to the Web3 provider.'
yield w3

Expand Down

0 comments on commit 28d4093

Please sign in to comment.