Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions packages/btc_rpc/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[build-system]
requires = ["setuptools>=68", "setuptools_scm[toml]>=8"]
build-backend = "setuptools.build_meta"

[project]
name = "btc-rpc-py"
requires-python = ">=3.10"
dynamic = ["version"]
dependencies = [
"eth_rpc",
]

# Enables the usage of setuptools_scm
[tool.setuptools_scm]
root = "../../"

[project.optional-dependencies]
lint = [
"mypy",
"ruff",
]
test = [
"pytest==7.4.1",
"pytest-cov==4.1.0",
"coverage[toml]==7.3.1",
]
build = [
"build[virtualenv]==1.0.3",
]
dev = [
"tox",
"btc_rpc_py[lint]",
"btc_rpc_py[test]",
"btc_rpc_py[build]",
]
8 changes: 8 additions & 0 deletions packages/btc_rpc/src/btc_rpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .block import Block
from .transaction import Transaction


__all__ = [
"Block",
"Transaction",
]
36 changes: 36 additions & 0 deletions packages/btc_rpc/src/btc_rpc/_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import TYPE_CHECKING, ClassVar, Optional

if TYPE_CHECKING:
from .types import Network
from .rpc.core import RPC

NetworkType = type[Network]


class Request:
_network: ClassVar[Optional["NetworkType"]] = None

@classmethod
def _rpc(cls) -> "RPC":
"""
This uses the default network, unless a network has been provided, then immediately unsets the network.
This makes it safe for async code.
"""
from ._transport import _force_get_global_rpc

network = cls._network
cls._network = None
response = _force_get_global_rpc(network)
return response

def _rpc_(self) -> "RPC":
"""
This uses the default network, unless a network has been provided, then immediately unsets the network.
This makes it safe for async code.
"""
from ._transport import _force_get_global_rpc

network = self._network
self._network = None # type: ignore[misc]
response = _force_get_global_rpc(network)
return response
11 changes: 11 additions & 0 deletions packages/btc_rpc/src/btc_rpc/_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class RPCResponse(BaseModel, Generic[T]):
id: int
result: T
error: str | None
70 changes: 70 additions & 0 deletions packages/btc_rpc/src/btc_rpc/_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from contextvars import ContextVar
from typing import TYPE_CHECKING

from eth_typing import HexStr
from pydantic import BaseModel, ConfigDict, Field

from .networks import Bitcoin
from .rpc.base import BaseRPC
from .types import Network

if TYPE_CHECKING:
from .rpc.core import RPC


class Transports(BaseModel):
default: type[Network] = Field(default=Bitcoin)
networks: dict[HexStr, type[Network]] = {}
rpcs: dict[HexStr, BaseRPC] = {}
retries: int = 0
id: int = 0

model_config = ConfigDict(arbitrary_types_allowed=True)


_selected_transports: ContextVar["Transports"] = ContextVar(
"_selected_transports",
default=Transports(),
)


def _force_get_default_network() -> type[Network]:
transport = _selected_transports.get()
return transport.default


def _force_get_global_rpc(network: type[Network] | None = None) -> "RPC":
from .rpc.core import RPC

transport = _selected_transports.get()

if not network:
network = transport.default

if network.id in transport.rpcs:
return transport.rpcs[network.id] # type: ignore
transport.rpcs[network.id] = RPC(network=network)
return transport.rpcs[network.id] # type: ignore


def set_transport(networks: list[type[Network]], retries: int = 0):
transport = _selected_transports.get()
transport.default = networks[0]
transport.networks = {network.id: network for network in networks}
transport.retries = retries


def set_default_network(network: type[Network]):
transport = _selected_transports.get()
transport.default = network
transport.networks[network.id] = network


def get_current_network() -> type[Network]:
transport = _selected_transports.get()
return transport.default


def set_rpc_timeout(timeout: float, network: type[Network] | None = None):
rpc = _force_get_global_rpc(network)
rpc.set_timeout(timeout)
90 changes: 90 additions & 0 deletions packages/btc_rpc/src/btc_rpc/block.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from typing import Literal

from eth_typing import HexStr

from btc_rpc.types import (
GetBlockHashRequest,
GetBlockRequest,
GetTxRequest,
Transaction,
Block as BlockModel,
)
from ._request import Request


class Block(BlockModel, Request):
@classmethod
async def latest(cls, verbosity: Literal[0, 1, 2] = 2) -> "Block":
rpc = cls._rpc()
best_hash = await rpc.get_best_hash()
return await cls.from_hash(hash=best_hash, verbosity=verbosity)

@classmethod
async def from_hash(cls, hash: HexStr, verbosity: Literal[0, 1, 2] = 2):
rpc = cls._rpc()
block = await rpc.get_block(
GetBlockRequest(
blockhash=hash,
verbose=verbosity,
),
)
return block

@classmethod
async def get_block(cls, number: int, verbosity: Literal[0, 1, 2] = 1) -> "Block":
rpc = cls._rpc()
block_hash = await rpc.get_block_hash(
GetBlockHashRequest(
height=number,
)
)
return await cls.from_hash(hash=block_hash, verbosity=verbosity)

