Source code for cryptnox_cli.wallet.btc

# -*- coding: utf-8 -*-
"""
A basic BTC wallet library
"""
import math
import json
import re
import urllib.parse
import requests
from enum import Enum
from typing import Union, List, Dict
from urllib.parse import urlparse

from cryptnox_sdk_py import Derivation
from tabulate import tabulate

from .validators import AnyValidator, EnumValidator, IntValidator

try:
    from lib import cryptos
    from lib.cryptos.wallet_utils import number_of_significant_digits
except ImportError:
    from ..lib import cryptos
    from ..lib.cryptos.wallet_utils import number_of_significant_digits


[docs] class BtcNetworks(Enum): """ Class defining possible Bitcoin networks """ MAINNET = "mainnet" TESTNET = "testnet" TESTNET4 = "testnet4"
[docs] class BlockCypherApi: """ BlockCypherApi """
[docs] def __init__(self, api_key, network): self.apikey = api_key self.url = "https://api.blockcypher.com/v1/btc/main/" if network.lower() == "testnet": self.url = "https://api.blockcypher.com/v1/btc/test3/" self.params = {'token': self.apikey} self.js_res = [] self.web_rsc = None
def _validate_endpoint(self, endpoint: str) -> str: """ Validate and sanitize endpoint to prevent URL manipulation attacks. :param endpoint: The endpoint path to validate :return: Validated endpoint :raises ValueError: If endpoint contains suspicious characters """ # Security: Prevent path traversal and URL manipulation if any(char in endpoint for char in ['..', '//', '@', ':', '?', '#']): # Allow single forward slashes but not suspicious patterns if not all(part.isalnum() or part in ['-', '_'] for part in endpoint.split('/')): raise ValueError("Invalid endpoint: contains suspicious characters") return endpoint
[docs] def get_data(self, endpoint: str, params: Dict = None, data: bytes = None) \ -> None: """ :rtype: None :param endpoint: str :param params: dict :param data: bytes """ # Security: Validate endpoint before using in URL construction endpoint = self._validate_endpoint(endpoint) params = params or {} parameters = dict(params) parameters.update(self.params) params_enc = urllib.parse.urlencode(parameters) try: # Construct full URL and validate it stays within expected domain full_url = self.url + endpoint + "?" + params_enc parsed = urlparse(full_url) if not parsed.hostname or 'blockcypher.com' not in parsed.hostname: raise ValueError("Invalid URL: must be blockcypher.com domain") # Use 'requests' (validated allow-listed https URL) instead of urllib, # which would also honour file:// and other local schemes. headers = {'User-Agent': 'Mozilla/5.0'} if data is None: response = requests.get(full_url, headers=headers, timeout=30) else: response = requests.post(full_url, headers=headers, data=data, timeout=30) response.raise_for_status() self.js_res = response.json() self.web_rsc = None except Exception as ex: print(ex) raise IOError("Error while processing request:\n%s" % ( self.url + endpoint + "?" + params_enc )) from ex
[docs] def check_api_resp(self) -> None: """ :rtype: None """ if 'error' in self.js_res: print(" !! ERROR :") raise Exception(self.js_res['error']) if 'errors' in self.js_res: print(" !! ERRORS :") raise Exception(self.js_res['errors'])
[docs] def get_utx_os(self, addr: str, n_conf) -> List: # n_conf 0 or 1 """ :param addr: str :param n_conf: int (0 or 1) :return: List """ self.get_data("addrs/" + addr, {'unspentOnly': 'true'}) # translate inputs from blockcypher to pybitcoinlib addr_utxos = self.get_key('txrefs') if n_conf == 0: addr_utxos.extend(self.get_key('unconfirmed_txrefs')) sel_utxos = [] for utxo in addr_utxos: sel_utxos.append({ 'value': utxo['value'], 'output': utxo['tx_hash'] + ":" + str(utxo['tx_output_n']) }) return sel_utxos
[docs] def push_tx(self, tx_hex: str) -> Dict: """ :param tx_hex: :return: str """ data_tx = json.dumps({'tx': tx_hex}).encode('ascii') self.get_data("txs/push", data=data_tx) self.check_api_resp() return self.get_key('tx/hash')
[docs] def get_key(self, key_char: str) -> Dict: """ :param key_char:str :return: Dict """ out = self.js_res path = key_char.split("/") for key in path: if key.isdigit(): key = int(key) try: out = out[key] except KeyError: out = [] return out
[docs] class BlkHubApi: """ BlkHubApi """
[docs] def __init__(self, network, api_key: str = "", endpoint: str = ""): network = network.lower() self.url = endpoint if endpoint else BlkHubApi.get_api(network) if not self.url.endswith("/"): self.url += "/" self.api_key = api_key self.js_res = [] self.web_rsc = None
[docs] @staticmethod def get_api(network: str) -> str: """ Get API url for given network :param str network: :return: API url :rtype: str """ if network.lower() == "mainnet": return "https://blkhub.net/api/" if network.lower() == "testnet": return "https://blockstream.info/testnet/api/" if network.lower() == "testnet4": return "https://newest-intensive-feather.btc-testnet4.quiknode.pro/" raise Exception("Unknown BC network name")
def _validate_endpoint(self, endpoint: str) -> str: """ Validate and sanitize endpoint to prevent URL manipulation attacks. :param endpoint: The endpoint path to validate :return: Validated endpoint :raises ValueError: If endpoint contains suspicious characters """ # Security: Prevent path traversal and URL manipulation if any(char in endpoint for char in ['..', '//', '@', ':', '?', '#']): # Allow single forward slashes but not suspicious patterns if not all(part.isalnum() or part in ['-', '_'] for part in endpoint.split('/')): raise ValueError("Invalid endpoint: contains suspicious characters") return endpoint
[docs] def get_data(self, endpoint: str, params: Dict = None, data: bytes = None) \ -> None: """ :param endpoint: str :param params: dict :param data: bytes :return: None """ # Security: Validate endpoint before using in URL construction endpoint = self._validate_endpoint(endpoint) params = params or {} parameters = dict(params) params_enc = urllib.parse.urlencode(parameters) try: # Construct full URL and validate it stays within expected domain full_url = self.url + endpoint + ("?" + params_enc if params_enc else "") parsed = urlparse(full_url) _ALLOWED_DOMAINS = ('blkhub.net', 'blockstream.info', 'mempool.space', 'quiknode.pro', 'quicknode.com') if not parsed.hostname or not any( parsed.hostname == d or parsed.hostname.endswith('.' + d) for d in _ALLOWED_DOMAINS): raise ValueError( f"Invalid URL: must be one of {', '.join(_ALLOWED_DOMAINS)}" ) headers = {'User-Agent': 'Mozilla/5.0'} if self.api_key: headers['x-token'] = self.api_key # Use 'requests' (validated allow-listed https URL) instead of urllib, # which would also honour file:// and other local schemes. if data is None: response = requests.get(full_url, headers=headers, timeout=30) else: response = requests.post(full_url, headers=headers, data=data, timeout=30) response.raise_for_status() b_rep = response.content if len(b_rep) == 64 and b_rep[0] != ord('{'): b_rep = b'{"txid":"' + b_rep + b'"}' self.js_res = json.loads(b_rep) except requests.exceptions.HTTPError as error: raise IOError(f"Error while processing request:\n{error.response.status_code} - " f"{error.response.text}") from error except Exception as error: raise IOError(f"Error while processing request:\n{self.url}{endpoint}?params_enc\n" f"{error}") from error
[docs] def check_api_resp(self) -> None: """ :rtype: None """ if 'error' in self.js_res: print(" !! ERROR :") raise Exception(self.js_res['error']) if 'errors' in self.js_res: print(" !! ERRORS :") raise Exception(self.js_res['errors'])
def _is_blockbook(self) -> bool: hostname = urlparse(self.url).hostname or "" return hostname.endswith(".quiknode.pro") or hostname.endswith(".quicknode.com") def _json_rpc(self, method: str, params=None): parsed = urlparse(self.url) if parsed.scheme != "https": raise ValueError("JSON-RPC endpoint must use HTTPS") params = params if params is not None else [] payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1} headers = {'User-Agent': 'Mozilla/5.0', 'Content-Type': 'application/json'} if self.api_key: headers['x-token'] = self.api_key resp = requests.post(self.url, headers=headers, json=payload, timeout=30) resp.raise_for_status() result = resp.json() if "error" in result and result["error"]: raise IOError(f"JSON-RPC error: {result['error']}") if "result" not in result: raise IOError("JSON-RPC malformed response: missing 'result' key") return result["result"]
[docs] def get_fee_estimates(self, blocks=6) -> int: if self._is_blockbook(): result = self._json_rpc("estimatesmartfee", [blocks]) feerate = result.get("feerate", 0) if feerate <= 0: return 0 return math.ceil(feerate * 100_000_000 / 1000) self.get_data("fee-estimates") block_entries = [int(x) for x in self.js_res.keys() if int(x) <= blocks] block_entries.sort() try: return math.ceil(self.js_res[str(block_entries.pop())]) except KeyError: return 0
[docs] def get_utx_os(self, addr: str, _n_conf: int) -> List: """ :param addr:str :param int _n_conf: 0 or 1 :return: list """ if self._is_blockbook(): self.get_data("api/v2/utxo/" + addr) sel_utx_os = [] for utxo in self.js_res: sel_utx_os.append({ 'value': int(utxo['value']), 'output': utxo['txid'] + ":" + str(utxo['vout']) }) return sel_utx_os self.get_data("address/" + addr + "/utxo") addr_utx_os = self.js_res sel_utx_os = [] for utxo in addr_utx_os: sel_utx_os.append({ 'value': utxo['value'], 'output': utxo['txid'] + ":" + str(utxo['vout']) }) return sel_utx_os
[docs] def push_tx(self, tx_hex: str) -> List: """ :param tx_hex: str :return: List """ if self._is_blockbook(): result = self._json_rpc("sendrawtransaction", [tx_hex]) return result self.get_data("tx", data=tx_hex.encode('ascii')) self.check_api_resp() return self.get_key('txid')
[docs] def get_key(self, key_char: str) -> List: """ :param key_char: str :return: list """ out = self.js_res path = key_char.split("/") for key in path: if key.isdigit(): key = int(key) try: out = out[key] except LookupError: out = [] return out
[docs] def test_addr(btc_addr: str): """ :param btc_addr: str :return: """ # Safe test of the address format if btc_addr.startswith("1") or btc_addr.startswith("3"): return re.match('^[13][a-km-zA-HJ-NP-Z1-9]{25,34}$', btc_addr) if btc_addr.startswith("n") or btc_addr.startswith( "m") or btc_addr.startswith("2"): return re.match('^[2nm][a-km-zA-HJ-NP-Z1-9]{25,34}$', btc_addr) if btc_addr.startswith("bc1") or btc_addr.startswith("tb1"): return re.match('^(bc1|tb1)[a-zA-HJ-NP-Z0-9]{25,62}$', btc_addr) return False
[docs] class BTCwallet: """ BTCwallet """ PATH = "m/44'/0'/0'/0/0"
[docs] def __init__(self, pubkey: str, coin_type: str, api, card) -> None: """ :param pubkey: str :param coin_type: str :param api: :param connection: """ addr_header = 0x00 self.testnet = False coin_type = coin_type.lower() if coin_type in ("testnet", "testnet4"): addr_header = 0x6F self.testnet = True self.pubkey = pubkey pkh = cryptos.bin_hash160(bytes.fromhex(pubkey)) self.address = cryptos.bin_to_b58check(pkh, addr_header) self.api = api self.card = card self.balance = None self.var_tx = None self.len_inputs = None self.data_hash = [] self.fee = 2000
[docs] def get_utx_os(self, n_conf: int = 0): """ :param n_conf: int (0 or 1) :return: """ return self.api.get_utx_os(self.address, n_conf)
[docs] def get_balance(self) -> float: """ :return: float """ utx_os = self.get_utx_os() return self.balance_fm_utxos(utx_os)
[docs] def get_fee_estimate(self): return self.api.get_fee_estimate()
[docs] def prepare(self, to_addr: str, payment_value: float, fee: float, utx_os: List = None) -> Union[float, int]: """ :param to_addr: str :param payment_value: float :param fee: float :param utx_os: pre-fetched UTXOs (fetched in parallel with fee estimate) :return: Union[float, int] """ self.fee = fee if not test_addr(to_addr): raise Exception("Bad address format.") if utx_os is None: utx_os = self.get_utx_os() balance = self.balance_fm_utxos(utx_os) self.balance = balance / 10.0 ** 8 max_spendable = balance - fee if payment_value > max_spendable: raise Exception("Not enough fund for the tx") inputs = self.select_utxos(payment_value + fee, utx_os) in_value = self.balance_fm_utxos(inputs) change_value = in_value - payment_value - fee outs = [{'value': payment_value, 'address': to_addr}] if change_value > 0: outs.append({'value': change_value, 'address': self.address}) self.var_tx = cryptos.coins.bitcoin.Bitcoin(testnet=self.testnet). \ mktx(inputs, outs) script = cryptos.mk_pubkey_script(self.address) # Finish tx # Sign each input self.len_inputs = len(inputs) for i in range(self.len_inputs): signing_tx = cryptos.signature_form(self.var_tx, i, script, cryptos.SIGHASH_ALL) self.data_hash.append(cryptos.bin_txhash(signing_tx, cryptos.SIGHASH_ALL)) return 0
[docs] def send(self, to_addr: str, payment_value: float, signature: List[bytes]) -> str: """ :param to_addr: str :param payment_value: float :return: str """ # Cryptnox Sign for i in range(0, self.len_inputs): self.var_tx["ins"][i]["script"] = cryptos.serialize_script([signature[i].hex() + "01", self.pubkey]) tabulate_table = [ ["BALANCE:", f"{self.balance}", "BTC", "ON", "ACCOUNT:", f"{self.address}"], ["TRANSACTION:", f"{payment_value / 10 ** 8}", "BTC", "FROM", "ACCOUNT:", f"{to_addr}"], ["FEE:", f"{self.fee / 10 ** 8}"], ["TOTAL:", (self.fee + payment_value) / 10 ** 8] ] floating_points = number_of_significant_digits( (self.fee + payment_value) / 10 ** 8) print("\n\n--- Transaction Ready ---\n") print(tabulate(tabulate_table, tablefmt='plain', floatfmt=f".{floating_points}f"), "\n") conf = input("Confirm ? [y/N] > ") if conf.lower() == "y": tx_hex = cryptos.serialize(self.var_tx) return "\nDONE, txID : " + self.api.push_tx(tx_hex) return "Canceled by the user."
[docs] @staticmethod def balance_fm_utxos(utxos) -> float: """ :param utxos: :return: float """ bal = 0 for utxo in utxos: bal += utxo['value'] return bal
[docs] @staticmethod def select_utxos(amount: float, utxos) -> List: """ :param amount: float :param utxos: :return: float """ sorted_utxos = sorted(utxos, key=lambda x: x['value'], reverse=True) sel_utxos = [] for s_utxo in sorted_utxos: amount -= s_utxo['value'] sel_utxos.append(s_utxo) if amount <= 0: break if amount <= 0: return sel_utxos raise Exception("Not enough utxos values for the tx")
[docs] class BtcValidator: """ Class defining Bitcoin validators """ network = EnumValidator(BtcNetworks) fees = IntValidator() derivation = EnumValidator(Derivation) api_key = AnyValidator() endpoint = AnyValidator()
[docs] def __init__(self, network: str = "testnet", fees: int = 2000, derivation: str = "DERIVE", api_key: str = "", endpoint: str = ""): self.network = network self.fees = fees self.derivation = derivation self.api_key = api_key self.endpoint = endpoint