Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions hwilib/devices/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ def legacy_sign_tx() -> PSBT:
wallet = WalletPolicy("", "wpkh(@0/**)", [""])
legacy_input_sigs = client.sign_psbt(tx, wallet, None)

for idx, pubkey, sig in legacy_input_sigs:
for idx, partial_sig in legacy_input_sigs:
psbt_in = tx.inputs[idx]
psbt_in.partial_sigs[pubkey] = sig
psbt_in.partial_sigs[partial_sig.pubkey] = partial_sig.signature
return tx

if isinstance(self.client, LegacyClient):
Expand Down Expand Up @@ -348,7 +348,7 @@ def process_origin(origin: KeyOriginInfo) -> None:

input_sigs = self.client.sign_psbt(psbt2, wallet, wallet_hmac)

for idx, pubkey, sig in input_sigs:
for idx, yielded in input_sigs:
psbt_in = psbt2.inputs[idx]

utxo = None
Expand All @@ -362,11 +362,13 @@ def process_origin(origin: KeyOriginInfo) -> None:
is_wit, wit_ver, _ = utxo.is_witness()

if is_wit and wit_ver >= 1:
# TODO: Deal with script path signatures
# For now, assume key path signature
psbt_in.tap_key_sig = sig
if yielded.tapleaf_hash is None:
psbt_in.tap_key_sig = yielded.signature
else:
psbt_in.tap_script_sigs[(yielded.pubkey, yielded.tapleaf_hash)] = yielded.signature

else:
psbt_in.partial_sigs[pubkey] = sig
psbt_in.partial_sigs[yielded.pubkey] = yielded.signature

# Extract the sigs from psbt2 and put them into tx
for sig_in, psbt_in in zip(psbt2.inputs, tx.inputs):
Expand Down
49 changes: 37 additions & 12 deletions hwilib/devices/ledger_bitcoin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from .command_builder import BitcoinCommandBuilder, BitcoinInsType
from ...common import Chain
from .client_command import ClientCommandInterpreter
from .client_base import Client, TransportClient
from .client_base import Client, PartialSignature, SignPsbtYieldedObject, TransportClient
from .client_legacy import LegacyClient
from .errors import UnknownDeviceError
from .exception import DeviceException, NotSupportedError
from .merkle import get_merkleized_map_commitment
from .wallet import WalletPolicy, WalletType
Expand All @@ -31,6 +32,37 @@ def parse_stream_to_map(f: BufferedReader) -> Mapping[bytes, bytes]:
result[key] = value
return result

def _make_partial_signature(pubkey_augm: bytes, signature: bytes) -> PartialSignature:
if len(pubkey_augm) == 64:
# tapscript spend: pubkey_augm is the concatenation of:
# - a 32-byte x-only pubkey
# - the 32-byte tapleaf_hash
return PartialSignature(signature=signature, pubkey=pubkey_augm[0:32], tapleaf_hash=pubkey_augm[32:])

else:
# either legacy, segwit or taproot keypath spend
# pubkey must be 32 (taproot x-only pubkey) or 33 bytes (compressed pubkey)

if len(pubkey_augm) not in [32, 33]:
raise UnknownDeviceError(f"Invalid pubkey length returned: {len(pubkey_augm)}")

return PartialSignature(signature=signature, pubkey=pubkey_augm)

def _decode_signpsbt_yielded_value(res: bytes) -> Tuple[int, SignPsbtYieldedObject]:
res_buffer = BytesIO(res)
input_index_or_tag = read_varint(res_buffer)

# values follow an encoding without an explicit tag, where the
# first element is the input index. All the signature types are implemented
# by the PartialSignature type (not to be confused with the musig Partial Signature).
input_index = input_index_or_tag

pubkey_augm_len = read_uint(res_buffer, 8)
pubkey_augm = res_buffer.read(pubkey_augm_len)

signature = res_buffer.read()

return((input_index, _make_partial_signature(pubkey_augm, signature)))

def read_uint(buf: BytesIO,
bit_len: int,
Expand Down Expand Up @@ -156,7 +188,7 @@ def get_wallet_address(

return response.decode()

def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, bytes, bytes]]:
def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, SignPsbtYieldedObject]]:
"""Signs a PSBT using a registered wallet (or a standard wallet that does not need registration).

Signature requires explicit approval from the user.
Expand Down Expand Up @@ -240,17 +272,10 @@ def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[byte
if any(len(x) <= 1 for x in results):
raise RuntimeError("Invalid response")

results_list: List[Tuple[int, bytes, bytes]] = []
results_list: List[Tuple[int, SignPsbtYieldedObject]] = []
for res in results:
res_buffer = BytesIO(res)
input_index = read_varint(res_buffer)

pubkey_len = read_uint(res_buffer, 8)
pubkey = res_buffer.read(pubkey_len)

signature = res_buffer.read()

results_list.append((input_index, pubkey, signature))
input_index, obj = _decode_signpsbt_yielded_value(res)
results_list.append((input_index, obj))

return results_list

Expand Down
31 changes: 26 additions & 5 deletions hwilib/devices/ledger_bitcoin/client_base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from dataclasses import dataclass

from typing import Tuple, Optional, Union, List
from io import BytesIO

Expand Down Expand Up @@ -45,6 +47,25 @@ def apdu_exchange_nowait(
def stop(self) -> None:
self.transport.close()

@dataclass(frozen=True)
class PartialSignature:
"""Represents a partial signature returned by sign_psbt. Such objects can be added to the PSBT.