async def get_transactions(self, verbosity: Literal[0, 1, 2] = 1, blockhash: HexStr | None = None):
txs = []
rpc = self._rpc()
for tx in self.txs:
if isinstance(tx, Transaction):
tx_id = tx.txid
else:
tx_id = tx
txs.append(
await rpc.get_raw_transaction(
GetTxRequest(
txid=tx_id,
verbose=verbosity,
blockhash=blockhash,
)
)
)
return txs

@classmethod
async def load(self, number: int, verbosity: Literal[0, 1, 2] = 1) -> tuple["Block", list[Transaction]]:
block = await self.get_block(number=number, verbosity=verbosity)
txs = await block.get_transactions(verbosity=verbosity)
return block, txs

async def _transactions(self, verbosity: Literal[0, 1, 2] = 2):
rpc = self._rpc()
for tx in self.txs:
if isinstance(tx, Transaction):
tx_id = tx.txid
else:
tx_id = tx
yield await rpc.get_raw_transaction(
GetTxRequest(
txid=tx_id,
verbose=verbosity,
blockhash=self.hash,
)
)

@property
def transactions(self):
return self._transactions()

def __repr__(self):
return f"<Block number={self.height}>"

__str__ = __repr__
8 changes: 8 additions & 0 deletions packages/btc_rpc/src/btc_rpc/networks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .bitcoin import Bitcoin
from .bsv import BitcoinSV


__all__ = [
"Bitcoin",
"BitcoinSV",
]
10 changes: 10 additions & 0 deletions packages/btc_rpc/src/btc_rpc/networks/bitcoin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from eth_typing import HexStr

from btc_rpc.types import Network


class Bitcoin(Network):
id: HexStr = HexStr("0xD9B4BEF9")
name: str = "Bitcoin"
symbol: str = "BTC"
http: str
10 changes: 10 additions & 0 deletions packages/btc_rpc/src/btc_rpc/networks/bsv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from eth_typing import HexStr

from btc_rpc.types import Network


class BitcoinSV(Network):
id: HexStr = HexStr("0xD9B4BEF9")
name: str = "Bitcoin Satoshi Vision"
symbol: str = "BSV"
http: str
8 changes: 8 additions & 0 deletions packages/btc_rpc/src/btc_rpc/rpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .base import BaseRPC
from .core import RPC


__all__ = [
"BaseRPC",
"RPC",
]
39 changes: 39 additions & 0 deletions packages/btc_rpc/src/btc_rpc/rpc/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import itertools

import httpx
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr

from btc_rpc.types import Network


class BaseRPC(BaseModel):
_timeout: float = PrivateAttr(10.0)
_retries: int = PrivateAttr(3)

network: Network

index: itertools.count = Field(default_factory=lambda: itertools.count())
client: httpx.AsyncClient = Field(default_factory=lambda: httpx.AsyncClient())

@property
def timeout(self) -> httpx.Timeout:
"""Request Timeout"""
return httpx.Timeout(self._timeout)

def set_timeout(self, timeout: float):
self._timeout = timeout

def set_retries(self, retries: int):
self._retries = retries

@property
def retries(self):
return self._retries

@property
def http(self):
return self.network.http

model_config = ConfigDict(
arbitrary_types_allowed=True,
)
47 changes: 47 additions & 0 deletions packages/btc_rpc/src/btc_rpc/rpc/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import inspect
from typing import ClassVar

from pydantic import Field
from eth_typing import HexStr

from btc_rpc.types import (
GetBlockHashRequest,
GetBlockRequest,
GetTxRequest,
NoArgs,
RawTransactionRequest,
ChainInfo,
)
from .base import BaseRPC
from .method import RPCMethod
from btc_rpc.block import Block
from btc_rpc.transaction import Transaction


class RPC(BaseRPC):
decode_raw_transaction: ClassVar = RPCMethod[RawTransactionRequest, Transaction](name="decoderawtransaction")
get_best_hash: ClassVar = RPCMethod[NoArgs, HexStr](name="getbestblockhash")
get_chain_info: ClassVar = RPCMethod[NoArgs, ChainInfo](name="getblockchaininfo")
get_block_hash: ClassVar = RPCMethod[GetBlockHashRequest, HexStr](name="getblockhash")
get_block: ClassVar = RPCMethod[GetBlockRequest, Block | HexStr](name="getblock")
get_raw_transaction: ClassVar = RPCMethod[GetTxRequest, Transaction | HexStr](name="getrawtransaction")

methods: dict[str, RPCMethod] = Field(default_factory=dict)

def __getattribute__(self, name):
"""
This makes it so you can assign copies of the RPCMethod ClassVars to the instance.
Otherwise, all RPC instances would share the same method classes, and this would
make it difficult to have method calls on different networks.
"""
dict_ = super().__getattribute__("__dict__")
methods = dict_.get("methods", {})
if name in methods:
return methods[name]
return super().__getattribute__(name)

def model_post_init(self, __context):
rpc_methods = inspect.getmembers(self, predicate=lambda x: isinstance(x, RPCMethod))
for name, method in rpc_methods:
new_method = method.copy()
self.methods[name] = new_method.set_rpc(self)
Loading