Source code for pycoin.services.blockchain_info
import io
import json
import warnings
from .agent import request, urlencode, urlopen
from pycoin.coins.bitcoin.Tx import Tx
from pycoin.encoding.hexbytes import b2h, h2b, b2h_rev
[docs]class BlockchainInfoProvider(object):
def __init__(self, netcode):
if netcode == 'BTC':
self.api_domain = "https://blockchain.info"
elif netcode == "XTN":
self.api_domain = "https://testnet.blockchain.info"
else:
raise ValueError("unsupported netcode %s" % netcode)
[docs] def tx_for_tx_hash(self, tx_hash):
"Get a Tx by its hash."
URL = self.api_domain + ("/rawtx/%s?format=hex" % b2h_rev(tx_hash))
tx = Tx.from_hex(urlopen(URL).read().decode("utf8"))
return tx
[docs] def payments_for_address(self, address):
"return an array of (TX ids, net_payment)"
URL = self.api_domain + ("/address/%s?format=json" % address)
d = urlopen(URL).read()
json_response = json.loads(d.decode("utf8"))
response = []
for tx in json_response.get("txs", []):
total_out = 0
for tx_out in tx.get("out", []):
if tx_out.get("addr") == address:
total_out += tx_out.get("value", 0)
if total_out > 0:
response.append((tx.get("hash"), total_out))
return response
[docs] def spendables_for_address(self, address):
"""
Return a list of Spendable objects for the
given bitcoin address.
"""
URL = self.api_domain + "/unspent?active=%s" % address
r = json.loads(urlopen(URL).read().decode("utf8"))
spendables = []
for u in r["unspent_outputs"]:
coin_value = u["value"]
script = h2b(u["script"])
previous_hash = h2b(u["tx_hash"])
previous_index = u["tx_output_n"]
spendables.append(Tx.Spendable(coin_value, script, previous_hash, previous_index))
return spendables
[docs] def broadcast_tx(self, tx):
s = io.BytesIO()
tx.stream(s)
tx_as_hex = b2h(s.getvalue())
data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
URL = self.api_domain + "/pushtx"
try:
d = urlopen(URL, data=data).read()
return d
except request.HTTPError as ex:
try:
d = ex.read()
ex.message = d
except Exception:
pass
raise ex
[docs]def send_tx(self, tx):
warnings.warn("use BlockchainInfoProvider.broadcast_tx instead of send_tx",
category=DeprecationWarning)
return BlockchainInfoProvider().broadcast_tx(tx)