diff --git a/bitcoin_client/ledger_bitcoin/bip380/README b/bitcoin_client/ledger_bitcoin/bip380/README deleted file mode 100644 index 3609525a4..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/README +++ /dev/null @@ -1,4 +0,0 @@ -This folder is based on https://github.com/Eunovo/python-bip380/tree/4226b7f2b70211d696155f6fd39edc611761ed0b, in turn built on https://github.com/darosior/python-bip380/commit/d2f5d8f5b41cba189bd793c1081e9d61d2d160c1. - -The library is "not ready for any real world use", however we _only_ use it in order to generate addresses for descriptors containing miniscript, and compare the result with the address computed by the device. -This is a generic mitigation for any bug related to address generation on the device, like [this](https://donjon.ledger.com/lsb/019/). diff --git a/bitcoin_client/ledger_bitcoin/bip380/__init__.py b/bitcoin_client/ledger_bitcoin/bip380/__init__.py deleted file mode 100644 index 27fdca497..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "0.0.3" diff --git a/bitcoin_client/ledger_bitcoin/bip380/descriptors/__init__.py b/bitcoin_client/ledger_bitcoin/bip380/descriptors/__init__.py deleted file mode 100644 index bc4eaac4d..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/descriptors/__init__.py +++ /dev/null @@ -1,220 +0,0 @@ -from ...bip380.key import DescriptorKey -from ...bip380.miniscript import Node -from ...bip380.utils.hashes import sha256, hash160 -from ...bip380.utils.script import ( - CScript, - OP_1, - OP_DUP, - OP_HASH160, - OP_EQUALVERIFY, - OP_CHECKSIG, -) - -from .checksum import descsum_create -from .errors import DescriptorParsingError -from .parsing import descriptor_from_str -from .utils import taproot_tweak - - -class Descriptor: - """A Bitcoin Output Script Descriptor.""" - - def from_str(desc_str, strict=False): - """Parse a Bitcoin Output Script Descriptor from its string representation. - - :param strict: whether to require the presence of a checksum. - """ - desc = descriptor_from_str(desc_str, strict) - - # BIP389 prescribes that no two multipath key expressions in a single descriptor - # have different length. - multipath_len = None - for key in desc.keys: - if key.is_multipath(): - m_len = len(key.path.paths) - if multipath_len is None: - multipath_len = m_len - elif multipath_len != m_len: - raise DescriptorParsingError( - f"Descriptor contains multipath key expressions with varying length: '{desc_str}'." - ) - - return desc - - @property - def script_pubkey(self): - """Get the ScriptPubKey (output 'locking' Script) for this descriptor.""" - # To be implemented by derived classes - raise NotImplementedError - - @property - def script_sighash(self): - """Get the Script to be committed to by the signature hash of a spending transaction.""" - # To be implemented by derived classes - raise NotImplementedError - - @property - def keys(self): - """Get the list of all keys from this descriptor, in order of apparition.""" - # To be implemented by derived classes - raise NotImplementedError - - def derive(self, index): - """Derive the key at the given derivation index. - - A no-op if the key isn't a wildcard. Will start from 2**31 if the key is a "hardened - wildcard". - """ - assert isinstance(index, int) - for key in self.keys: - key.derive(index) - - def satisfy(self, *args, **kwargs): - """Get the witness stack to spend from this descriptor. - - Various data may need to be passed as parameters to meet the locking - conditions set by the Script. - """ - # To be implemented by derived classes - raise NotImplementedError - - def copy(self): - """Get a copy of this descriptor.""" - # FIXME: do something nicer than roundtripping through string ser - return Descriptor.from_str(str(self)) - - def is_multipath(self): - """Whether this descriptor contains multipath key expression(s).""" - return any(k.is_multipath() for k in self.keys) - - def singlepath_descriptors(self): - """Get a list of descriptors that only contain keys that don't have multiple - derivation paths. - """ - singlepath_descs = [self.copy()] - - # First figure out the number of descriptors there will be - for key in self.keys: - if key.is_multipath(): - singlepath_descs += [ - self.copy() for _ in range(len(key.path.paths) - 1) - ] - break - - # Return early if there was no multipath key expression - if len(singlepath_descs) == 1: - return singlepath_descs - - # Then use one path for each - for i, desc in enumerate(singlepath_descs): - for key in desc.keys: - if key.is_multipath(): - assert len(key.path.paths) == len(singlepath_descs) - key.path.paths = key.path.paths[i: i + 1] - - assert all(not d.is_multipath() for d in singlepath_descs) - return singlepath_descs - - -# TODO: add methods to give access to all the Miniscript analysis -class WshDescriptor(Descriptor): - """A Segwit v0 P2WSH Output Script Descriptor.""" - - def __init__(self, witness_script): - assert isinstance(witness_script, Node) - self.witness_script = witness_script - - def __repr__(self): - return descsum_create(f"wsh({self.witness_script})") - - @property - def script_pubkey(self): - witness_program = sha256(self.witness_script.script) - return CScript([0, witness_program]) - - @property - def script_sighash(self): - return self.witness_script.script - - @property - def keys(self): - return self.witness_script.keys - - def satisfy(self, sat_material=None): - """Get the witness stack to spend from this descriptor. - - :param sat_material: a miniscript.satisfaction.SatisfactionMaterial with data - available to fulfill the conditions set by the Script. - """ - sat = self.witness_script.satisfy(sat_material) - if sat is not None: - return sat + [self.witness_script.script] - - -class WpkhDescriptor(Descriptor): - """A Segwit v0 P2WPKH Output Script Descriptor.""" - - def __init__(self, pubkey): - assert isinstance(pubkey, DescriptorKey) - self.pubkey = pubkey - - def __repr__(self): - return descsum_create(f"wpkh({self.pubkey})") - - @property - def script_pubkey(self): - witness_program = hash160(self.pubkey.bytes()) - return CScript([0, witness_program]) - - @property - def script_sighash(self): - key_hash = hash160(self.pubkey.bytes()) - return CScript([OP_DUP, OP_HASH160, key_hash, OP_EQUALVERIFY, OP_CHECKSIG]) - - @property - def keys(self): - return [self.pubkey] - - def satisfy(self, signature): - """Get the witness stack to spend from this descriptor. - - :param signature: a signature (in bytes) for the pubkey from the descriptor. - """ - assert isinstance(signature, bytes) - return [signature, self.pubkey.bytes()] - - -class TrDescriptor(Descriptor): - """A Pay-to-Taproot Output Script Descriptor.""" - - def __init__(self, internal_key): - assert isinstance(internal_key, DescriptorKey) and internal_key.x_only - self.internal_key = internal_key - - def __repr__(self): - return descsum_create(f"tr({self.internal_key})") - - def output_key(self): - # "If the spending conditions do not require a script path, the output key - # should commit to an unspendable script path" (see BIP341, BIP386) - return taproot_tweak(self.internal_key.bytes(), b"").format() - - @property - def script_pubkey(self): - return CScript([OP_1, self.output_key()]) - - @property - def keys(self): - return [self.internal_key] - - def satisfy(self, sat_material=None): - """Get the witness stack to spend from this descriptor. - - :param sat_material: a miniscript.satisfaction.SatisfactionMaterial with data - available to spend from the key path or any of the leaves. - """ - out_key = self.output_key() - if out_key in sat_material.signatures: - return [sat_material.signatures[out_key]] - - return diff --git a/bitcoin_client/ledger_bitcoin/bip380/descriptors/checksum.py b/bitcoin_client/ledger_bitcoin/bip380/descriptors/checksum.py deleted file mode 100644 index 9f3e01326..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/descriptors/checksum.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) 2019 Pieter Wuille -# Distributed under the MIT software license, see the accompanying -# file COPYING or http://www.opensource.org/licenses/mit-license.php. -"""Utility functions related to output descriptors""" - -import re - -INPUT_CHARSET = "0123456789()[],'/*abcdefgh@:$%{}IJKLMNOPQRSTUVWXYZ&+-.;<=>?!^_|~ijklmnopqrstuvwxyzABCDEFGH`#\"\\ " -CHECKSUM_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" -GENERATOR = [0xF5DEE51989, 0xA9FDCA3312, 0x1BAB10E32D, 0x3706B1677A, 0x644D626FFD] - - -def descsum_polymod(symbols): - """Internal function that computes the descriptor checksum.""" - chk = 1 - for value in symbols: - top = chk >> 35 - chk = (chk & 0x7FFFFFFFF) << 5 ^ value - for i in range(5): - chk ^= GENERATOR[i] if ((top >> i) & 1) else 0 - return chk - - -def descsum_expand(s): - """Internal function that does the character to symbol expansion""" - groups = [] - symbols = [] - for c in s: - if not c in INPUT_CHARSET: - return None - v = INPUT_CHARSET.find(c) - symbols.append(v & 31) - groups.append(v >> 5) - if len(groups) == 3: - symbols.append(groups[0] * 9 + groups[1] * 3 + groups[2]) - groups = [] - if len(groups) == 1: - symbols.append(groups[0]) - elif len(groups) == 2: - symbols.append(groups[0] * 3 + groups[1]) - return symbols - - -def descsum_create(s): - """Add a checksum to a descriptor without""" - symbols = descsum_expand(s) + [0, 0, 0, 0, 0, 0, 0, 0] - checksum = descsum_polymod(symbols) ^ 1 - return ( - s - + "#" - + "".join(CHECKSUM_CHARSET[(checksum >> (5 * (7 - i))) & 31] for i in range(8)) - ) - - -def descsum_check(s): - """Verify that the checksum is correct in a descriptor""" - if s[-9] != "#": - return False - if not all(x in CHECKSUM_CHARSET for x in s[-8:]): - return False - symbols = descsum_expand(s[:-9]) + [CHECKSUM_CHARSET.find(x) for x in s[-8:]] - return descsum_polymod(symbols) == 1 - - -def drop_origins(s): - """Drop the key origins from a descriptor""" - desc = re.sub(r"\[.+?\]", "", s) - if "#" in s: - desc = desc[: desc.index("#")] - return descsum_create(desc) diff --git a/bitcoin_client/ledger_bitcoin/bip380/descriptors/errors.py b/bitcoin_client/ledger_bitcoin/bip380/descriptors/errors.py deleted file mode 100644 index f7b58483a..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/descriptors/errors.py +++ /dev/null @@ -1,5 +0,0 @@ -class DescriptorParsingError(ValueError): - """Error while parsing a Bitcoin Output Descriptor from its string representation""" - - def __init__(self, message): - self.message = message diff --git a/bitcoin_client/ledger_bitcoin/bip380/descriptors/parsing.py b/bitcoin_client/ledger_bitcoin/bip380/descriptors/parsing.py deleted file mode 100644 index 1d18bffdd..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/descriptors/parsing.py +++ /dev/null @@ -1,56 +0,0 @@ -from ...bip380 import descriptors -from ...bip380.key import DescriptorKey, DescriptorKeyError -from ...bip380.miniscript import Node -from ...bip380.descriptors.checksum import descsum_check - -from .errors import DescriptorParsingError - - -def split_checksum(desc_str, strict=False): - """Removes and check the provided checksum. - If not told otherwise, this won't fail on a missing checksum. - - :param strict: whether to require the presence of the checksum. - """ - desc_split = desc_str.split("#") - if len(desc_split) != 2: - if strict: - raise DescriptorParsingError("Missing checksum") - return desc_split[0] - - descriptor, checksum = desc_split - if not descsum_check(desc_str): - raise DescriptorParsingError( - f"Checksum '{checksum}' is invalid for '{descriptor}'" - ) - - return descriptor - - -def descriptor_from_str(desc_str, strict=False): - """Parse a Bitcoin Output Script Descriptor from its string representation. - - :param strict: whether to require the presence of a checksum. - """ - desc_str = split_checksum(desc_str, strict=strict) - - if desc_str.startswith("wsh(") and desc_str.endswith(")"): - # TODO: decent errors in the Miniscript module to be able to catch them here. - ms = Node.from_str(desc_str[4:-1]) - return descriptors.WshDescriptor(ms) - - if desc_str.startswith("wpkh(") and desc_str.endswith(")"): - try: - pubkey = DescriptorKey(desc_str[5:-1]) - except DescriptorKeyError as e: - raise DescriptorParsingError(str(e)) - return descriptors.WpkhDescriptor(pubkey) - - if desc_str.startswith("tr(") and desc_str.endswith(")"): - try: - pubkey = DescriptorKey(desc_str[3:-1], x_only=True) - except DescriptorKeyError as e: - raise DescriptorParsingError(str(e)) - return descriptors.TrDescriptor(pubkey) - - raise DescriptorParsingError(f"Unknown descriptor fragment: {desc_str}") diff --git a/bitcoin_client/ledger_bitcoin/bip380/descriptors/utils.py b/bitcoin_client/ledger_bitcoin/bip380/descriptors/utils.py deleted file mode 100644 index 25dbfe94f..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/descriptors/utils.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Utilities for working with descriptors.""" -import coincurve -import hashlib - - -def tagged_hash(tag, data): - ss = hashlib.sha256(tag.encode("utf-8")).digest() - ss += ss - ss += data - return hashlib.sha256(ss).digest() - - -def taproot_tweak(pubkey_bytes, merkle_root): - assert isinstance(pubkey_bytes, bytes) and len(pubkey_bytes) == 32 - assert isinstance(merkle_root, bytes) - - t = tagged_hash("TapTweak", pubkey_bytes + merkle_root) - xonly_pubkey = coincurve.PublicKeyXOnly(pubkey_bytes) - xonly_pubkey.tweak_add(t) # TODO: error handling - - return xonly_pubkey diff --git a/bitcoin_client/ledger_bitcoin/bip380/key.py b/bitcoin_client/ledger_bitcoin/bip380/key.py deleted file mode 100644 index 3e05b61d5..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/key.py +++ /dev/null @@ -1,338 +0,0 @@ -import coincurve -import copy - -from bip32 import BIP32, HARDENED_INDEX -from bip32.utils import _deriv_path_str_to_list -from .utils.hashes import hash160 -from enum import Enum, auto - - -def is_raw_key(obj): - return isinstance(obj, (coincurve.PublicKey, coincurve.PublicKeyXOnly)) - - -class DescriptorKeyError(Exception): - def __init__(self, message): - self.message = message - - -class DescriporKeyOrigin: - """The origin of a key in a descriptor. - - See https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki#key-expressions. - """ - - def __init__(self, fingerprint, path): - assert isinstance(fingerprint, bytes) and isinstance(path, list) - - self.fingerprint = fingerprint - self.path = path - - def from_str(origin_str): - # Origing starts and ends with brackets - if not origin_str.startswith("[") or not origin_str.endswith("]"): - raise DescriptorKeyError(f"Insane origin: '{origin_str}'") - # At least 8 hex characters + brackets - if len(origin_str) < 10: - raise DescriptorKeyError(f"Insane origin: '{origin_str}'") - - # For the fingerprint, just read the 4 bytes. - try: - fingerprint = bytes.fromhex(origin_str[1:9]) - except ValueError: - raise DescriptorKeyError(f"Insane fingerprint in origin: '{origin_str}'") - # For the path, we (how bad) reuse an internal helper from python-bip32. - path = [] - if len(origin_str) > 10: - if origin_str[9] != "/": - raise DescriptorKeyError(f"Insane path in origin: '{origin_str}'") - # The helper operates on "m/10h/11/12'/13", so give it a "m". - dummy = "m" - try: - path = _deriv_path_str_to_list(dummy + origin_str[9:-1]) - except ValueError: - raise DescriptorKeyError(f"Insane path in origin: '{origin_str}'") - - return DescriporKeyOrigin(fingerprint, path) - - -class KeyPathKind(Enum): - FINAL = auto() - WILDCARD_UNHARDENED = auto() - WILDCARD_HARDENED = auto() - - def is_wildcard(self): - return self in [KeyPathKind.WILDCARD_HARDENED, KeyPathKind.WILDCARD_UNHARDENED] - - -def parse_index(index_str): - """Parse a derivation index, as contained in a derivation path.""" - assert isinstance(index_str, str) - - try: - # if HARDENED - if index_str[-1:] in ["'", "h", "H"]: - return int(index_str[:-1]) + HARDENED_INDEX - else: - return int(index_str) - except ValueError as e: - raise DescriptorKeyError(f"Invalid derivation index {index_str}: '{e}'") - - -class DescriptorKeyPath: - """The derivation path of a key in a descriptor. - - See https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki#key-expressions - as well as BIP389 for multipath expressions. - """ - - def __init__(self, paths, kind): - assert ( - isinstance(paths, list) - and isinstance(kind, KeyPathKind) - and len(paths) > 0 - and all(isinstance(p, list) for p in paths) - ) - - self.paths = paths - self.kind = kind - - def is_multipath(self): - """Whether this derivation path actually contains multiple of them.""" - return len(self.paths) > 1 - - def from_str(path_str): - if len(path_str) < 2: - raise DescriptorKeyError(f"Insane key path: '{path_str}'") - if path_str[0] != "/": - raise DescriptorKeyError(f"Insane key path: '{path_str}'") - - # Determine whether this key may be derived. - kind = KeyPathKind.FINAL - if len(path_str) > 2 and path_str[-3:] in ["/*'", "/*h", "/*H"]: - kind = KeyPathKind.WILDCARD_HARDENED - path_str = path_str[:-3] - elif len(path_str) > 1 and path_str[-2:] == "/*": - kind = KeyPathKind.WILDCARD_UNHARDENED - path_str = path_str[:-2] - - paths = [[]] - if len(path_str) == 0: - return DescriptorKeyPath(paths, kind) - - for index in path_str[1:].split("/"): - # If this is a multipath expression, of the form '' - if ( - index.startswith("<") - and index.endswith(">") - and ";" in index - and len(index) >= 5 - ): - # Can't have more than one multipath expression - if len(paths) > 1: - raise DescriptorKeyError( - f"May only have a single multipath step in derivation path: '{path_str}'" - ) - indexes = index[1:-1].split(";") - paths = [copy.copy(paths[0]) for _ in indexes] - for i, der_index in enumerate(indexes): - paths[i].append(parse_index(der_index)) - else: - # This is a "single index" expression. - for path in paths: - path.append(parse_index(index)) - return DescriptorKeyPath(paths, kind) - - -class DescriptorKey: - """A Bitcoin key to be used in Output Script Descriptors. - - May be an extended or raw public key. - """ - - def __init__(self, key, x_only=False): - # Information about the origin of this key. - self.origin = None - # If it is an xpub, a path toward a child key of that xpub. - self.path = None - # Whether to only create x-only public keys. - self.x_only = x_only - # Whether to serialize to string representation without the sign byte. - # This is necessary to roundtrip 33-bytes keys under Taproot context. - self.ser_x_only = x_only - - if isinstance(key, bytes): - if len(key) == 32: - key_cls = coincurve.PublicKeyXOnly - self.x_only = True - self.ser_x_only = True - elif len(key) == 33: - key_cls = coincurve.PublicKey - self.ser_x_only = False - else: - raise DescriptorKeyError( - "Only compressed and x-only keys are supported" - ) - try: - self.key = key_cls(key) - except ValueError as e: - raise DescriptorKeyError(f"Public key parsing error: '{str(e)}'") - - elif isinstance(key, BIP32): - self.key = key - - elif isinstance(key, str): - # Try parsing an optional origin prepended to the key - splitted_key = key.split("]", maxsplit=1) - if len(splitted_key) == 2: - origin, key = splitted_key - self.origin = DescriporKeyOrigin.from_str(origin + "]") - - # Is it a raw key? - if len(key) in (64, 66): - pk_cls = coincurve.PublicKey - if len(key) == 64: - pk_cls = coincurve.PublicKeyXOnly - self.x_only = True - self.ser_x_only = True - else: - self.ser_x_only = False - try: - self.key = pk_cls(bytes.fromhex(key)) - except ValueError as e: - raise DescriptorKeyError(f"Public key parsing error: '{str(e)}'") - # If not it must be an xpub. - else: - # There may be an optional path appended to the xpub. - splitted_key = key.split("/", maxsplit=1) - if len(splitted_key) == 2: - key, path = splitted_key - self.path = DescriptorKeyPath.from_str("/" + path) - - try: - self.key = BIP32.from_xpub(key) - except ValueError as e: - raise DescriptorKeyError(f"Xpub parsing error: '{str(e)}'") - - else: - raise DescriptorKeyError( - "Invalid parameter type: expecting bytes, hex str or BIP32 instance." - ) - - def __repr__(self): - key = "" - - def ser_index(key, der_index): - # If this a hardened step, deduce the threshold and mark it. - if der_index < HARDENED_INDEX: - return str(der_index) - else: - return f"{der_index - 2**31}'" - - def ser_paths(key, paths): - assert len(paths) > 0 - - for i, der_index in enumerate(paths[0]): - # If this is a multipath expression, write the multi-index step accordingly - if len(paths) > 1 and paths[1][i] != der_index: - key += "/<" - for j, path in enumerate(paths): - key += ser_index(key, path[i]) - if j < len(paths) - 1: - key += ";" - key += ">" - else: - key += "/" + ser_index(key, der_index) - - return key - - if self.origin is not None: - key += f"[{self.origin.fingerprint.hex()}" - key = ser_paths(key, [self.origin.path]) - key += "]" - - if isinstance(self.key, BIP32): - key += self.key.get_xpub() - else: - assert is_raw_key(self.key) - raw_key = self.key.format() - if len(raw_key) == 33 and self.ser_x_only: - raw_key = raw_key[1:] - key += raw_key.hex() - - if self.path is not None: - key = ser_paths(key, self.path.paths) - if self.path.kind == KeyPathKind.WILDCARD_UNHARDENED: - key += "/*" - elif self.path.kind == KeyPathKind.WILDCARD_HARDENED: - key += "/*'" - - return key - - def is_multipath(self): - """Whether this key contains more than one derivation path.""" - return self.path is not None and self.path.is_multipath() - - def derivation_path(self): - """Get the single derivation path for this key. - - Will raise if it has multiple, and return None if it doesn't have any. - """ - if self.path is None: - return None - if self.path.is_multipath(): - raise DescriptorKeyError( - f"Key has multiple derivation paths: {self.path.paths}" - ) - return self.path.paths[0] - - def bytes(self): - """Get this key as raw bytes. - - Will raise if this key contains multiple derivation paths. - """ - if is_raw_key(self.key): - raw = self.key.format() - if self.x_only and len(raw) == 33: - return raw[1:] - assert len(raw) == 32 or not self.x_only - return raw - else: - assert isinstance(self.key, BIP32) - path = self.derivation_path() - if path is None: - return self.key.pubkey - assert not self.path.kind.is_wildcard() # TODO: real errors - return self.key.get_pubkey_from_path(path) - - def derive(self, index): - """Derive the key at the given index. - - Will raise if this key contains multiple derivation paths. - A no-op if the key isn't a wildcard. Will start from 2**31 if the key is a "hardened - wildcard". - """ - assert isinstance(index, int) - if ( - self.path is None - or self.path.is_multipath() - or self.path.kind == KeyPathKind.FINAL - ): - return - assert isinstance(self.key, BIP32) - - if self.path.kind == KeyPathKind.WILDCARD_HARDENED: - index += 2 ** 31 - assert index < 2 ** 32 - - if self.origin is None: - fingerprint = hash160(self.key.pubkey)[:4] - self.origin = DescriporKeyOrigin(fingerprint, [index]) - else: - self.origin.path.append(index) - - # This can't fail now. - path = self.derivation_path() - # TODO(bip32): have a way to derive without roundtripping through string ser. - self.key = BIP32.from_xpub(self.key.get_xpub_from_path(path + [index])) - self.path = None diff --git a/bitcoin_client/ledger_bitcoin/bip380/miniscript/__init__.py b/bitcoin_client/ledger_bitcoin/bip380/miniscript/__init__.py deleted file mode 100644 index b0de1f9c7..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/miniscript/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -""" -Miniscript -========== - -Miniscript is an extension to Bitcoin Output Script descriptors. It is a language for \ -writing (a subset of) Bitcoin Scripts in a structured way, enabling analysis, composition, \ -generic signing and more. - -For more information about Miniscript, see https://bitcoin.sipa.be/miniscript. -""" - -from .fragments import Node -from .satisfaction import SatisfactionMaterial diff --git a/bitcoin_client/ledger_bitcoin/bip380/miniscript/errors.py b/bitcoin_client/ledger_bitcoin/bip380/miniscript/errors.py deleted file mode 100644 index 7ccd98f4e..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/miniscript/errors.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -All the exceptions raised when dealing with Miniscript. -""" - - -class MiniscriptMalformed(ValueError): - def __init__(self, message): - self.message = message - - -class MiniscriptNodeCreationError(ValueError): - def __init__(self, message): - self.message = message - - -class MiniscriptPropertyError(ValueError): - def __init__(self, message): - self.message = message - -# TODO: errors for type errors, parsing errors, etc.. diff --git a/bitcoin_client/ledger_bitcoin/bip380/miniscript/fragments.py b/bitcoin_client/ledger_bitcoin/bip380/miniscript/fragments.py deleted file mode 100644 index d0e572eeb..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/miniscript/fragments.py +++ /dev/null @@ -1,1225 +0,0 @@ -""" -Miniscript AST elements. - -Each element correspond to a Bitcoin Script fragment, and has various type properties. -See the Miniscript website for the specification of the type system: https://bitcoin.sipa.be/miniscript/. -""" - -import copy -from ...bip380.miniscript import parsing - -from ...bip380.key import DescriptorKey -from ...bip380.utils.hashes import hash160 -from ...bip380.utils.script import ( - CScript, - OP_1, - OP_0, - OP_ADD, - OP_BOOLAND, - OP_BOOLOR, - OP_DUP, - OP_ELSE, - OP_ENDIF, - OP_EQUAL, - OP_EQUALVERIFY, - OP_FROMALTSTACK, - OP_IFDUP, - OP_IF, - OP_CHECKLOCKTIMEVERIFY, - OP_CHECKMULTISIG, - OP_CHECKMULTISIGVERIFY, - OP_CHECKSEQUENCEVERIFY, - OP_CHECKSIG, - OP_CHECKSIGVERIFY, - OP_HASH160, - OP_HASH256, - OP_NOTIF, - OP_RIPEMD160, - OP_SHA256, - OP_SIZE, - OP_SWAP, - OP_TOALTSTACK, - OP_VERIFY, - OP_0NOTEQUAL, -) - -from .errors import MiniscriptNodeCreationError -from .property import Property -from .satisfaction import ExecutionInfo, Satisfaction - - -# Threshold for nLockTime: below this value it is interpreted as block number, -# otherwise as UNIX timestamp. -LOCKTIME_THRESHOLD = 500000000 # Tue Nov 5 00:53:20 1985 UTC - -# If CTxIn::nSequence encodes a relative lock-time and this flag -# is set, the relative lock-time has units of 512 seconds, -# otherwise it specifies blocks with a granularity of 1. -SEQUENCE_LOCKTIME_TYPE_FLAG = 1 << 22 - - -class Node: - """A Miniscript fragment.""" - - # The fragment's type and properties - p = None - # List of all sub fragments - subs = [] - # A list of Script elements, a CScript is created all at once in the script() method. - _script = [] - # Whether any satisfaction for this fragment require a signature - needs_sig = None - # Whether any dissatisfaction for this fragment requires a signature - is_forced = None - # Whether this fragment has a unique unconditional satisfaction, and all conditional - # ones require a signature. - is_expressive = None - # Whether for any possible way to satisfy this fragment (may be none), a - # non-malleable satisfaction exists. - is_nonmalleable = None - # Whether this node or any of its subs contains an absolute heightlock - abs_heightlocks = None - # Whether this node or any of its subs contains a relative heightlock - rel_heightlocks = None - # Whether this node or any of its subs contains an absolute timelock - abs_timelocks = None - # Whether this node or any of its subs contains a relative timelock - rel_timelocks = None - # Whether this node does not contain a mix of timelock or heightlock of different types. - # That is, not (abs_heightlocks and rel_heightlocks or abs_timelocks and abs_timelocks) - no_timelock_mix = None - # Information about this Miniscript execution (satisfaction cost, etc..) - exec_info = None - - def __init__(self, *args, **kwargs): - # Needs to be implemented by derived classes. - raise NotImplementedError - - def from_str(ms_str): - """Parse a Miniscript fragment from its string representation.""" - assert isinstance(ms_str, str) - return parsing.miniscript_from_str(ms_str) - - def from_script(script, pkh_preimages={}): - """Decode a Miniscript fragment from its Script representation.""" - assert isinstance(script, CScript) - return parsing.miniscript_from_script(script, pkh_preimages) - - # TODO: have something like BuildScript from Core and get rid of the _script member. - @property - def script(self): - return CScript(self._script) - - @property - def keys(self): - """Get the list of all keys from this Miniscript, in order of apparition.""" - # Overriden by fragments that actually have keys. - return [key for sub in self.subs for key in sub.keys] - - def satisfy(self, sat_material): - """Get the witness of the smallest non-malleable satisfaction for this fragment, - if one exists. - - :param sat_material: a SatisfactionMaterial containing available data to satisfy - challenges. - """ - sat = self.satisfaction(sat_material) - if not sat.has_sig: - return None - return sat.witness - - def satisfaction(self, sat_material): - """Get the satisfaction for this fragment. - - :param sat_material: a SatisfactionMaterial containing available data to satisfy - challenges. - """ - # Needs to be implemented by derived classes. - raise NotImplementedError - - def dissatisfaction(self): - """Get the dissatisfaction for this fragment.""" - # Needs to be implemented by derived classes. - raise NotImplementedError - - -class Just0(Node): - def __init__(self): - - self._script = [OP_0] - - self.p = Property("Bzud") - self.needs_sig = False - self.is_forced = False - self.is_expressive = True - self.is_nonmalleable = True - self.abs_heightlocks = False - self.rel_heightlocks = False - self.abs_timelocks = False - self.rel_timelocks = False - self.no_timelock_mix = True - self.exec_info = ExecutionInfo(0, 0, None, 0) - - def satisfaction(self, sat_material): - return Satisfaction.unavailable() - - def dissatisfaction(self): - return Satisfaction(witness=[]) - - def __repr__(self): - return "0" - - -class Just1(Node): - def __init__(self): - - self._script = [OP_1] - - self.p = Property("Bzu") - self.needs_sig = False - self.is_forced = True # No dissat - self.is_expressive = False # No dissat - self.is_nonmalleable = True # FIXME: how comes? Standardness rules? - self.abs_heightlocks = False - self.rel_heightlocks = False - self.abs_timelocks = False - self.rel_timelocks = False - self.no_timelock_mix = True - self.exec_info = ExecutionInfo(0, 0, 0, None) - - def satisfaction(self, sat_material): - return Satisfaction(witness=[]) - - def dissatisfaction(self): - return Satisfaction.unavailable() - - def __repr__(self): - return "1" - - -class PkNode(Node): - """A virtual class for nodes containing a single public key. - - Should not be instanced directly, use Pk() or Pkh(). - """ - - def __init__(self, pubkey): - - if isinstance(pubkey, bytes) or isinstance(pubkey, str): - self.pubkey = DescriptorKey(pubkey) - elif isinstance(pubkey, DescriptorKey): - self.pubkey = pubkey - else: - raise MiniscriptNodeCreationError("Invalid public key") - - self.needs_sig = True # FIXME: think about having it in 'c:' instead - self.is_forced = False - self.is_expressive = True - self.is_nonmalleable = True - self.abs_heightlocks = False - self.rel_heightlocks = False - self.abs_timelocks = False - self.rel_timelocks = False - self.no_timelock_mix = True - - @property - def keys(self): - return [self.pubkey] - - -class Pk(PkNode): - def __init__(self, pubkey): - PkNode.__init__(self, pubkey) - - self.p = Property("Konud") - self.exec_info = ExecutionInfo(0, 0, 0, 0) - - @property - def _script(self): - return [self.pubkey.bytes()] - - def satisfaction(self, sat_material): - sig = sat_material.signatures.get(self.pubkey.bytes()) - if sig is None: - return Satisfaction.unavailable() - return Satisfaction([sig], has_sig=True) - - def dissatisfaction(self): - return Satisfaction(witness=[b""]) - - def __repr__(self): - return f"pk_k({self.pubkey})" - - -class Pkh(PkNode): - # FIXME: should we support a hash here, like rust-bitcoin? I don't think it's safe. - def __init__(self, pubkey): - PkNode.__init__(self, pubkey) - - self.p = Property("Knud") - self.exec_info = ExecutionInfo(3, 0, 1, 1) - - @property - def _script(self): - return [OP_DUP, OP_HASH160, self.pk_hash(), OP_EQUALVERIFY] - - def satisfaction(self, sat_material): - sig = sat_material.signatures.get(self.pubkey.bytes()) - if sig is None: - return Satisfaction.unavailable() - return Satisfaction(witness=[sig, self.pubkey.bytes()], has_sig=True) - - def dissatisfaction(self): - return Satisfaction(witness=[b"", self.pubkey.bytes()]) - - def __repr__(self): - return f"pk_h({self.pubkey})" - - def pk_hash(self): - assert isinstance(self.pubkey, DescriptorKey) - return hash160(self.pubkey.bytes()) - - -class Older(Node): - def __init__(self, value): - assert value > 0 and value < 2 ** 31 - - self.value = value - self._script = [self.value, OP_CHECKSEQUENCEVERIFY] - - self.p = Property("Bz") - self.needs_sig = False - self.is_forced = True - self.is_expressive = False # No dissat - self.is_nonmalleable = True - self.rel_timelocks = bool(value & SEQUENCE_LOCKTIME_TYPE_FLAG) - self.rel_heightlocks = not self.rel_timelocks - self.abs_heightlocks = False - self.abs_timelocks = False - self.no_timelock_mix = True - self.exec_info = ExecutionInfo(1, 0, 0, None) - - def satisfaction(self, sat_material): - if sat_material.max_sequence < self.value: - return Satisfaction.unavailable() - return Satisfaction(witness=[]) - - def dissatisfaction(self): - return Satisfaction.unavailable() - - def __repr__(self): - return f"older({self.value})" - - -class After(Node): - def __init__(self, value): - assert value > 0 and value < 2 ** 31 - - self.value = value - self._script = [self.value, OP_CHECKLOCKTIMEVERIFY] - - self.p = Property("Bz") - self.needs_sig = False - self.is_forced = True - self.is_expressive = False # No dissat - self.is_nonmalleable = True - self.abs_heightlocks = value < LOCKTIME_THRESHOLD - self.abs_timelocks = not self.abs_heightlocks - self.rel_heightlocks = False - self.rel_timelocks = False - self.no_timelock_mix = True - self.exec_info = ExecutionInfo(1, 0, 0, None) - - def satisfaction(self, sat_material): - if sat_material.max_lock_time < self.value: - return Satisfaction.unavailable() - return Satisfaction(witness=[]) - - def dissatisfaction(self): - return Satisfaction.unavailable() - - def __repr__(self): - return f"after({self.value})" - - -class HashNode(Node): - """A virtual class for fragments with hashlock semantics. - - Should not be instanced directly, use concrete fragments instead. - """ - - def __init__(self, digest, hash_op): - assert isinstance(digest, bytes) # TODO: real errors - - self.digest = digest - self._script = [OP_SIZE, 32, OP_EQUALVERIFY, hash_op, digest, OP_EQUAL] - - self.p = Property("Bonud") - self.needs_sig = False - self.is_forced = False - self.is_expressive = False - self.is_nonmalleable = True - self.abs_heightlocks = False - self.rel_heightlocks = False - self.abs_timelocks = False - self.rel_timelocks = False - self.no_timelock_mix = True - self.exec_info = ExecutionInfo(4, 0, 1, None) - - def satisfaction(self, sat_material): - preimage = sat_material.preimages.get(self.digest) - if preimage is None: - return Satisfaction.unavailable() - return Satisfaction(witness=[preimage]) - - def dissatisfaction(self): - return Satisfaction.unavailable() - return Satisfaction(witness=[b""]) - - -class Sha256(HashNode): - def __init__(self, digest): - assert len(digest) == 32 # TODO: real errors - HashNode.__init__(self, digest, OP_SHA256) - - def __repr__(self): - return f"sha256({self.digest.hex()})" - - -class Hash256(HashNode): - def __init__(self, digest): - assert len(digest) == 32 # TODO: real errors - HashNode.__init__(self, digest, OP_HASH256) - - def __repr__(self): - return f"hash256({self.digest.hex()})" - - -class Ripemd160(HashNode): - def __init__(self, digest): - assert len(digest) == 20 # TODO: real errors - HashNode.__init__(self, digest, OP_RIPEMD160) - - def __repr__(self): - return f"ripemd160({self.digest.hex()})" - - -class Hash160(HashNode): - def __init__(self, digest): - assert len(digest) == 20 # TODO: real errors - HashNode.__init__(self, digest, OP_HASH160) - - def __repr__(self): - return f"hash160({self.digest.hex()})" - - -class Multi(Node): - def __init__(self, k, keys): - assert 1 <= k <= len(keys) - assert all(isinstance(k, DescriptorKey) for k in keys) - - self.k = k - self.pubkeys = keys - - self.p = Property("Bndu") - self.needs_sig = True - self.is_forced = False - self.is_expressive = True - self.is_nonmalleable = True - self.abs_heightlocks = False - self.rel_heightlocks = False - self.abs_timelocks = False - self.rel_timelocks = False - self.no_timelock_mix = True - self.exec_info = ExecutionInfo(1, len(keys), 1 + k, 1 + k) - - @property - def keys(self): - return self.pubkeys - - @property - def _script(self): - return [ - self.k, - *[k.bytes() for k in self.keys], - len(self.keys), - OP_CHECKMULTISIG, - ] - - def satisfaction(self, sat_material): - sigs = [] - for key in self.keys: - sig = sat_material.signatures.get(key.bytes()) - if sig is not None: - assert isinstance(sig, bytes) - sigs.append(sig) - if len(sigs) == self.k: - break - if len(sigs) < self.k: - return Satisfaction.unavailable() - return Satisfaction(witness=[b""] + sigs, has_sig=True) - - def dissatisfaction(self): - return Satisfaction(witness=[b""] * (self.k + 1)) - - def __repr__(self): - return f"multi({','.join([str(self.k)] + [str(k) for k in self.keys])})" - - -class AndV(Node): - def __init__(self, sub_x, sub_y): - assert sub_x.p.V - assert sub_y.p.has_any("BKV") - - self.subs = [sub_x, sub_y] - - self.p = Property( - sub_y.p.type() - + ("z" if sub_x.p.z and sub_y.p.z else "") - + ("o" if sub_x.p.z and sub_y.p.o or sub_x.p.o and sub_y.p.z else "") - + ("n" if sub_x.p.n or sub_x.p.z and sub_y.p.n else "") - + ("u" if sub_y.p.u else "") - ) - self.needs_sig = any(sub.needs_sig for sub in self.subs) - self.is_forced = any(sub.needs_sig for sub in self.subs) - self.is_expressive = False # Not 'd' - self.is_nonmalleable = all(sub.is_nonmalleable for sub in self.subs) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - self.no_timelock_mix = not ( - self.abs_heightlocks - and self.abs_timelocks - or self.rel_heightlocks - and self.rel_timelocks - ) - - @property - def _script(self): - return sum((sub._script for sub in self.subs), start=[]) - - @property - def exec_info(self): - exec_info = ExecutionInfo.from_concat( - self.subs[0].exec_info, self.subs[1].exec_info - ) - exec_info.set_undissatisfiable() # it's V. - return exec_info - - def satisfaction(self, sat_material): - return Satisfaction.from_concat(sat_material, *self.subs) - - def dissatisfaction(self): - return Satisfaction.unavailable() # it's V. - - def __repr__(self): - return f"and_v({','.join(map(str, self.subs))})" - - -class AndB(Node): - def __init__(self, sub_x, sub_y): - assert sub_x.p.B and sub_y.p.W - - self.subs = [sub_x, sub_y] - - self.p = Property( - "Bu" - + ("z" if sub_x.p.z and sub_y.p.z else "") - + ("o" if sub_x.p.z and sub_y.p.o or sub_x.p.o and sub_y.p.z else "") - + ("n" if sub_x.p.n or sub_x.p.z and sub_y.p.n else "") - + ("d" if sub_x.p.d and sub_y.p.d else "") - + ("u" if sub_y.p.u else "") - ) - self.needs_sig = any(sub.needs_sig for sub in self.subs) - self.is_forced = ( - sub_x.is_forced - and sub_y.is_forced - or any(sub.is_forced and sub.needs_sig for sub in self.subs) - ) - self.is_expressive = all(sub.is_forced and sub.needs_sig for sub in self.subs) - self.is_nonmalleable = all(sub.is_nonmalleable for sub in self.subs) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - self.no_timelock_mix = not ( - self.abs_heightlocks - and self.abs_timelocks - or self.rel_heightlocks - and self.rel_timelocks - ) - - @property - def _script(self): - return sum((sub._script for sub in self.subs), start=[]) + [OP_BOOLAND] - - @property - def exec_info(self): - return ExecutionInfo.from_concat( - self.subs[0].exec_info, self.subs[1].exec_info, ops_count=1 - ) - - def satisfaction(self, sat_material): - return Satisfaction.from_concat(sat_material, self.subs[0], self.subs[1]) - - def dissatisfaction(self): - return self.subs[1].dissatisfaction() + self.subs[0].dissatisfaction() - - def __repr__(self): - return f"and_b({','.join(map(str, self.subs))})" - - -class OrB(Node): - def __init__(self, sub_x, sub_z): - assert sub_x.p.has_all("Bd") - assert sub_z.p.has_all("Wd") - - self.subs = [sub_x, sub_z] - - self.p = Property( - "Bdu" - + ("z" if sub_x.p.z and sub_z.p.z else "") - + ("o" if sub_x.p.z and sub_z.p.o or sub_x.p.o and sub_z.p.z else "") - ) - self.needs_sig = all(sub.needs_sig for sub in self.subs) - self.is_forced = False # Both subs are 'd' - self.is_expressive = all(sub.is_expressive for sub in self.subs) - self.is_nonmalleable = all( - sub.is_nonmalleable and sub.is_expressive for sub in self.subs - ) and any(sub.needs_sig for sub in self.subs) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - self.no_timelock_mix = all(sub.no_timelock_mix for sub in self.subs) - - @property - def _script(self): - return sum((sub._script for sub in self.subs), start=[]) + [OP_BOOLOR] - - @property - def exec_info(self): - return ExecutionInfo.from_concat( - self.subs[0].exec_info, - self.subs[1].exec_info, - ops_count=1, - disjunction=True, - ) - - def satisfaction(self, sat_material): - return Satisfaction.from_concat( - sat_material, self.subs[0], self.subs[1], disjunction=True - ) - - def dissatisfaction(self): - return self.subs[1].dissatisfaction() + self.subs[0].dissatisfaction() - - def __repr__(self): - return f"or_b({','.join(map(str, self.subs))})" - - -class OrC(Node): - def __init__(self, sub_x, sub_z): - assert sub_x.p.has_all("Bdu") and sub_z.p.V - - self.subs = [sub_x, sub_z] - - self.p = Property( - "V" - + ("z" if sub_x.p.z and sub_z.p.z else "") - + ("o" if sub_x.p.o and sub_z.p.z else "") - ) - self.needs_sig = all(sub.needs_sig for sub in self.subs) - self.is_forced = True # Because sub_z is 'V' - self.is_expressive = False # V - self.is_nonmalleable = ( - all(sub.is_nonmalleable for sub in self.subs) - and any(sub.needs_sig for sub in self.subs) - and sub_x.is_expressive - ) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - self.no_timelock_mix = all(sub.no_timelock_mix for sub in self.subs) - - @property - def _script(self): - return self.subs[0]._script + [OP_NOTIF] + self.subs[1]._script + [OP_ENDIF] - - @property - def exec_info(self): - exec_info = ExecutionInfo.from_or_uneven( - self.subs[0].exec_info, self.subs[1].exec_info, ops_count=2 - ) - exec_info.set_undissatisfiable() # it's V. - return exec_info - - def satisfaction(self, sat_material): - return Satisfaction.from_or_uneven(sat_material, self.subs[0], self.subs[1]) - - def dissatisfaction(self): - return Satisfaction.unavailable() # it's V. - - def __repr__(self): - return f"or_c({','.join(map(str, self.subs))})" - - -class OrD(Node): - def __init__(self, sub_x, sub_z): - assert sub_x.p.has_all("Bdu") - assert sub_z.p.has_all("B") - - self.subs = [sub_x, sub_z] - - self.p = Property( - "B" - + ("z" if sub_x.p.z and sub_z.p.z else "") - + ("o" if sub_x.p.o and sub_z.p.z else "") - + ("d" if sub_z.p.d else "") - + ("u" if sub_z.p.u else "") - ) - self.needs_sig = all(sub.needs_sig for sub in self.subs) - self.is_forced = all(sub.is_forced for sub in self.subs) - self.is_expressive = all(sub.is_expressive for sub in self.subs) - self.is_nonmalleable = ( - all(sub.is_nonmalleable for sub in self.subs) - and any(sub.needs_sig for sub in self.subs) - and sub_x.is_expressive - ) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - self.no_timelock_mix = all(sub.no_timelock_mix for sub in self.subs) - - @property - def _script(self): - return ( - self.subs[0]._script - + [OP_IFDUP, OP_NOTIF] - + self.subs[1]._script - + [OP_ENDIF] - ) - - @property - def exec_info(self): - return ExecutionInfo.from_or_uneven( - self.subs[0].exec_info, self.subs[1].exec_info, ops_count=3 - ) - - def satisfaction(self, sat_material): - return Satisfaction.from_or_uneven(sat_material, self.subs[0], self.subs[1]) - - def dissatisfaction(self): - return self.subs[1].dissatisfaction() + self.subs[0].dissatisfaction() - - def __repr__(self): - return f"or_d({','.join(map(str, self.subs))})" - - -class OrI(Node): - def __init__(self, sub_x, sub_z): - assert sub_x.p.type() == sub_z.p.type() and sub_x.p.has_any("BKV") - - self.subs = [sub_x, sub_z] - - self.p = Property( - sub_x.p.type() - + ("o" if sub_x.p.z and sub_z.p.z else "") - + ("d" if sub_x.p.d or sub_z.p.d else "") - + ("u" if sub_x.p.u and sub_z.p.u else "") - ) - self.needs_sig = all(sub.needs_sig for sub in self.subs) - self.is_forced = all(sub.is_forced for sub in self.subs) - self.is_expressive = ( - sub_x.is_expressive - and sub_z.is_forced - or sub_x.is_forced - and sub_z.is_expressive - ) - self.is_nonmalleable = all(sub.is_nonmalleable for sub in self.subs) and any( - sub.needs_sig for sub in self.subs - ) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - self.no_timelock_mix = all(sub.no_timelock_mix for sub in self.subs) - - @property - def _script(self): - return ( - [OP_IF] - + self.subs[0]._script - + [OP_ELSE] - + self.subs[1]._script - + [OP_ENDIF] - ) - - @property - def exec_info(self): - return ExecutionInfo.from_or_even( - self.subs[0].exec_info, self.subs[1].exec_info, ops_count=3 - ) - - def satisfaction(self, sat_material): - return (self.subs[0].satisfaction(sat_material) + Satisfaction([b"\x01"])) | ( - self.subs[1].satisfaction(sat_material) + Satisfaction([b""]) - ) - - def dissatisfaction(self): - return (self.subs[0].dissatisfaction() + Satisfaction(witness=[b"\x01"])) | ( - self.subs[1].dissatisfaction() + Satisfaction(witness=[b""]) - ) - - def __repr__(self): - return f"or_i({','.join(map(str, self.subs))})" - - -class AndOr(Node): - def __init__(self, sub_x, sub_y, sub_z): - assert sub_x.p.has_all("Bdu") - assert sub_y.p.type() == sub_z.p.type() and sub_y.p.has_any("BKV") - - self.subs = [sub_x, sub_y, sub_z] - - self.p = Property( - sub_y.p.type() - + ("z" if sub_x.p.z and sub_y.p.z and sub_z.p.z else "") - + ( - "o" - if sub_x.p.z - and sub_y.p.o - and sub_z.p.o - or sub_x.p.o - and sub_y.p.z - and sub_z.p.z - else "" - ) - + ("d" if sub_z.p.d else "") - + ("u" if sub_y.p.u and sub_z.p.u else "") - ) - self.needs_sig = sub_x.needs_sig and (sub_y.needs_sig or sub_z.needs_sig) - self.is_forced = sub_z.is_forced and (sub_x.needs_sig or sub_y.is_forced) - self.is_expressive = ( - sub_x.is_expressive - and sub_z.is_expressive - and (sub_x.needs_sig or sub_y.is_forced) - ) - self.is_nonmalleable = ( - all(sub.is_nonmalleable for sub in self.subs) - and any(sub.needs_sig for sub in self.subs) - and sub_x.is_expressive - ) - self.abs_heightlocks = any(sub.abs_heightlocks for sub in self.subs) - self.rel_heightlocks = any(sub.rel_heightlocks for sub in self.subs) - self.abs_timelocks = any(sub.abs_timelocks for sub in self.subs) - self.rel_timelocks = any(sub.rel_timelocks for sub in self.subs) - # X and Y, or Z. So we have a mix if any contain a timelock mix, or - # there is a mix between X and Y. - self.no_timelock_mix = all(sub.no_timelock_mix for sub in self.subs) and not ( - any(sub.rel_timelocks for sub in [sub_x, sub_y]) - and any(sub.rel_heightlocks for sub in [sub_x, sub_y]) - or any(sub.abs_timelocks for sub in [sub_x, sub_y]) - and any(sub.abs_heightlocks for sub in [sub_x, sub_y]) - ) - - @property - def _script(self): - return ( - self.subs[0]._script - + [OP_NOTIF] - + self.subs[2]._script - + [OP_ELSE] - + self.subs[1]._script - + [OP_ENDIF] - ) - - @property - def exec_info(self): - return ExecutionInfo.from_andor_uneven( - self.subs[0].exec_info, - self.subs[1].exec_info, - self.subs[2].exec_info, - ops_count=3, - ) - - def satisfaction(self, sat_material): - # (A and B) or (!A and C) - return ( - self.subs[1].satisfaction(sat_material) - + self.subs[0].satisfaction(sat_material) - ) | (self.subs[2].satisfaction(sat_material) + self.subs[0].dissatisfaction()) - - def dissatisfaction(self): - # Dissatisfy X and Z - return self.subs[2].dissatisfaction() + self.subs[0].dissatisfaction() - - def __repr__(self): - return f"andor({','.join(map(str, self.subs))})" - - -class AndN(AndOr): - def __init__(self, sub_x, sub_y): - AndOr.__init__(self, sub_x, sub_y, Just0()) - - def __repr__(self): - return f"and_n({self.subs[0]},{self.subs[1]})" - - -class Thresh(Node): - def __init__(self, k, subs): - n = len(subs) - assert 1 <= k <= n - - self.k = k - self.subs = subs - - all_z = True - all_z_but_one_odu = False - all_e = True - all_m = True - s_count = 0 - # If k == 1, just check each child for k - if k > 1: - self.abs_heightlocks = subs[0].abs_heightlocks - self.rel_heightlocks = subs[0].rel_heightlocks - self.abs_timelocks = subs[0].abs_timelocks - self.rel_timelocks = subs[0].rel_timelocks - else: - self.no_timelock_mix = True - - assert subs[0].p.has_all("Bdu") - for sub in subs[1:]: - assert sub.p.has_all("Wdu") - if not sub.p.z: - if all_z_but_one_odu: - # Fails "all 'z' but one" - all_z_but_one_odu = False - if all_z and sub.p.has_all("odu"): - # They were all 'z' up to now. - all_z_but_one_odu = True - all_z = False - all_e = all_e and sub.is_expressive - all_m = all_m and sub.is_nonmalleable - if sub.needs_sig: - s_count += 1 - if k > 1: - self.abs_heightlocks |= sub.abs_heightlocks - self.rel_heightlocks |= sub.rel_heightlocks - self.abs_timelocks |= sub.abs_timelocks - self.rel_timelocks |= sub.rel_timelocks - else: - self.no_timelock_mix &= sub.no_timelock_mix - - self.p = Property( - "Bdu" + ("z" if all_z else "") + ("o" if all_z_but_one_odu else "") - ) - self.needs_sig = s_count >= n - k - self.is_forced = False # All subs need to be 'd' - self.is_expressive = all_e and s_count == n - self.is_nonmalleable = all_e and s_count >= n - k - if k > 1: - self.no_timelock_mix = not ( - self.abs_heightlocks - and self.abs_timelocks - or self.rel_heightlocks - and self.rel_timelocks - ) - - @property - def _script(self): - return ( - self.subs[0]._script - + sum(((sub._script + [OP_ADD]) for sub in self.subs[1:]), start=[]) - + [self.k, OP_EQUAL] - ) - - @property - def exec_info(self): - return ExecutionInfo.from_thresh(self.k, [sub.exec_info for sub in self.subs]) - - def satisfaction(self, sat_material): - return Satisfaction.from_thresh(sat_material, self.k, self.subs) - - def dissatisfaction(self): - return sum( - [sub.dissatisfaction() for sub in self.subs], start=Satisfaction(witness=[]) - ) - - def __repr__(self): - return f"thresh({self.k},{','.join(map(str, self.subs))})" - - -class WrapperNode(Node): - """A virtual base class for wrappers. - - Don't instanciate it directly, use concret wrapper fragments instead. - """ - - def __init__(self, sub): - self.subs = [sub] - - # Properties for most wrappers are directly inherited. When it's not, they - # are overriden in the fragment's __init__. - self.needs_sig = sub.needs_sig - self.is_forced = sub.is_forced - self.is_expressive = sub.is_expressive - self.is_nonmalleable = sub.is_nonmalleable - self.abs_heightlocks = sub.abs_heightlocks - self.rel_heightlocks = sub.rel_heightlocks - self.abs_timelocks = sub.abs_timelocks - self.rel_timelocks = sub.rel_timelocks - self.no_timelock_mix = not ( - self.abs_heightlocks - and self.abs_timelocks - or self.rel_heightlocks - and self.rel_timelocks - ) - - @property - def sub(self): - # Wrapper have a single sub - return self.subs[0] - - def satisfaction(self, sat_material): - # Most wrappers are satisfied this way, for special cases it's overriden. - return self.subs[0].satisfaction(sat_material) - - def dissatisfaction(self): - # Most wrappers are satisfied this way, for special cases it's overriden. - return self.subs[0].dissatisfaction() - - def skip_colon(self): - # We need to check this because of the pk() and pkh() aliases. - if isinstance(self.subs[0], WrapC) and isinstance( - self.subs[0].subs[0], (Pk, Pkh) - ): - return False - return isinstance(self.subs[0], WrapperNode) - - -class WrapA(WrapperNode): - def __init__(self, sub): - assert sub.p.B - WrapperNode.__init__(self, sub) - - self.p = Property("W" + "".join(c for c in "ud" if getattr(sub.p, c))) - - @property - def _script(self): - return [OP_TOALTSTACK] + self.sub._script + [OP_FROMALTSTACK] - - @property - def exec_info(self): - return ExecutionInfo.from_wrap(self.sub.exec_info, ops_count=2) - - def __repr__(self): - # Don't duplicate colons - if self.skip_colon(): - return f"a{self.subs[0]}" - return f"a:{self.subs[0]}" - - -class WrapS(WrapperNode): - def __init__(self, sub): - assert sub.p.has_all("Bo") - WrapperNode.__init__(self, sub) - - self.p = Property("W" + "".join(c for c in "ud" if getattr(sub.p, c))) - - @property - def _script(self): - return [OP_SWAP] + self.sub._script - - @property - def exec_info(self): - return ExecutionInfo.from_wrap(self.sub.exec_info, ops_count=1) - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"s{self.subs[0]}" - return f"s:{self.subs[0]}" - - -class WrapC(WrapperNode): - def __init__(self, sub): - assert sub.p.K - WrapperNode.__init__(self, sub) - - # FIXME: shouldn't n and d be default props on the website? - self.p = Property("Bu" + "".join(c for c in "dno" if getattr(sub.p, c))) - - @property - def _script(self): - return self.sub._script + [OP_CHECKSIG] - - @property - def exec_info(self): - # FIXME: should need_sig be set to True here instead of in keys? - return ExecutionInfo.from_wrap(self.sub.exec_info, ops_count=1, sat=1, dissat=1) - - def __repr__(self): - # Special case of aliases - if isinstance(self.subs[0], Pk): - return f"pk({self.subs[0].pubkey})" - if isinstance(self.subs[0], Pkh): - return f"pkh({self.subs[0].pubkey})" - # Avoid duplicating colons - if self.skip_colon(): - return f"c{self.subs[0]}" - return f"c:{self.subs[0]}" - - -class WrapT(AndV, WrapperNode): - def __init__(self, sub): - AndV.__init__(self, sub, Just1()) - - def is_wrapper(self): - return True - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"t{self.subs[0]}" - return f"t:{self.subs[0]}" - - -class WrapD(WrapperNode): - def __init__(self, sub): - assert sub.p.has_all("Vz") - WrapperNode.__init__(self, sub) - - self.p = Property("Bond") - self.is_forced = True # sub is V - self.is_expressive = True # sub is V, and we add a single dissat - - @property - def _script(self): - return [OP_DUP, OP_IF] + self.sub._script + [OP_ENDIF] - - @property - def exec_info(self): - return ExecutionInfo.from_wrap_dissat( - self.sub.exec_info, ops_count=3, sat=1, dissat=1 - ) - - def satisfaction(self, sat_material): - return Satisfaction(witness=[b"\x01"]) + self.subs[0].satisfaction(sat_material) - - def dissatisfaction(self): - return Satisfaction(witness=[b""]) - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"d{self.subs[0]}" - return f"d:{self.subs[0]}" - - -class WrapV(WrapperNode): - def __init__(self, sub): - assert sub.p.B - WrapperNode.__init__(self, sub) - - self.p = Property("V" + "".join(c for c in "zon" if getattr(sub.p, c))) - self.is_forced = True # V - self.is_expressive = False # V - - @property - def _script(self): - if self.sub._script[-1] == OP_CHECKSIG: - return self.sub._script[:-1] + [OP_CHECKSIGVERIFY] - elif self.sub._script[-1] == OP_CHECKMULTISIG: - return self.sub._script[:-1] + [OP_CHECKMULTISIGVERIFY] - elif self.sub._script[-1] == OP_EQUAL: - return self.sub._script[:-1] + [OP_EQUALVERIFY] - return self.sub._script + [OP_VERIFY] - - @property - def exec_info(self): - verify_cost = int(self._script[-1] == OP_VERIFY) - return ExecutionInfo.from_wrap(self.sub.exec_info, ops_count=verify_cost) - - def dissatisfaction(self): - return Satisfaction.unavailable() # It's V. - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"v{self.subs[0]}" - return f"v:{self.subs[0]}" - - -class WrapJ(WrapperNode): - def __init__(self, sub): - assert sub.p.has_all("Bn") - WrapperNode.__init__(self, sub) - - self.p = Property("Bnd" + "".join(c for c in "ou" if getattr(sub.p, c))) - self.is_forced = False # d - self.is_expressive = sub.is_forced - - @property - def _script(self): - return [OP_SIZE, OP_0NOTEQUAL, OP_IF, *self.sub._script, OP_ENDIF] - - @property - def exec_info(self): - return ExecutionInfo.from_wrap_dissat(self.sub.exec_info, ops_count=4, dissat=1) - - def dissatisfaction(self): - return Satisfaction(witness=[b""]) - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"j{self.subs[0]}" - return f"j:{self.subs[0]}" - - -class WrapN(WrapperNode): - def __init__(self, sub): - assert sub.p.B - WrapperNode.__init__(self, sub) - - self.p = Property("Bu" + "".join(c for c in "zond" if getattr(sub.p, c))) - - @property - def _script(self): - return [*self.sub._script, OP_0NOTEQUAL] - - @property - def exec_info(self): - return ExecutionInfo.from_wrap(self.sub.exec_info, ops_count=1) - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"n{self.subs[0]}" - return f"n:{self.subs[0]}" - - -class WrapL(OrI, WrapperNode): - def __init__(self, sub): - OrI.__init__(self, Just0(), sub) - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"l{self.subs[1]}" - return f"l:{self.subs[1]}" - - -class WrapU(OrI, WrapperNode): - def __init__(self, sub): - OrI.__init__(self, sub, Just0()) - - def __repr__(self): - # Avoid duplicating colons - if self.skip_colon(): - return f"u{self.subs[0]}" - return f"u:{self.subs[0]}" diff --git a/bitcoin_client/ledger_bitcoin/bip380/miniscript/parsing.py b/bitcoin_client/ledger_bitcoin/bip380/miniscript/parsing.py deleted file mode 100644 index 2058b7b6b..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/miniscript/parsing.py +++ /dev/null @@ -1,736 +0,0 @@ -""" -Utilities to parse Miniscript from string and Script representations. -""" - -from ...bip380.miniscript import fragments - -from ...bip380.key import DescriptorKey -from ...bip380.miniscript.errors import MiniscriptMalformed -from ...bip380.utils.script import ( - CScriptOp, - OP_ADD, - OP_BOOLAND, - OP_BOOLOR, - OP_CHECKSIGVERIFY, - OP_CHECKMULTISIGVERIFY, - OP_EQUALVERIFY, - OP_DUP, - OP_ELSE, - OP_ENDIF, - OP_EQUAL, - OP_FROMALTSTACK, - OP_IFDUP, - OP_IF, - OP_CHECKLOCKTIMEVERIFY, - OP_CHECKMULTISIG, - OP_CHECKSEQUENCEVERIFY, - OP_CHECKSIG, - OP_HASH160, - OP_HASH256, - OP_NOTIF, - OP_RIPEMD160, - OP_SHA256, - OP_SIZE, - OP_SWAP, - OP_TOALTSTACK, - OP_VERIFY, - OP_0NOTEQUAL, - ScriptNumError, - read_script_number, -) - - -def stack_item_to_int(item): - """ - Convert a stack item to an integer depending on its type. - May raise an exception if the item is bytes, otherwise return None if it - cannot perform the conversion. - """ - if isinstance(item, bytes): - return read_script_number(item) - - if isinstance(item, fragments.Node): - if isinstance(item, fragments.Just1): - return 1 - if isinstance(item, fragments.Just0): - return 0 - - if isinstance(item, int): - return item - - return None - - -def decompose_script(script): - """Create a list of Script element from a CScript, decomposing the compact - -VERIFY opcodes into the non-VERIFY OP and an OP_VERIFY. - """ - elems = [] - for elem in script: - if elem == OP_CHECKSIGVERIFY: - elems += [OP_CHECKSIG, OP_VERIFY] - elif elem == OP_CHECKMULTISIGVERIFY: - elems += [OP_CHECKMULTISIG, OP_VERIFY] - elif elem == OP_EQUALVERIFY: - elems += [OP_EQUAL, OP_VERIFY] - else: - elems.append(elem) - return elems - - -def parse_term_single_elem(expr_list, idx): - """ - Try to parse a terminal node from the element of {expr_list} at {idx}. - """ - # Match against pk_k(key). - if ( - isinstance(expr_list[idx], bytes) - and len(expr_list[idx]) == 33 - and expr_list[idx][0] in [2, 3] - ): - expr_list[idx] = fragments.Pk(expr_list[idx]) - - # Match against JUST_1 and JUST_0. - if expr_list[idx] == 1: - expr_list[idx] = fragments.Just1() - if expr_list[idx] == b"": - expr_list[idx] = fragments.Just0() - - -def parse_term_2_elems(expr_list, idx): - """ - Try to parse a terminal node from two elements of {expr_list}, starting - from {idx}. - Return the new expression list on success, None if there was no match. - """ - elem_a = expr_list[idx] - elem_b = expr_list[idx + 1] - - # Only older() and after() as term with 2 stack items - if not isinstance(elem_b, CScriptOp): - return - try: - n = stack_item_to_int(elem_a) - if n is None: - return - except ScriptNumError: - return - - if n <= 0 or n >= 2 ** 31: - return - - if elem_b == OP_CHECKSEQUENCEVERIFY: - node = fragments.Older(n) - expr_list[idx: idx + 2] = [node] - return expr_list - - if elem_b == OP_CHECKLOCKTIMEVERIFY: - node = fragments.After(n) - expr_list[idx: idx + 2] = [node] - return expr_list - - -def parse_term_5_elems(expr_list, idx, pkh_preimages={}): - """ - Try to parse a terminal node from five elements of {expr_list}, starting - from {idx}. - Return the new expression list on success, None if there was no match. - """ - # The only 3 items node is pk_h - if expr_list[idx: idx + 2] != [OP_DUP, OP_HASH160]: - return - if not isinstance(expr_list[idx + 2], bytes): - return - if len(expr_list[idx + 2]) != 20: - return - if expr_list[idx + 3: idx + 5] != [OP_EQUAL, OP_VERIFY]: - return - - key_hash = expr_list[idx + 2] - key = pkh_preimages.get(key_hash) - assert key is not None # TODO: have a real error here - node = fragments.Pkh(key) - expr_list[idx: idx + 5] = [node] - return expr_list - - -def parse_term_7_elems(expr_list, idx): - """ - Try to parse a terminal node from seven elements of {expr_list}, starting - from {idx}. - Return the new expression list on success, None if there was no match. - """ - # Note how all the hashes are 7 elems because the VERIFY was decomposed - # Match against sha256. - if ( - expr_list[idx: idx + 5] == [OP_SIZE, b"\x20", OP_EQUAL, OP_VERIFY, OP_SHA256] - and isinstance(expr_list[idx + 5], bytes) - and len(expr_list[idx + 5]) == 32 - and expr_list[idx + 6] == OP_EQUAL - ): - node = fragments.Sha256(expr_list[idx + 5]) - expr_list[idx: idx + 7] = [node] - return expr_list - - # Match against hash256. - if ( - expr_list[idx: idx + 5] == [OP_SIZE, b"\x20", OP_EQUAL, OP_VERIFY, OP_HASH256] - and isinstance(expr_list[idx + 5], bytes) - and len(expr_list[idx + 5]) == 32 - and expr_list[idx + 6] == OP_EQUAL - ): - node = fragments.Hash256(expr_list[idx + 5]) - expr_list[idx: idx + 7] = [node] - return expr_list - - # Match against ripemd160. - if ( - expr_list[idx: idx + 5] - == [OP_SIZE, b"\x20", OP_EQUAL, OP_VERIFY, OP_RIPEMD160] - and isinstance(expr_list[idx + 5], bytes) - and len(expr_list[idx + 5]) == 20 - and expr_list[idx + 6] == OP_EQUAL - ): - node = fragments.Ripemd160(expr_list[idx + 5]) - expr_list[idx: idx + 7] = [node] - return expr_list - - # Match against hash160. - if ( - expr_list[idx: idx + 5] == [OP_SIZE, b"\x20", OP_EQUAL, OP_VERIFY, OP_HASH160] - and isinstance(expr_list[idx + 5], bytes) - and len(expr_list[idx + 5]) == 20 - and expr_list[idx + 6] == OP_EQUAL - ): - node = fragments.Hash160(expr_list[idx + 5]) - expr_list[idx: idx + 7] = [node] - return expr_list - - -def parse_nonterm_2_elems(expr_list, idx): - """ - Try to parse a non-terminal node from two elements of {expr_list}, starting - from {idx}. - Return the new expression list on success, None if there was no match. - """ - elem_a = expr_list[idx] - elem_b = expr_list[idx + 1] - - if isinstance(elem_a, fragments.Node): - # Match against and_v. - if isinstance(elem_b, fragments.Node) and elem_a.p.V and elem_b.p.has_any("BKV"): - # Is it a special case of t: wrapper? - if isinstance(elem_b, fragments.Just1): - node = fragments.WrapT(elem_a) - else: - node = fragments.AndV(elem_a, elem_b) - expr_list[idx: idx + 2] = [node] - return expr_list - - # Match against c wrapper. - if elem_b == OP_CHECKSIG and elem_a.p.K: - node = fragments.WrapC(elem_a) - expr_list[idx: idx + 2] = [node] - return expr_list - - # Match against v wrapper. - if elem_b == OP_VERIFY and elem_a.p.B: - node = fragments.WrapV(elem_a) - expr_list[idx: idx + 2] = [node] - return expr_list - - # Match against n wrapper. - if elem_b == OP_0NOTEQUAL and elem_a.p.B: - node = fragments.WrapN(elem_a) - expr_list[idx: idx + 2] = [node] - return expr_list - - # Match against s wrapper. - if isinstance(elem_b, fragments.Node) and elem_a == OP_SWAP and elem_b.p.has_all("Bo"): - node = fragments.WrapS(elem_b) - expr_list[idx: idx + 2] = [node] - return expr_list - - -def parse_nonterm_3_elems(expr_list, idx): - """ - Try to parse a non-terminal node from *at least* three elements of - {expr_list}, starting from {idx}. - Return the new expression list on success, None if there was no match. - """ - elem_a = expr_list[idx] - elem_b = expr_list[idx + 1] - elem_c = expr_list[idx + 2] - - if isinstance(elem_a, fragments.Node) and isinstance(elem_b, fragments.Node): - # Match against and_b. - if elem_c == OP_BOOLAND and elem_a.p.B and elem_b.p.W: - node = fragments.AndB(elem_a, elem_b) - expr_list[idx: idx + 3] = [node] - return expr_list - - # Match against or_b. - if elem_c == OP_BOOLOR and elem_a.p.has_all("Bd") and elem_b.p.has_all("Wd"): - node = fragments.OrB(elem_a, elem_b) - expr_list[idx: idx + 3] = [node] - return expr_list - - # Match against a wrapper. - if ( - elem_a == OP_TOALTSTACK - and isinstance(elem_b, fragments.Node) - and elem_b.p.B - and elem_c == OP_FROMALTSTACK - ): - node = fragments.WrapA(elem_b) - expr_list[idx: idx + 3] = [node] - return expr_list - - # FIXME: multi is a terminal! - # Match against a multi. - try: - k = stack_item_to_int(expr_list[idx]) - except ScriptNumError: - return - if k is None: - return - # ()* CHECKMULTISIG - if k > len(expr_list[idx + 1:]) - 2: - return - # Get the keys - keys = [] - i = idx + 1 - while idx < len(expr_list) - 2: - if not isinstance(expr_list[i], fragments.Pk): - break - keys.append(expr_list[i].pubkey) - i += 1 - if expr_list[i + 1] == OP_CHECKMULTISIG: - if k > len(keys): - return - try: - m = stack_item_to_int(expr_list[i]) - except ScriptNumError: - return - if m is None or m != len(keys): - return - node = fragments.Multi(k, keys) - expr_list[idx: i + 2] = [node] - return expr_list - - -def parse_nonterm_4_elems(expr_list, idx): - """ - Try to parse a non-terminal node from at least four elements of {expr_list}, - starting from {idx}. - Return the new expression list on success, None if there was no match. - """ - (it_a, it_b, it_c, it_d) = expr_list[idx: idx + 4] - - # Match against thresh. It's of the form [X] ([X] ADD)* k EQUAL - if isinstance(it_a, fragments.Node) and it_a.p.has_all("Bdu"): - subs = [it_a] - # The first matches, now do all the ([X] ADD)s and return - # if a pair is of the form (k, EQUAL). - for i in range(idx + 1, len(expr_list) - 1, 2): - if ( - isinstance(expr_list[i], fragments.Node) - and expr_list[i].p.has_all("Wdu") - and expr_list[i + 1] == OP_ADD - ): - subs.append(expr_list[i]) - continue - elif expr_list[i + 1] == OP_EQUAL: - try: - k = stack_item_to_int(expr_list[i]) - if len(subs) >= k >= 1: - node = fragments.Thresh(k, subs) - expr_list[idx: i + 1 + 1] = [node] - return expr_list - except ScriptNumError: - break - else: - break - - # Match against or_c. - if ( - isinstance(it_a, fragments.Node) - and it_a.p.has_all("Bdu") - and it_b == OP_NOTIF - and isinstance(it_c, fragments.Node) - and it_c.p.V - and it_d == OP_ENDIF - ): - node = fragments.OrC(it_a, it_c) - expr_list[idx: idx + 4] = [node] - return expr_list - - # Match against d wrapper. - if ( - [it_a, it_b] == [OP_DUP, OP_IF] - and isinstance(it_c, fragments.Node) - and it_c.p.has_all("Vz") - and it_d == OP_ENDIF - ): - node = fragments.WrapD(it_c) - expr_list[idx: idx + 4] = [node] - return expr_list - - -def parse_nonterm_5_elems(expr_list, idx): - """ - Try to parse a non-terminal node from five elements of {expr_list}, starting - from {idx}. - Return the new expression list on success, None if there was no match. - """ - (it_a, it_b, it_c, it_d, it_e) = expr_list[idx: idx + 5] - - # Match against or_d. - if ( - isinstance(it_a, fragments.Node) - and it_a.p.has_all("Bdu") - and [it_b, it_c] == [OP_IFDUP, OP_NOTIF] - and isinstance(it_d, fragments.Node) - and it_d.p.B - and it_e == OP_ENDIF - ): - node = fragments.OrD(it_a, it_d) - expr_list[idx: idx + 5] = [node] - return expr_list - - # Match against or_i. - if ( - it_a == OP_IF - and isinstance(it_b, fragments.Node) - and it_b.p.has_any("BKV") - and it_c == OP_ELSE - and isinstance(it_d, fragments.Node) - and it_d.p.has_any("BKV") - and it_e == OP_ENDIF - ): - if isinstance(it_b, fragments.Just0): - node = fragments.WrapL(it_d) - elif isinstance(it_d, fragments.Just0): - node = fragments.WrapU(it_b) - else: - node = fragments.OrI(it_b, it_d) - expr_list[idx: idx + 5] = [node] - return expr_list - - # Match against j wrapper. - if ( - [it_a, it_b, it_c] == [OP_SIZE, OP_0NOTEQUAL, OP_IF] - and isinstance(it_d, fragments.Node) - and it_e == OP_ENDIF - ): - node = fragments.WrapJ(expr_list[idx + 3]) - expr_list[idx: idx + 5] = [node] - return expr_list - - -def parse_nonterm_6_elems(expr_list, idx): - """ - Try to parse a non-terminal node from six elements of {expr_list}, starting - from {idx}. - Return the new expression list on success, None if there was no match. - """ - (it_a, it_b, it_c, it_d, it_e, it_f) = expr_list[idx: idx + 6] - - # Match against andor. - if ( - isinstance(it_a, fragments.Node) - and it_a.p.has_all("Bdu") - and it_b == OP_NOTIF - and isinstance(it_c, fragments.Node) - and it_c.p.has_any("BKV") - and it_d == OP_ELSE - and isinstance(it_e, fragments.Node) - and it_e.p.has_any("BKV") - and it_f == OP_ENDIF - ): - if isinstance(it_c, fragments.Just0): - node = fragments.AndN(it_a, it_e) - else: - node = fragments.AndOr(it_a, it_e, it_c) - expr_list[idx: idx + 6] = [node] - return expr_list - - -def parse_expr_list(expr_list): - """Parse a node from a list of Script elements.""" - # Every recursive call must progress the AST construction, - # until it is complete (single root node remains). - expr_list_len = len(expr_list) - - # Root node reached. - if expr_list_len == 1 and isinstance(expr_list[0], fragments.Node): - return expr_list[0] - - # Step through each list index and match against templates. - idx = expr_list_len - 1 - while idx >= 0: - if expr_list_len - idx >= 2: - new_expr_list = parse_nonterm_2_elems(expr_list, idx) - if new_expr_list is not None: - return parse_expr_list(new_expr_list) - - if expr_list_len - idx >= 3: - new_expr_list = parse_nonterm_3_elems(expr_list, idx) - if new_expr_list is not None: - return parse_expr_list(new_expr_list) - - if expr_list_len - idx >= 4: - new_expr_list = parse_nonterm_4_elems(expr_list, idx) - if new_expr_list is not None: - return parse_expr_list(new_expr_list) - - if expr_list_len - idx >= 5: - new_expr_list = parse_nonterm_5_elems(expr_list, idx) - if new_expr_list is not None: - return parse_expr_list(new_expr_list) - - if expr_list_len - idx >= 6: - new_expr_list = parse_nonterm_6_elems(expr_list, idx) - if new_expr_list is not None: - return parse_expr_list(new_expr_list) - - # Right-to-left parsing. - # Step one position left. - idx -= 1 - - # No match found. - raise MiniscriptMalformed(f"{expr_list}") - - -def miniscript_from_script(script, pkh_preimages={}): - """Construct miniscript node from script. - - :param script: The Bitcoin Script to decode. - :param pkh_preimage: A mapping from keyhash to key to decode pk_h() fragments. - """ - expr_list = decompose_script(script) - expr_list_len = len(expr_list) - - # We first parse terminal expressions. - idx = 0 - while idx < expr_list_len: - parse_term_single_elem(expr_list, idx) - - if expr_list_len - idx >= 2: - new_expr_list = parse_term_2_elems(expr_list, idx) - if new_expr_list is not None: - expr_list = new_expr_list - expr_list_len = len(expr_list) - - if expr_list_len - idx >= 5: - new_expr_list = parse_term_5_elems(expr_list, idx, pkh_preimages) - if new_expr_list is not None: - expr_list = new_expr_list - expr_list_len = len(expr_list) - - if expr_list_len - idx >= 7: - new_expr_list = parse_term_7_elems(expr_list, idx) - if new_expr_list is not None: - expr_list = new_expr_list - expr_list_len = len(expr_list) - - idx += 1 - - # fragments.And then recursively parse non-terminal ones. - return parse_expr_list(expr_list) - - -def split_params(string): - """Read a list of values before the next ')'. Split the result by comma.""" - i = string.find(")") - assert i >= 0 - - params, remaining = string[:i], string[i:] - if len(remaining) > 0: - return params.split(","), remaining[1:] - else: - return params.split(","), "" - - -def parse_many(string): - """Read a list of nodes before the next ')'.""" - subs = [] - remaining = string - while True: - sub, remaining = parse_one(remaining) - subs.append(sub) - if remaining[0] == ")": - return subs, remaining[1:] - assert remaining[0] == "," # TODO: real errors - remaining = remaining[1:] - - -def parse_one_num(string): - """Read an integer before the next comma.""" - i = string.find(",") - assert i >= 0 - - return int(string[:i]), string[i + 1:] - - -def parse_one(string): - """Read a node and its subs recursively from a string. - Returns the node and the part of the string not consumed. - """ - - # We special case fragments.Just1 and fragments.Just0 since they are the only one which don't - # have a function syntax. - if string[0] == "0": - return fragments.Just0(), string[1:] - if string[0] == "1": - return fragments.Just1(), string[1:] - - # Now, find the separator for all functions. - for i, char in enumerate(string): - if char in ["(", ":"]: - break - # For wrappers, we may have many of them. - if char == ":" and i > 1: - tag, remaining = string[0], string[1:] - else: - tag, remaining = string[:i], string[i + 1:] - - # fragments.Wrappers - if char == ":": - sub, remaining = parse_one(remaining) - if tag == "a": - return fragments.WrapA(sub), remaining - - if tag == "s": - return fragments.WrapS(sub), remaining - - if tag == "c": - return fragments.WrapC(sub), remaining - - if tag == "t": - return fragments.WrapT(sub), remaining - - if tag == "d": - return fragments.WrapD(sub), remaining - - if tag == "v": - return fragments.WrapV(sub), remaining - - if tag == "j": - return fragments.WrapJ(sub), remaining - - if tag == "n": - return fragments.WrapN(sub), remaining - - if tag == "l": - return fragments.WrapL(sub), remaining - - if tag == "u": - return fragments.WrapU(sub), remaining - - assert False, (tag, sub, remaining) # TODO: real errors - - # Terminal elements other than 0 and 1 - if tag in [ - "pk", - "pkh", - "pk_k", - "pk_h", - "sha256", - "hash256", - "ripemd160", - "hash160", - "older", - "after", - "multi", - ]: - params, remaining = split_params(remaining) - - if tag == "0": - return fragments.Just0(), remaining - - if tag == "1": - return fragments.Just1(), remaining - - if tag == "pk": - return fragments.WrapC(fragments.Pk(params[0])), remaining - - if tag == "pk_k": - return fragments.Pk(params[0]), remaining - - if tag == "pkh": - return fragments.WrapC(fragments.Pkh(params[0])), remaining - - if tag == "pk_h": - return fragments.Pkh(params[0]), remaining - - if tag == "older": - value = int(params[0]) - return fragments.Older(value), remaining - - if tag == "after": - value = int(params[0]) - return fragments.After(value), remaining - - if tag in ["sha256", "hash256", "ripemd160", "hash160"]: - digest = bytes.fromhex(params[0]) - if tag == "sha256": - return fragments.Sha256(digest), remaining - if tag == "hash256": - return fragments.Hash256(digest), remaining - if tag == "ripemd160": - return fragments.Ripemd160(digest), remaining - return fragments.Hash160(digest), remaining - - if tag == "multi": - k = int(params.pop(0)) - key_n = [] - for param in params: - key_obj = DescriptorKey(param) - key_n.append(key_obj) - return fragments.Multi(k, key_n), remaining - - assert False, (tag, params, remaining) - - # Non-terminal elements (connectives) - # We special case fragments.Thresh, as its first sub is an integer. - if tag == "thresh": - k, remaining = parse_one_num(remaining) - # TODO: real errors in place of unpacking - subs, remaining = parse_many(remaining) - - if tag == "and_v": - return fragments.AndV(*subs), remaining - - if tag == "and_b": - return fragments.AndB(*subs), remaining - - if tag == "and_n": - return fragments.AndN(*subs), remaining - - if tag == "or_b": - return fragments.OrB(*subs), remaining - - if tag == "or_c": - return fragments.OrC(*subs), remaining - - if tag == "or_d": - return fragments.OrD(*subs), remaining - - if tag == "or_i": - return fragments.OrI(*subs), remaining - - if tag == "andor": - return fragments.AndOr(*subs), remaining - - if tag == "thresh": - return fragments.Thresh(k, subs), remaining - - assert False, (tag, subs, remaining) # TODO - - -def miniscript_from_str(ms_str): - """Construct miniscript node from string representation""" - node, remaining = parse_one(ms_str) - assert remaining == "" - return node diff --git a/bitcoin_client/ledger_bitcoin/bip380/miniscript/property.py b/bitcoin_client/ledger_bitcoin/bip380/miniscript/property.py deleted file mode 100644 index 5cff50b79..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/miniscript/property.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (c) 2020 The Bitcoin Core developers -# Copyright (c) 2021 Antoine Poinsot -# Distributed under the MIT software license, see the accompanying -# file LICENSE or http://www.opensource.org/licenses/mit-license.php. - -from .errors import MiniscriptPropertyError - - -# TODO: implement __eq__ -class Property: - """Miniscript expression property""" - - # "B": Base type - # "V": Verify type - # "K": Key type - # "W": Wrapped type - # "z": Zero-arg property - # "o": One-arg property - # "n": Nonzero arg property - # "d": Dissatisfiable property - # "u": Unit property - types = "BVKW" - props = "zondu" - - def __init__(self, property_str=""): - """Create a property, optionally from a str of property and types""" - allowed = self.types + self.props - invalid = set(property_str).difference(set(allowed)) - - if invalid: - raise MiniscriptPropertyError( - f"Invalid property/type character(s) '{''.join(invalid)}'" - f" (allowed: '{allowed}')" - ) - - for literal in allowed: - setattr(self, literal, literal in property_str) - - self.check_valid() - - def __repr__(self): - """Generate string representation of property""" - return "".join([c for c in self.types + self.props if getattr(self, c)]) - - def has_all(self, properties): - """Given a str of types and properties, return whether we have all of them""" - return all([getattr(self, pt) for pt in properties]) - - def has_any(self, properties): - """Given a str of types and properties, return whether we have at least one of them""" - return any([getattr(self, pt) for pt in properties]) - - def check_valid(self): - """Raises a MiniscriptPropertyError if the types/properties conflict""" - # Can only be of a single type. - if len(self.type()) > 1: - raise MiniscriptPropertyError(f"A Miniscript fragment can only be of a single type, got '{self.type()}'") - - # Check for conflicts in type & properties. - checks = [ - # (type/property, must_be, must_not_be) - ("K", "u", ""), - ("V", "", "du"), - ("z", "", "o"), - ("n", "", "z"), - ] - conflicts = [] - - for (attr, must_be, must_not_be) in checks: - if not getattr(self, attr): - continue - if not self.has_all(must_be): - conflicts.append(f"{attr} must be {must_be}") - if self.has_any(must_not_be): - conflicts.append(f"{attr} must not be {must_not_be}") - if conflicts: - raise MiniscriptPropertyError(f"Conflicting types and properties: {', '.join(conflicts)}") - - def type(self): - return "".join(filter(lambda x: x in self.types, str(self))) - - def properties(self): - return "".join(filter(lambda x: x in self.props, str(self))) diff --git a/bitcoin_client/ledger_bitcoin/bip380/miniscript/satisfaction.py b/bitcoin_client/ledger_bitcoin/bip380/miniscript/satisfaction.py deleted file mode 100644 index 67e878060..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/miniscript/satisfaction.py +++ /dev/null @@ -1,409 +0,0 @@ -""" -Miniscript satisfaction. - -This module contains logic for "signing for" a Miniscript (constructing a valid witness -that meets the conditions set by the Script) and analysis of such satisfaction(s) (eg the -maximum cost in a given resource). -This is currently focused on non-malleable satisfaction. We take shortcuts to not care about -non-canonical (dis)satisfactions. -""" - - -def add_optional(a, b): - """Add two numbers that may be None together.""" - if a is None or b is None: - return None - return a + b - - -def max_optional(a, b): - """Return the maximum of two numbers that may be None.""" - if a is None: - return b - if b is None: - return a - return max(a, b) - - -class SatisfactionMaterial: - """Data that may be needed in order to satisfy a Minsicript fragment.""" - - def __init__( - self, preimages={}, signatures={}, max_sequence=2 ** 32, max_lock_time=2 ** 32 - ): - """ - :param preimages: Mapping from a hash (as bytes), to its 32-bytes preimage. - :param signatures: Mapping from a public key (as bytes), to a signature for this key. - :param max_sequence: The maximum relative timelock possible (coin age). - :param max_lock_time: The maximum absolute timelock possible (block height). - """ - self.preimages = preimages - self.signatures = signatures - self.max_sequence = max_sequence - self.max_lock_time = max_lock_time - - def clear(self): - self.preimages.clear() - self.signatures.clear() - self.max_sequence = 0 - self.max_lock_time = 0 - - def __repr__(self): - return ( - f"SatisfactionMaterial(preimages: {self.preimages}, signatures: " - f"{self.signatures}, max_sequence: {self.max_sequence}, max_lock_time: " - f"{self.max_lock_time}" - ) - - -class Satisfaction: - """All information about a satisfaction.""" - - def __init__(self, witness, has_sig=False): - assert isinstance(witness, list) or witness is None - self.witness = witness - self.has_sig = has_sig - # TODO: we probably need to take into account non-canon sats, as the algorithm - # described on the website mandates it: - # > Iterate over all the valid satisfactions/dissatisfactions in the table above - # > (including the non-canonical ones), - - def __add__(self, other): - """Concatenate two satisfactions together.""" - witness = add_optional(self.witness, other.witness) - has_sig = self.has_sig or other.has_sig - return Satisfaction(witness, has_sig) - - def __or__(self, other): - """Choose between two (dis)satisfactions.""" - assert isinstance(other, Satisfaction) - - # If one isn't available, return the other one. - if self.witness is None: - return other - if other.witness is None: - return self - - # > If among all valid solutions (including DONTUSE ones) more than one does not - # > have the HASSIG marker, return DONTUSE, as this is malleable because of reason - # > 1. - # TODO - # if not (self.has_sig or other.has_sig): - # return Satisfaction.unavailable() - - # > If instead exactly one does not have the HASSIG marker, return that solution - # > because of reason 2. - if self.has_sig and not other.has_sig: - return other - if not self.has_sig and other.has_sig: - return self - - # > Otherwise, all not-DONTUSE options are valid, so return the smallest one (in - # > terms of witness size). - if self.size() > other.size(): - return other - - # > If all valid solutions have the HASSIG marker, but all of them are DONTUSE, return DONTUSE-HASSIG. - # TODO - - return self - - def unavailable(): - return Satisfaction(witness=None) - - def is_unavailable(self): - return self.witness is None - - def size(self): - return len(self.witness) + sum(len(elem) for elem in self.witness) - - def from_concat(sat_material, sub_a, sub_b, disjunction=False): - """Get the satisfaction for a Miniscript whose Script corresponds to a - concatenation of two subscripts A and B. - - :param sub_a: The sub-fragment A. - :param sub_b: The sub-fragment B. - :param disjunction: Whether this fragment has an 'or()' semantic. - """ - if disjunction: - return (sub_b.dissatisfaction() + sub_a.satisfaction(sat_material)) | ( - sub_b.satisfaction(sat_material) + sub_a.dissatisfaction() - ) - return sub_b.satisfaction(sat_material) + sub_a.satisfaction(sat_material) - - def from_or_uneven(sat_material, sub_a, sub_b): - """Get the satisfaction for a Miniscript which unconditionally executes a first - sub A and only executes B if A was dissatisfied. - - :param sub_a: The sub-fragment A. - :param sub_b: The sub-fragment B. - """ - return sub_a.satisfaction(sat_material) | ( - sub_b.satisfaction(sat_material) + sub_a.dissatisfaction() - ) - - def from_thresh(sat_material, k, subs): - """Get the satisfaction for a Miniscript which satisfies k of the given subs, - and dissatisfies all the others. - - :param sat_material: The material to satisfy the challenges. - :param k: The number of subs that need to be satisfied. - :param subs: The list of all subs of the threshold. - """ - # Pick the k sub-fragments to satisfy, prefering (in order): - # 1. Fragments that don't require a signature to be satisfied - # 2. Fragments whose satisfaction's size is smaller - # Record the unavailable (in either way) ones as we go. - arbitrage, unsatisfiable, undissatisfiable = [], [], [] - for sub in subs: - sat, dissat = sub.satisfaction(sat_material), sub.dissatisfaction() - if sat.witness is None: - unsatisfiable.append(sub) - elif dissat.witness is None: - undissatisfiable.append(sub) - else: - arbitrage.append( - (int(sat.has_sig), len(sat.witness) - len(dissat.witness), sub) - ) - - # If not enough (dis)satisfactions are available, fail. - if len(unsatisfiable) > len(subs) - k or len(undissatisfiable) > k: - return Satisfaction.unavailable() - - # Otherwise, satisfy the k most optimal ones. - arbitrage = sorted(arbitrage, key=lambda x: x[:2]) - optimal_sat = undissatisfiable + [a[2] for a in arbitrage] + unsatisfiable - to_satisfy = set(optimal_sat[:k]) - return sum( - [ - sub.satisfaction(sat_material) - if sub in to_satisfy - else sub.dissatisfaction() - for sub in subs[::-1] - ], - start=Satisfaction(witness=[]), - ) - - -class ExecutionInfo: - """Information about the execution of a Miniscript.""" - - def __init__(self, stat_ops, _dyn_ops, sat_size, dissat_size): - # The *maximum* number of *always* executed non-PUSH Script OPs to satisfy this - # Miniscript fragment non-malleably. - self._static_ops_count = stat_ops - # The maximum possible number of counted-as-executed-by-interpreter OPs if this - # fragment is executed. - # It is only >0 for an executed multi() branch. That is, for a CHECKMULTISIG that - # is not part of an unexecuted branch of an IF .. ENDIF. - self._dyn_ops_count = _dyn_ops - # The *maximum* number of stack elements to satisfy this Miniscript fragment - # non-malleably. - self.sat_elems = sat_size - # The *maximum* number of stack elements to dissatisfy this Miniscript fragment - # non-malleably. - self.dissat_elems = dissat_size - - @property - def ops_count(self): - """ - The worst-case number of OPs that would be considered executed by the Script - interpreter. - Note it is considered alone and not necessarily coherent with the other maxima. - """ - return self._static_ops_count + self._dyn_ops_count - - def is_dissatisfiable(self): - """Whether the Miniscript is *non-malleably* dissatisfiable.""" - return self.dissat_elems is not None - - def set_undissatisfiable(self): - """Set the Miniscript as being impossible to dissatisfy.""" - self.dissat_elems = None - - def from_concat(sub_a, sub_b, ops_count=0, disjunction=False): - """Compute the execution info from a Miniscript whose Script corresponds to - a concatenation of two subscript A and B. - - :param sub_a: The execution information of the subscript A. - :param sub_b: The execution information of the subscript B. - :param ops_count: The added number of static OPs added on top. - :param disjunction: Whether this fragment has an 'or()' semantic. - """ - # Number of static OPs is simple, they are all executed. - static_ops = sub_a._static_ops_count + sub_b._static_ops_count + ops_count - # Same for the dynamic ones, there is no conditional branch here. - dyn_ops = sub_a._dyn_ops_count + sub_b._dyn_ops_count - # If this is an 'or', only one needs to be satisfied. Pick the most expensive - # satisfaction/dissatisfaction pair. - # If not, both need to be anyways. - if disjunction: - first = add_optional(sub_a.sat_elems, sub_b.dissat_elems) - second = add_optional(sub_a.dissat_elems, sub_b.sat_elems) - sat_elems = max_optional(first, second) - else: - sat_elems = add_optional(sub_a.sat_elems, sub_b.sat_elems) - # In any case dissatisfying the fragment requires dissatisfying both concatenated - # subs. - dissat_elems = add_optional(sub_a.dissat_elems, sub_b.dissat_elems) - - return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - - def from_or_uneven(sub_a, sub_b, ops_count=0): - """Compute the execution info from a Miniscript which always executes A and only - executes B depending on the outcome of A's execution. - - :param sub_a: The execution information of the subscript A. - :param sub_b: The execution information of the subscript B. - :param ops_count: The added number of static OPs added on top. - """ - # Number of static OPs is simple, they are all executed. - static_ops = sub_a._static_ops_count + sub_b._static_ops_count + ops_count - # If the first sub is non-malleably dissatisfiable, the worst case is executing - # both. Otherwise it is necessarily satisfying only the first one. - if sub_a.is_dissatisfiable(): - dyn_ops = sub_a._dyn_ops_count + sub_b._dyn_ops_count - else: - dyn_ops = sub_a._dyn_ops_count - # Either we satisfy A, or satisfy B (and thereby dissatisfy A). Pick the most - # expensive. - first = sub_a.sat_elems - second = add_optional(sub_a.dissat_elems, sub_b.sat_elems) - sat_elems = max_optional(first, second) - # We only take canonical dissatisfactions into account. - dissat_elems = add_optional(sub_a.dissat_elems, sub_b.dissat_elems) - - return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - - def from_or_even(sub_a, sub_b, ops_count): - """Compute the execution info from a Miniscript which executes either A or B, but - never both. - - :param sub_a: The execution information of the subscript A. - :param sub_b: The execution information of the subscript B. - :param ops_count: The added number of static OPs added on top. - """ - # Number of static OPs is simple, they are all executed. - static_ops = sub_a._static_ops_count + sub_b._static_ops_count + ops_count - # Only one of the branch is executed, pick the most expensive one. - dyn_ops = max(sub_a._dyn_ops_count, sub_b._dyn_ops_count) - # Same. Also, we add a stack element used to tell which branch to take. - sat_elems = add_optional(max_optional(sub_a.sat_elems, sub_b.sat_elems), 1) - # Same here. - dissat_elems = add_optional( - max_optional(sub_a.dissat_elems, sub_b.dissat_elems), 1 - ) - - return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - - def from_andor_uneven(sub_a, sub_b, sub_c, ops_count=0): - """Compute the execution info from a Miniscript which always executes A, and then - executes B if A returned True else executes C. Semantic: or(and(A,B), C). - - :param sub_a: The execution information of the subscript A. - :param sub_b: The execution information of the subscript B. - :param sub_b: The execution information of the subscript C. - :param ops_count: The added number of static OPs added on top. - """ - # Number of static OPs is simple, they are all executed. - static_ops = ( - sum(sub._static_ops_count for sub in [sub_a, sub_b, sub_c]) + ops_count - ) - # If the first sub is non-malleably dissatisfiable, the worst case is executing - # it and the most expensive between B and C. - # If it isn't the worst case is then necessarily to execute A and B. - if sub_a.is_dissatisfiable(): - dyn_ops = sub_a._dyn_ops_count + max( - sub_b._dyn_ops_count, sub_c._dyn_ops_count - ) - else: - # If the first isn't non-malleably dissatisfiable, the worst case is - # satisfying it (and necessarily satisfying the second one too) - dyn_ops = sub_a._dyn_ops_count + sub_b._dyn_ops_count - # Same for the number of stack elements (implicit from None here). - first = add_optional(sub_a.sat_elems, sub_b.sat_elems) - second = add_optional(sub_a.dissat_elems, sub_c.sat_elems) - sat_elems = max_optional(first, second) - # The only canonical dissatisfaction is dissatisfying A and C. - dissat_elems = add_optional(sub_a.dissat_elems, sub_c.dissat_elems) - - return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - - # TODO: i think it'd be possible to not have this be special-cased to 'thresh()' - def from_thresh(k, subs): - """Compute the execution info from a Miniscript 'thresh()' fragment. Specialized - to this specifc fragment for now. - - :param k: The actual threshold of the 'thresh()' fragment. - :param subs: All the possible sub scripts. - """ - # All the OPs from the subs + n-1 * OP_ADD + 1 * OP_EQUAL - static_ops = sum(sub._static_ops_count for sub in subs) + len(subs) - # dyn_ops = sum(sorted([sub._dyn_ops_count for sub in subs], reverse=True)[:k]) - # All subs are executed, there is no OP_IF branch. - dyn_ops = sum([sub._dyn_ops_count for sub in subs]) - - # In order to estimate the worst case we simulate to satisfy the k subs whose - # sat/dissat ratio is the largest, and dissatisfy the others. - # We do so by iterating through all the subs, recording their sat-dissat "score" - # and those that either cannot be satisfied or dissatisfied. - arbitrage, unsatisfiable, undissatisfiable = [], [], [] - for sub in subs: - if sub.sat_elems is None: - unsatisfiable.append(sub) - elif sub.dissat_elems is None: - undissatisfiable.append(sub) - else: - arbitrage.append((sub.sat_elems - sub.dissat_elems, sub)) - # Of course, if too many can't be (dis)satisfied, we have a problem. - # Otherwise, simulate satisfying first the subs that must be (no dissatisfaction) - # then the most expensive ones, and then dissatisfy all the others. - if len(unsatisfiable) > len(subs) - k or len(undissatisfiable) > k: - sat_elems = None - else: - arbitrage = sorted(arbitrage, key=lambda x: x[0], reverse=True) - worst_sat = undissatisfiable + [a[1] for a in arbitrage] + unsatisfiable - sat_elems = sum( - [sub.sat_elems for sub in worst_sat[:k]] - + [sub.dissat_elems for sub in worst_sat[k:]] - ) - if len(undissatisfiable) > 0: - dissat_elems = None - else: - dissat_elems = sum([sub.dissat_elems for sub in subs]) - - return ExecutionInfo(static_ops, dyn_ops, sat_elems, dissat_elems) - - def from_wrap(sub, ops_count, dyn=0, sat=0, dissat=0): - """Compute the execution info from a Miniscript which always executes a subscript - but adds some logic around. - - :param sub: The execution information of the single subscript. - :param ops_count: The added number of static OPs added on top. - :param dyn: The added number of dynamic OPs added on top. - :param sat: The added number of satisfaction stack elements added on top. - :param dissat: The added number of dissatisfcation stack elements added on top. - """ - return ExecutionInfo( - sub._static_ops_count + ops_count, - sub._dyn_ops_count + dyn, - add_optional(sub.sat_elems, sat), - add_optional(sub.dissat_elems, dissat), - ) - - def from_wrap_dissat(sub, ops_count, dyn=0, sat=0, dissat=0): - """Compute the execution info from a Miniscript which always executes a subscript - but adds some logic around. - - :param sub: The execution information of the single subscript. - :param ops_count: The added number of static OPs added on top. - :param dyn: The added number of dynamic OPs added on top. - :param sat: The added number of satisfaction stack elements added on top. - :param dissat: The added number of dissatisfcation stack elements added on top. - """ - return ExecutionInfo( - sub._static_ops_count + ops_count, - sub._dyn_ops_count + dyn, - add_optional(sub.sat_elems, sat), - dissat, - ) diff --git a/bitcoin_client/ledger_bitcoin/bip380/utils/__init__.py b/bitcoin_client/ledger_bitcoin/bip380/utils/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/bitcoin_client/ledger_bitcoin/bip380/utils/bignum.py b/bitcoin_client/ledger_bitcoin/bip380/utils/bignum.py deleted file mode 100644 index 138493918..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/utils/bignum.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright (c) 2015-2020 The Bitcoin Core developers -# Copyright (c) 2021 Antoine Poinsot -# Distributed under the MIT software license, see the accompanying -# file LICENSE or http://www.opensource.org/licenses/mit-license.php. -"""Big number routines. - -This file is taken from the Bitcoin Core test framework. It was previously -copied from python-bitcoinlib. -""" - -import struct - - -# generic big endian MPI format - - -def bn_bytes(v, have_ext=False): - ext = 0 - if have_ext: - ext = 1 - return ((v.bit_length() + 7) // 8) + ext - - -def bn2bin(v): - s = bytearray() - i = bn_bytes(v) - while i > 0: - s.append((v >> ((i - 1) * 8)) & 0xFF) - i -= 1 - return s - - -def bn2mpi(v): - have_ext = False - if v.bit_length() > 0: - have_ext = (v.bit_length() & 0x07) == 0 - - neg = False - if v < 0: - neg = True - v = -v - - s = struct.pack(b">I", bn_bytes(v, have_ext)) - ext = bytearray() - if have_ext: - ext.append(0) - v_bin = bn2bin(v) - if neg: - if have_ext: - ext[0] |= 0x80 - else: - v_bin[0] |= 0x80 - return s + ext + v_bin - - -# bitcoin-specific little endian format, with implicit size -def mpi2vch(s): - r = s[4:] # strip size - r = r[::-1] # reverse string, converting BE->LE - return r - - -def bn2vch(v): - return bytes(mpi2vch(bn2mpi(v))) diff --git a/bitcoin_client/ledger_bitcoin/bip380/utils/hashes.py b/bitcoin_client/ledger_bitcoin/bip380/utils/hashes.py deleted file mode 100644 index 1124dc57a..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/utils/hashes.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -Common Bitcoin hashes. -""" - -import hashlib -from .ripemd_fallback import ripemd160_fallback - - -def sha256(data): - """{data} must be bytes, returns sha256(data)""" - assert isinstance(data, bytes) - return hashlib.sha256(data).digest() - - -def hash160(data): - """{data} must be bytes, returns ripemd160(sha256(data))""" - assert isinstance(data, bytes) - if 'ripemd160' in hashlib.algorithms_available: - return hashlib.new("ripemd160", sha256(data)).digest() - return ripemd160_fallback(sha256(data)) diff --git a/bitcoin_client/ledger_bitcoin/bip380/utils/ripemd_fallback.py b/bitcoin_client/ledger_bitcoin/bip380/utils/ripemd_fallback.py deleted file mode 100644 index a4043de9b..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/utils/ripemd_fallback.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright (c) 2021 Pieter Wuille -# Distributed under the MIT software license, see the accompanying -# file COPYING or http://www.opensource.org/licenses/mit-license.php. -# -# Taken from https://github.com/bitcoin/bitcoin/blob/124e75a41ea0f3f0e90b63b0c41813184ddce2ab/test/functional/test_framework/ripemd160.py - -# fmt: off - -""" -Pure Python RIPEMD160 implementation. - -WARNING: This implementation is NOT constant-time. -Do not use without understanding the implications. -""" - -# Message schedule indexes for the left path. -ML = [ - 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, - 7, 4, 13, 1, 10, 6, 15, 3, 12, 0, 9, 5, 2, 14, 11, 8, - 3, 10, 14, 4, 9, 15, 8, 1, 2, 7, 0, 6, 13, 11, 5, 12, - 1, 9, 11, 10, 0, 8, 12, 4, 13, 3, 7, 15, 14, 5, 6, 2, - 4, 0, 5, 9, 7, 12, 2, 10, 14, 1, 3, 8, 11, 6, 15, 13 -] - -# Message schedule indexes for the right path. -MR = [ - 5, 14, 7, 0, 9, 2, 11, 4, 13, 6, 15, 8, 1, 10, 3, 12, - 6, 11, 3, 7, 0, 13, 5, 10, 14, 15, 8, 12, 4, 9, 1, 2, - 15, 5, 1, 3, 7, 14, 6, 9, 11, 8, 12, 2, 10, 0, 4, 13, - 8, 6, 4, 1, 3, 11, 15, 0, 5, 12, 2, 13, 9, 7, 10, 14, - 12, 15, 10, 4, 1, 5, 8, 7, 6, 2, 13, 14, 0, 3, 9, 11 -] - -# Rotation counts for the left path. -RL = [ - 11, 14, 15, 12, 5, 8, 7, 9, 11, 13, 14, 15, 6, 7, 9, 8, - 7, 6, 8, 13, 11, 9, 7, 15, 7, 12, 15, 9, 11, 7, 13, 12, - 11, 13, 6, 7, 14, 9, 13, 15, 14, 8, 13, 6, 5, 12, 7, 5, - 11, 12, 14, 15, 14, 15, 9, 8, 9, 14, 5, 6, 8, 6, 5, 12, - 9, 15, 5, 11, 6, 8, 13, 12, 5, 12, 13, 14, 11, 8, 5, 6 -] - -# Rotation counts for the right path. -RR = [ - 8, 9, 9, 11, 13, 15, 15, 5, 7, 7, 8, 11, 14, 14, 12, 6, - 9, 13, 15, 7, 12, 8, 9, 11, 7, 7, 12, 7, 6, 15, 13, 11, - 9, 7, 15, 11, 8, 6, 6, 14, 12, 13, 5, 14, 13, 13, 7, 5, - 15, 5, 8, 11, 14, 14, 6, 14, 6, 9, 12, 9, 12, 5, 15, 8, - 8, 5, 12, 9, 12, 5, 14, 6, 8, 13, 6, 5, 15, 13, 11, 11 -] - -# K constants for the left path. -KL = [0, 0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xa953fd4e] - -# K constants for the right path. -KR = [0x50a28be6, 0x5c4dd124, 0x6d703ef3, 0x7a6d76e9, 0] - - -def fi(x, y, z, i): - """The f1, f2, f3, f4, and f5 functions from the specification.""" - if i == 0: - return x ^ y ^ z - elif i == 1: - return (x & y) | (~x & z) - elif i == 2: - return (x | ~y) ^ z - elif i == 3: - return (x & z) | (y & ~z) - elif i == 4: - return x ^ (y | ~z) - else: - assert False - - -def rol(x, i): - """Rotate the bottom 32 bits of x left by i bits.""" - return ((x << i) | ((x & 0xffffffff) >> (32 - i))) & 0xffffffff - - -def compress(h0, h1, h2, h3, h4, block): - """Compress state (h0, h1, h2, h3, h4) with block.""" - # Left path variables. - al, bl, cl, dl, el = h0, h1, h2, h3, h4 - # Right path variables. - ar, br, cr, dr, er = h0, h1, h2, h3, h4 - # Message variables. - x = [int.from_bytes(block[4*i:4*(i+1)], 'little') for i in range(16)] - - # Iterate over the 80 rounds of the compression. - for j in range(80): - rnd = j >> 4 - # Perform left side of the transformation. - al = rol(al + fi(bl, cl, dl, rnd) + x[ML[j]] + KL[rnd], RL[j]) + el - al, bl, cl, dl, el = el, al, bl, rol(cl, 10), dl - # Perform right side of the transformation. - ar = rol(ar + fi(br, cr, dr, 4 - rnd) + x[MR[j]] + KR[rnd], RR[j]) + er - ar, br, cr, dr, er = er, ar, br, rol(cr, 10), dr - - # Compose old state, left transform, and right transform into new state. - return h1 + cl + dr, h2 + dl + er, h3 + el + ar, h4 + al + br, h0 + bl + cr - - -def ripemd160_fallback(data): - """Compute the RIPEMD-160 hash of data.""" - # Initialize state. - state = (0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0) - # Process full 64-byte blocks in the input. - for b in range(len(data) >> 6): - state = compress(*state, data[64*b:64*(b+1)]) - # Construct final blocks (with padding and size). - pad = b"\x80" + b"\x00" * ((119 - len(data)) & 63) - fin = data[len(data) & ~63:] + pad + (8 * len(data)).to_bytes(8, 'little') - # Process final blocks. - for b in range(len(fin) >> 6): - state = compress(*state, fin[64*b:64*(b+1)]) - # Produce output. - return b"".join((h & 0xffffffff).to_bytes(4, 'little') for h in state) \ No newline at end of file diff --git a/bitcoin_client/ledger_bitcoin/bip380/utils/script.py b/bitcoin_client/ledger_bitcoin/bip380/utils/script.py deleted file mode 100644 index 9ff0e703d..000000000 --- a/bitcoin_client/ledger_bitcoin/bip380/utils/script.py +++ /dev/null @@ -1,473 +0,0 @@ -# Copyright (c) 2015-2020 The Bitcoin Core developers -# Copyright (c) 2021 Antoine Poinsot -# Distributed under the MIT software license, see the accompanying -# file LICENSE or http://www.opensource.org/licenses/mit-license.php. -"""Script utilities - -This file was taken from Bitcoin Core test framework, and was previously -modified from python-bitcoinlib. -""" -import struct - -from .bignum import bn2vch - - -OPCODE_NAMES = {} - - -class CScriptOp(int): - """A single script opcode""" - - __slots__ = () - - @staticmethod - def encode_op_pushdata(d): - """Encode a PUSHDATA op, returning bytes""" - if len(d) < 0x4C: - return b"" + bytes([len(d)]) + d # OP_PUSHDATA - elif len(d) <= 0xFF: - return b"\x4c" + bytes([len(d)]) + d # OP_PUSHDATA1 - elif len(d) <= 0xFFFF: - return b"\x4d" + struct.pack(b" 4: - raise ScriptNumError("Too large push") - - if size == 0: - return 0 - - # We always check for minimal encoding - if (data[size - 1] & 0x7f) == 0: - if size == 1 or (data[size - 2] & 0x80) == 0: - raise ScriptNumError("Non minimal encoding") - - res = int.from_bytes(data, byteorder="little") - - # Remove the sign bit if set, and negate the result - if data[size - 1] & 0x80: - return -(res & ~(0x80 << (size - 1))) - return res - - -class CScriptInvalidError(Exception): - """Base class for CScript exceptions""" - - pass - - -class CScriptTruncatedPushDataError(CScriptInvalidError): - """Invalid pushdata due to truncation""" - - def __init__(self, msg, data): - self.data = data - super(CScriptTruncatedPushDataError, self).__init__(msg) - - -# This is used, eg, for blockchain heights in coinbase scripts (bip34) -class CScriptNum: - __slots__ = ("value",) - - def __init__(self, d=0): - self.value = d - - @staticmethod - def encode(obj): - r = bytearray(0) - if obj.value == 0: - return bytes(r) - neg = obj.value < 0 - absvalue = -obj.value if neg else obj.value - while absvalue: - r.append(absvalue & 0xFF) - absvalue >>= 8 - if r[-1] & 0x80: - r.append(0x80 if neg else 0) - elif neg: - r[-1] |= 0x80 - return bytes([len(r)]) + r - - @staticmethod - def decode(vch): - result = 0 - # We assume valid push_size and minimal encoding - value = vch[1:] - if len(value) == 0: - return result - for i, byte in enumerate(value): - result |= int(byte) << 8 * i - if value[-1] >= 0x80: - # Mask for all but the highest result bit - num_mask = (2 ** (len(value) * 8) - 1) >> 1 - result &= num_mask - result *= -1 - return result - - -class CScript(bytes): - """Serialized script - - A bytes subclass, so you can use this directly whenever bytes are accepted. - Note that this means that indexing does *not* work - you'll get an index by - byte rather than opcode. This format was chosen for efficiency so that the - general case would not require creating a lot of little CScriptOP objects. - - iter(script) however does iterate by opcode. - """ - - __slots__ = () - - @classmethod - def __coerce_instance(cls, other): - # Coerce other into bytes - if isinstance(other, CScriptOp): - other = bytes([other]) - elif isinstance(other, CScriptNum): - if other.value == 0: - other = bytes([CScriptOp(OP_0)]) - else: - other = CScriptNum.encode(other) - elif isinstance(other, int): - if 0 <= other <= 16: - other = bytes([CScriptOp.encode_op_n(other)]) - elif other == -1: - other = bytes([OP_1NEGATE]) - else: - other = CScriptOp.encode_op_pushdata(bn2vch(other)) - elif isinstance(other, (bytes, bytearray)): - other = CScriptOp.encode_op_pushdata(other) - return other - - def __add__(self, other): - # Do the coercion outside of the try block so that errors in it are - # noticed. - other = self.__coerce_instance(other) - - try: - # bytes.__add__ always returns bytes instances unfortunately - return CScript(super(CScript, self).__add__(other)) - except TypeError: - raise TypeError("Can not add a %r instance to a CScript" % other.__class__) - - def join(self, iterable): - # join makes no sense for a CScript() - raise NotImplementedError - - def __new__(cls, value=b""): - if isinstance(value, bytes) or isinstance(value, bytearray): - return super(CScript, cls).__new__(cls, value) - else: - - def coerce_iterable(iterable): - for instance in iterable: - yield cls.__coerce_instance(instance) - - # Annoyingly on both python2 and python3 bytes.join() always - # returns a bytes instance even when subclassed. - return super(CScript, cls).__new__(cls, b"".join(coerce_iterable(value))) - - def raw_iter(self): - """Raw iteration - - Yields tuples of (opcode, data, sop_idx) so that the different possible - PUSHDATA encodings can be accurately distinguished, as well as - determining the exact opcode byte indexes. (sop_idx) - """ - i = 0 - while i < len(self): - sop_idx = i - opcode = self[i] - i += 1 - - if opcode > OP_PUSHDATA4: - yield (opcode, None, sop_idx) - else: - datasize = None - pushdata_type = None - if opcode < OP_PUSHDATA1: - pushdata_type = "PUSHDATA(%d)" % opcode - datasize = opcode - - elif opcode == OP_PUSHDATA1: - pushdata_type = "PUSHDATA1" - if i >= len(self): - raise CScriptInvalidError("PUSHDATA1: missing data length") - datasize = self[i] - i += 1 - - elif opcode == OP_PUSHDATA2: - pushdata_type = "PUSHDATA2" - if i + 1 >= len(self): - raise CScriptInvalidError("PUSHDATA2: missing data length") - datasize = self[i] + (self[i + 1] << 8) - i += 2 - - elif opcode == OP_PUSHDATA4: - pushdata_type = "PUSHDATA4" - if i + 3 >= len(self): - raise CScriptInvalidError("PUSHDATA4: missing data length") - datasize = ( - self[i] - + (self[i + 1] << 8) - + (self[i + 2] << 16) - + (self[i + 3] << 24) - ) - i += 4 - - else: - assert False # shouldn't happen - - data = bytes(self[i: i + datasize]) - - # Check for truncation - if len(data) < datasize: - raise CScriptTruncatedPushDataError( - "%s: truncated data" % pushdata_type, data - ) - - i += datasize - - yield (opcode, data, sop_idx) - - def __iter__(self): - """'Cooked' iteration - - Returns either a CScriptOP instance, an integer, or bytes, as - appropriate. - - See raw_iter() if you need to distinguish the different possible - PUSHDATA encodings. - """ - for (opcode, data, sop_idx) in self.raw_iter(): - if data is not None: - yield data - else: - opcode = CScriptOp(opcode) - - if opcode.is_small_int(): - yield opcode.decode_op_n() - else: - yield CScriptOp(opcode) - - def __repr__(self): - def _repr(o): - if isinstance(o, bytes): - return "x('%s')" % o.hex() - else: - return repr(o) - - ops = [] - i = iter(self) - while True: - op = None - try: - op = _repr(next(i)) - except CScriptTruncatedPushDataError as err: - op = "%s..." % (_repr(err.data), err) - break - except CScriptInvalidError as err: - op = "" % err - break - except StopIteration: - break - finally: - if op is not None: - ops.append(op) - - return "CScript([%s])" % ", ".join(ops) - - def GetSigOpCount(self, fAccurate): - """Get the SigOp count. - - fAccurate - Accurately count CHECKMULTISIG, see BIP16 for details. - - Note that this is consensus-critical. - """ - n = 0 - lastOpcode = OP_INVALIDOPCODE - for (opcode, data, sop_idx) in self.raw_iter(): - if opcode in (OP_CHECKSIG, OP_CHECKSIGVERIFY): - n += 1 - elif opcode in (OP_CHECKMULTISIG, OP_CHECKMULTISIGVERIFY): - if fAccurate and (OP_1 <= lastOpcode <= OP_16): - n += opcode.decode_op_n() - else: - n += 20 - lastOpcode = opcode - return n