It always contains a pubkey and a signature.
The pubkey is a compressed 33-byte for legacy and segwit Scripts, or 32-byte x-only key for taproot.
The signature is in the format it would be pushed on the scriptSig or the witness stack, therefore of
variable length, and possibly concatenated with the SIGHASH flag byte if appropriate.

The tapleaf_hash is also filled if signing for a tapscript.

Note: not to be confused with 'partial signature' of protocols like MuSig2;
"""
pubkey: bytes
signature: bytes
tapleaf_hash: Optional[bytes] = None


SignPsbtYieldedObject = Union[PartialSignature]

class Client:
def __init__(self, transport_client: TransportClient, chain: Chain = Chain.MAIN) -> None:
Expand Down Expand Up @@ -183,18 +204,19 @@ def get_wallet_address(

raise NotImplementedError

def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, bytes, bytes]]:
def sign_psbt(self, psbt: Union[PSBT, bytes, str], wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, SignPsbtYieldedObject]]:
"""Signs a PSBT using a registered wallet (or a standard wallet that does not need registration).

Signature requires explicit approval from the user.

Parameters
----------
psbt : PSBT
psbt : PSBT | bytes | str
A PSBT of version 0 or 2, with all the necessary information to sign the inputs already filled in; what the
required fields changes depending on the type of input.
The non-witness UTXO must be present for both legacy and SegWit inputs, or the hardware wallet will reject
signing (this will change for Taproot inputs).
The argument can be either a `PSBT` object, or `bytes`, or a base64-encoded `str`.

wallet : WalletPolicy
The registered wallet policy, or a standard wallet policy.
Expand All @@ -204,11 +226,10 @@ def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[byte

Returns
-------
List[Tuple[int, bytes, bytes]]
List[Tuple[int, PartialSignature]]
A list of tuples returned by the hardware wallets, where each element is a tuple of:
- an integer, the index of the input being signed;
- a `bytes` array of length 33 (compressed ecdsa pubkey) or 32 (x-only BIP-0340 pubkey), the corresponding pubkey for this signature;
- a `bytes` array with the signature.
- an instance of `PartialSignature`.
"""

raise NotImplementedError
Expand Down
10 changes: 5 additions & 5 deletions hwilib/devices/ledger_bitcoin/client_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import re
import base64

from .client import Client, TransportClient
from .client import Client, PartialSignature, SignPsbtYieldedObject, TransportClient

from typing import List, Tuple, Optional, Union

Expand Down Expand Up @@ -137,7 +137,7 @@ def get_wallet_address(
return output['address'][12:-2] # HACK: A bug in getWalletPublicKey results in the address being returned as the string "bytearray(b'<address>')". This extracts the actual address to work around this.

# NOTE: This is different from the new API, but we need it for multisig support.
def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, bytes, bytes]]:
def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[bytes]) -> List[Tuple[int, SignPsbtYieldedObject]]:
if wallet_hmac is not None or wallet.n_keys != 1:
raise NotImplementedError("Policy wallets are only supported from version 2.0.0. Please update your Ledger hardware wallet")

Expand Down Expand Up @@ -259,7 +259,7 @@ def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[byte

all_signature_attempts[i_num] = signature_attempts

result: List[int, bytes, bytes] = []
result: List[int, SignPsbtYieldedObject] = []

# Sign any segwit inputs
if has_segwit:
Expand All @@ -276,7 +276,7 @@ def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[byte
for signature_attempt in all_signature_attempts[i]:
self.app.startUntrustedTransaction(False, 0, [segwit_inputs[i]], script_codes[i], c_tx.nVersion)

result.append((i, signature_attempt[1], self.app.untrustedHashSign(signature_attempt[0], "", c_tx.nLockTime, 0x01)))
result.append((i, PartialSignature(pubkey=signature_attempt[1], signature=self.app.untrustedHashSign(signature_attempt[0], "", c_tx.nLockTime, 0x01))))

elif has_legacy:
first_input = True
Expand All @@ -287,7 +287,7 @@ def sign_psbt(self, psbt: PSBT, wallet: WalletPolicy, wallet_hmac: Optional[byte
self.app.startUntrustedTransaction(first_input, i, legacy_inputs, script_codes[i], c_tx.nVersion)
self.app.finalizeInput(b"DUMMY", -1, -1, change_path, tx_bytes)

result.append((i, signature_attempt[1], self.app.untrustedHashSign(signature_attempt[0], "", c_tx.nLockTime, 0x01)))
result.append((i, PartialSignature(pubkey=signature_attempt[1], signature=self.app.untrustedHashSign(signature_attempt[0], "", c_tx.nLockTime, 0x01))))

first_input = False

Expand Down
Loading