# Source code for pycoin.services.blockcypher

import json
import io

from .agent import urlopen

from pycoin.coins.bitcoin.Tx import Tx
from pycoin.encoding.hexbytes import b2h_rev, h2b, h2b_rev
from pycoin.networks.default import get_current_netcode


class BlockcypherProvider(object):
    """Thin client for the BlockCypher v1 HTTP API.

    Every request URL is built from a per-network path segment (chosen from
    the netcode) and, for the read endpoints, the API token given at
    construction time.
    """

    # Maps a netcode to the "<coin>/<chain>" path segment used in API URLs.
    NETWORK_PATHS = {
        "BTC": "btc/main",
        "XTN": "btc/test3",
        "DASH": "dash/main",
    }

    def __init__(self, api_key="", netcode=None):
        """
        :param api_key: token sent as the ``token`` query parameter.
        :param netcode: one of the keys of ``NETWORK_PATHS``; when None the
            value of ``get_current_netcode()`` is used.
        """
        if netcode is None:
            netcode = get_current_netcode()
        # NOTE: a netcode missing from NETWORK_PATHS leaves network_path as
        # None (no error is raised here), so URLs built later will contain
        # the literal text "None".
        self.network_path = self.NETWORK_PATHS.get(netcode)
        self.api_key = api_key

    def base_url(self, args):
        """Return the full API URL for the endpoint path (and query) ``args``."""
        return "https://api.blockcypher.com/v1/%s/%s" % (self.network_path, args)

    def _fetch_json(self, url, data=None):
        """Open ``url`` and return its body parsed as UTF-8 encoded JSON.

        ``data`` is forwarded to ``urlopen`` only when it is not None.
        """
        if data is None:
            response = urlopen(url)
        else:
            response = urlopen(url, data=data)
        return json.loads(response.read().decode("utf8"))

    def spendables_for_address(self, address):
        """
        Return a list of Spendable objects for the given bitcoin address.

        An empty list is returned when the response has no "txrefs" entry.
        """
        url_append = "?unspentOnly=true&token=%s&includeScript=true" % self.api_key
        url = self.base_url("addrs/%s%s" % (address, url_append))
        result = self._fetch_json(url)
        spendables = []
        for txn in result.get("txrefs", []):
            coin_value = txn.get("value")
            script = h2b(txn.get("script"))
            previous_hash = h2b_rev(txn.get("tx_hash"))
            previous_index = txn.get("tx_output_n")
            spendables.append(Tx.Spendable(coin_value, script, previous_hash, previous_index))
        return spendables

    def tx_for_tx_hash(self, tx_hash):
        """
        Return the Tx object parsed from the "hex" field of the API response
        for tx_hash.

        :raises Exception: if the request, the JSON decoding or the
            transaction parsing fails; the message names the hash and the
            underlying error.
        """
        try:
            url_append = "?token=%s&includeHex=true" % self.api_key
            url = self.base_url("txs/%s%s" % (b2h_rev(tx_hash), url_append))
            result = self._fetch_json(url)
            return Tx.parse(io.BytesIO(h2b(result.get("hex"))))
        except Exception as err:
            # Keep raising plain Exception (what callers already catch) but
            # carry the cause in the message. The URL is deliberately left
            # out because it contains the API token.
            raise Exception(
                "blockcypher: unable to fetch tx for hash %r: %s" % (tx_hash, err))

    def get_balance(self, address):
        """
        Return the decoded JSON balance object from blockcypher for address.
        """
        url_append = "/balance?token=%s" % self.api_key
        url = self.base_url("addrs/%s" % (address + url_append))
        return self._fetch_json(url)

    def broadcast_tx(self, tx):
        """
        Broadcast a transaction to the network.

        Sends ``{"tx": tx.as_hex()}`` as JSON to the ``txs/push`` endpoint
        and returns the decoded JSON response.
        """
        url = self.base_url("txs/push")
        # Request bodies must be bytes for the standard-library urlopen on
        # Python 3; json.dumps() returns text, so encode it explicitly.
        # NOTE(review): assumes .agent.urlopen forwards data to urllib - confirm.
        payload = json.dumps({"tx": tx.as_hex()}).encode("utf8")
        return self._fetch_json(url, data=payload)