import io
from .encoding.hash import double_sha256
from .encoding.hexbytes import b2h, b2h_rev
from .merkle import merkle
from .satoshi.satoshi_struct import parse_struct, stream_struct
class BadMerkleRootError(Exception):
    """Raised when a block's transactions do not hash to its Merkle root."""
def difficulty_max_mask_for_bits(bits):
    """Expand a compact "bits" difficulty value into the integer target.

    The top byte of ``bits`` is a base-256 exponent and the low 23 bits are
    the mantissa; bit 23 is the sign flag of the compact format and is
    ignored here. The result is ``mantissa * 256 ** (exponent - 3)``,
    truncated toward zero when the exponent is below 3.

    :param bits: the 32-bit compact value as an int
    :return: the expanded target as an int
    """
    exponent = bits >> 24
    # The mantissa is 23 bits wide (0x7fffff); a narrower mask would drop
    # the high mantissa bits of large targets.
    mantissa = bits & 0x7fffff
    shift = 8 * (exponent - 3)
    if shift < 0:
        # Exponents below 3 scale the mantissa down; Python rejects a
        # negative shift count, so shift right explicitly.
        return mantissa >> -shift
    return mantissa << shift
class Block(object):
    """A Block is an element of the Bitcoin chain.

    An instance stores the six header fields given to the constructor plus
    a list of transactions in ``txs`` (empty for a bare header).

    NOTE(review): ``parse_as_header`` and ``stream_header`` are called by
    the methods below but are not defined in this class as seen here;
    presumably they are supplied elsewhere -- confirm. The ``Tx`` class
    attribute used when parsing transactions is set by
    :meth:`make_subclass`.
    """

    @classmethod
    def make_subclass(class_, symbol, tx):
        """Return a subclass named ``"<symbol>_<class name>"`` whose ``Tx``
        class attribute is ``tx``."""
        return type(
            "%s_%s" % (symbol, class_.__name__),
            (class_,),
            dict(Tx=tx),
        )

    @classmethod
    def parse(class_, f, include_transactions=True, include_offsets=None, check_merkle_hash=True):
        """
        Parse the Block from the file-like object

        :param f: file-like object to read from
        :param include_transactions: when true, read the transaction count
            and that many transactions after the header
        :param include_offsets: when true, store ``f.tell()`` taken before
            each transaction as ``tx.offset_in_block``
        :param check_merkle_hash: forwarded to :meth:`set_txs`
        :return: the parsed block
        """
        block = class_.parse_as_header(f)
        if include_transactions:
            count = parse_struct("I", f)[0]
            txs = block._parse_transactions(f, count, include_offsets=include_offsets)
            block.set_txs(txs, check_merkle_hash=check_merkle_hash)
        return block

    @classmethod
    def from_bin(class_, bytes):
        """Parse a block from an in-memory binary blob (see :meth:`parse`)."""
        f = io.BytesIO(bytes)
        return class_.parse(f)

    def __init__(self, version, previous_block_hash, merkle_root, timestamp, difficulty, nonce):
        """Store the header fields and start with an empty transaction list."""
        self.version = version
        self.previous_block_hash = previous_block_hash
        self.merkle_root = merkle_root
        self.timestamp = timestamp
        self.difficulty = difficulty
        self.nonce = nonce
        self.txs = []
        # (header field snapshot, digest) of the last computed hash, or None.
        self._hash_cache = None

    def set_nonce(self, nonce):
        """Set the nonce and discard any cached header hash."""
        self.nonce = nonce
        self._hash_cache = None

    def _header_fields(self):
        """Return the header fields as a tuple; used as the hash cache key."""
        return (self.version, self.previous_block_hash, self.merkle_root,
                self.timestamp, self.difficulty, self.nonce)

    def _calculate_hash(self):
        """Return the double SHA-256 of the streamed header."""
        s = io.BytesIO()
        self.stream_header(s)
        return double_sha256(s.getvalue())

    def hash(self):
        """Calculate the hash for the block header. Note that this has the bytes
        in the opposite order from how the header is usually displayed (so the
        long string of 00 bytes is at the end, not the beginning).

        The digest is cached together with a snapshot of the header fields
        and recomputed whenever any of those fields has changed, so direct
        attribute assignment never yields a stale hash."""
        # The earlier cache stored ``self.__hash`` (name-mangled to
        # ``_Block__hash``) but tested ``hasattr(self, "__hash")``, which
        # never matched; use a plainly named attribute instead.
        key = self._header_fields()
        # getattr: tolerate instances created without running __init__.
        cached = getattr(self, "_hash_cache", None)
        if cached is None or cached[0] != key:
            cached = (key, self._calculate_hash())
            self._hash_cache = cached
        return cached[1]

    @classmethod
    def _parse_transactions(class_, f, count, include_offsets=None):
        """Parse ``count`` transactions from ``f`` using ``class_.Tx.parse``.

        When ``include_offsets`` is true, each transaction gets an
        ``offset_in_block`` attribute holding ``f.tell()`` as it was just
        before that transaction was parsed.
        """
        txs = []
        for _ in range(count):
            if include_offsets:
                offset_in_block = f.tell()
            tx = class_.Tx.parse(f)
            txs.append(tx)
            if include_offsets:
                tx.offset_in_block = offset_in_block
        return txs

    def set_txs(self, txs, check_merkle_hash=True):
        """Attach ``txs`` to this block and point each ``tx.block`` back at it.

        With a non-empty list and ``check_merkle_hash`` true, the Merkle
        root is verified and :class:`BadMerkleRootError` may be raised.
        """
        self.txs = txs
        if not txs:
            return
        for tx in txs:
            tx.block = self
        if check_merkle_hash:
            self.check_merkle_hash()

    def as_blockheader(self):
        """Return a new ``Block`` with the same header fields and no transactions."""
        return Block(self.version, self.previous_block_hash, self.merkle_root,
                     self.timestamp, self.difficulty, self.nonce)

    def _stream_transactions(self, f):
        """Write the transaction count and each transaction to ``f``; writes
        nothing when there are no transactions."""
        if self.txs:
            stream_struct("I", f, len(self.txs))
            for tx in self.txs:
                tx.stream(f)

    def stream(self, f):
        """Stream the block header in the standard way to the file-like object f.
        The Block subclass also includes the transactions."""
        self.stream_header(f)
        self._stream_transactions(f)

    def as_bin(self):
        """Return the block (or header) as binary."""
        f = io.BytesIO()
        self.stream(f)
        return f.getvalue()

    def as_hex(self):
        """Return the block (or header) as hex."""
        return b2h(self.as_bin())

    def id(self):
        """Returns the hash of the block displayed with the bytes in the order
        they are usually displayed in."""
        return b2h_rev(self.hash())

    def previous_block_id(self):
        """Returns the hash of the previous block, with the bytes in the order
        they are usually displayed in."""
        return b2h_rev(self.previous_block_hash)

    def check_merkle_hash(self):
        """Raise a BadMerkleRootError if the Merkle hash of the
        transactions does not match the Merkle hash included in the block."""
        calculated_hash = merkle([tx.hash() for tx in self.txs], double_sha256)
        if calculated_hash != self.merkle_root:
            raise BadMerkleRootError(
                "calculated %s but block contains %s" % (b2h(calculated_hash), b2h(self.merkle_root)))

    def __str__(self):
        """Return ``"<Class>[Header] [<id>] (previous <previous id>)"``; the
        ``Header`` suffix appears when the block has no transactions."""
        c = '%s%s' % (self.__class__.__name__, '' if self.txs else 'Header')
        return "%s [%s] (previous %s)" % (c, self.id(), self.previous_block_id())

    def __repr__(self):
        """Same text as :meth:`__str__`."""
        return self.__str__()