python – Parsing uncooked bitcoin transactions, parsing works for one instance, however not for one more

0
23
python – Parsing uncooked bitcoin transactions, parsing works for one instance, however not for one more


I’m engaged on a bitcoin transaction parsing class in python. The unusual factor is, I’m able to efficiently parse an instance transaction, however can’t parse every other, and I’m struggling to search out out what on earth is occurring that’s inflicting the difficulty.

there may be rather a lot to unpack, however right here is my implementation of the transaction class(es)

from btclib.utils import hash256
import requests
from io import BytesIO
import json 

from .script import Script 
from .utils import Varint
from typing import BinaryIO, Checklist


class TxFetcher:
    cache = {}

    @classmethod
    def get_url(cls, testnet=False):
        if testnet:
            return "https://blockstream.data/testnet/api/"
        else:
            return "https://blockstream.data/api/"

    @classmethod
    def fetch(cls, tx_id, testnet=False, contemporary=False):
        if contemporary or (tx_id not in cls.cache):
            url = f"{cls.get_url(testnet)}/tx/{tx_id}/hex"
            response = requests.get(url)
            print(f"{response.textual content=}")
            attempt:
                uncooked = bytes.fromhex(response.textual content.strip())
            besides ValueError:
                increase ValueError(f"Surprising response: {response.textual content}")
            if uncooked[4] == 0:
                print("Hit")
                uncooked = uncooked[:4] + uncooked[6:]
                tx = Tx.parse(BytesIO(uncooked), testnet=testnet)
                tx.locktime = int.from_bytes(uncooked[-4:], "little")
            else:
                print("Hit else")
                tx = Tx.parse(BytesIO(uncooked), testnet=testnet)
            if tx.id() != tx_id:
                increase ValueError(
                    f"Acquired non-matching tx ids: {tx.id()} versus {tx_id}"
                )

            cls.cache[tx_id] = tx

        cls.cache[tx_id].testnet = testnet
        return cls.cache[tx_id]
    
    @classmethod
    def load_cache(cls, filepath):
        with open(filepath, 'r') as f:
            disk_cache = json.masses(f.learn())
        
        for okay, raw_hex in disk_cache.gadgets():
            uncooked = bytes.fromhex(raw_hex)
            if uncooked[0] == 0:
                uncooked = uncooked[:4] + uncooked[6:]
                tx = Tx.parse(BytesIO(uncooked))
                tx.locktime = int.from_bytes(uncooked[-4:], 'little')
            else:
                tx = Tx.parse(BinaryIO(uncooked))
    

    @classmethod
    def dump_cache(cls, filepath):
        with open(filepath, 'w') as f:
            dump = {okay: tx.serialize().hex() for okay,tx in cls.cache.gadgets()}
            f.write(json.dumps(dump, sort_keys=True, indent=4))





class TxIn:

    def __init__(self, prev_tx, prev_index, script_sig=None, sequence=0xFFFFFFFF, testnet=False):
        self.prev_tx = prev_tx
        self.prev_index = prev_index
        
        if script_sig is None:
            self.script_sig = Script()
        else:
            self.script_sig = script_sig
        
        
        self.sequence = sequence

    def __repr__(self):
        return f"{self.prev_tx.hex()}:{self.prev_index}"

    def fetch_tx(self, testnet=False):
        return TxFetcher.fetch(self.prev_tx.hex(), testnet=testnet)

    def worth(self, testnet=False):
        tx = self.fetch_tx(testnet=testnet)
        return tx.tx_outs[self.prev_index].quantity

    def script_pubkey(self, testnet=False):
        tx = self.fetch_tx(self.prev_index, testnet)
        return tx.tx_outs[self.prev_index].script_pubkey

    def serialize(self):
        '''
        result's a byte string 
        '''
        outcome = self.prev_tx[::-1]
        # self.prev_index is already bytes? why to bytes?
        outcome += self.prev_index.to_bytes(4, "little")
        outcome += self.script_sig.serialize()
        outcome += self.sequence.to_bytes(4, "little")
        return outcome

    @classmethod
    def parse(cls, s:BinaryIO):
        '''
        the stream has already had the:
         - verions: 4 bytes 
         - variety of inputs: varint; 4 bytes
         consumed.  

         subsequent, for the enter object, we parse:
         - earlier transaction hash: 32 bytes
         - earlier transaction index: 4 bytes 

        '''
        prev_tx_hash = s.learn(32)[::-1]
        
        print('txin: prev_tx_hash', prev_tx_hash.hex())
        # prev_tx_hash = int.from_bytes(prev_tx_hash_bytes, 'little')
        prev_tx_idx = int.from_bytes(s.learn(4), 'little')

        print("txin: earlier tx id :", prev_tx_idx)
        # print(f"{Varint.decode(s)=}")
        script_sig = Script.parse(s)
        # print("Byte stream left: ", len(s.learn()))
        # script_sig = Script.parse(s)cr
        # prev_tx_idx = int.from_bytes(prev_tx_idx_bytes, 'little')

        return TxIn(prev_tx=prev_tx_hash, script_sig=script_sig, prev_index=prev_tx_idx)



class TxOut:

    def __init__(self, quantity, script_pubkey):
        self.quantity = quantity
        self.script_pubkey = script_pubkey

    def __repr__(self):
        return f"{self.quantity}:{self.script_pubkey}"

    def serialize(self):

        outcome = self.quantity.to_bytes(8, "little")
        outcome += self.script_pubkey.serialize()
        return outcome
    
    @classmethod
    def parse(cls, s:BinaryIO):
        # num_outputs = Varint.decode(s)
        # print(f"{num_outputs=}") #that is appropriate 
        # TxOut()
        # making an attempt to extract he quantity is close to imposssible 
        # print("s.learn(8) to little int", int.from_bytes(s.learn(8), 'little'))
        quantity = int.from_bytes(s.learn(8), 'little') 
        print(f"txout: {quantity=}")      
        # print(f"{quantity=}")
        # script_pub_key_len = Varint.decode(s)
        # print(s.learn(8).hex())

        script_pubkey = Script.parse(s)
        return TxOut(quantity=quantity, script_pubkey=script_pubkey)
  



class Tx:

    def __init__(self,locktime: int, model:int, tx_ins:Checklist[TxIn]=[], tx_outs : Checklist[TxOut]=[] , testnet: bool =False):
        self.model = model
        self.tx_ins = tx_ins
        self.tx_outs = tx_outs
        self.locktime = locktime
        self.testnet = testnet

    def __repr__(self):
        tx_ins = ""
        for tx_in in self.tx_ins:
            tx_ins += tx_in.__repr__() + "n"
        tx_outs = ""
        for tx_out in self.tx_outs:
            tx_outs += tx_out.__repr__() + "n"
        return f"Tx: {self.id()}nverison: {self.model}ntx_ins:n{tx_ins}tx_outs:n{tx_outs}locktime: {self.locktime}"
    

    def price(self):
        price = sum([tx_in.value(self.testnet) for tx_in in self.tx_ins]) - sum([tx_out.amount for tx_out in self.tx_outs])
        assert price > 0, "The price someway got here out as detrimental, i.e. price={price}"
        return price 
    

    def id(self):
        return self.hash().hex()

    def hash(self):
        return hash256(self.serialize())[::-1]

    def serialize(self) -> bytes:
        outcome = self.model.to_bytes(4, "little")
        outcome += Varint.encode(len(self.tx_ins))
        for tx_in in self.tx_ins:
            outcome += tx_in.serialize()

        outcome += Varint.encode(len(self.tx_outs))
        for tx_out in self.tx_outs:
            outcome += tx_out.serialize()
        outcome += self.locktime.to_bytes(4, "little")
        return outcome

    @classmethod
    def parse(cls, s:BinaryIO, testnet : bool = False):

        v_bytes = s.learn(4) # first 4 bytes are model bytes 
        model = int.from_bytes(v_bytes, 'little')
        print(f"tx: {model=}")
        num_inputs = Varint.decode(s)
        print("tx: num inputs", num_inputs)
        inputs = []
        for _ in vary(num_inputs):
            inputs.append(TxIn.parse(s)) #  for _ in vary(num_inputs)]

        sequence = int.from_bytes(s.learn(4), 'little')
        print(f"{sequence=}") 

        for i in vary(len(inputs)):
            inputs[i].sequence = sequence

        num_outputs = Varint.decode(s)
        outputs = []
        for _ in vary(num_outputs):
            outputs.append(TxOut.parse(s)) #   for _ in vary(num_outputs)]
        locktime = int.from_bytes(s.learn(4), 'little')


        return Tx(model=model, tx_outs=outputs, locktime=locktime, tx_ins=inputs, testnet=testnet)
    

for completeness right here is myVarint implementation:


class Varint:
    '''
    For parsing the mount of inputs, the place there could also be greater than 255 (a single byte) quantity of inputs 
    if x < 253:
        encode as single byte 
        
    if 65535 > x >= 253:
        begin with 253 byte [ fd ] then encode quantity in 2 bytes utilizing little-endian 
        e.g 255 -> fd + int(255).to_bytes(2, 'little').hex() = fdxff00
        e.g 555 -> fd + int(555).to_bytes(2, 'little').hex() = fd2b02

    if 4294967295 > x >= 65535:
        begin with 254 byte [ fe ] then encode the quantity in 4 bytes utilizing little-endian 
        e.g. 70015 -> 0xfe + int(70015).to_bytes(4, 'little').hex() = fe7f110100

    if  18446744073709551615 > x >= 4294967295:
        strt with 255 byte [ ff ] then encode the quantity in 8 bytes utilizing little-endian 
        e.g.  18005558675309 -> ff int(18005558675309).to_bytes(8, 'little').hex() = ff6dc7ed3e60100000
    '''
    def decode(s):
        i = s.learn(1)[0]
        if i == 0xFD:
            return int.from_bytes(s.learn(2), "little")
        elif i == 0xFE:
            return int.from_bytes(s.learn(4), "little")
        elif i == 0xFF:
            return int.from_bytes(s.learn(8), "little")
        else:
            return i


    def encode(i):
        if i < 0xFD:
            return bytes([i])
        elif i < 0x10000:
            return b"xfd" + i.to_bytes(2, "little")
        elif i < 0x100000000:
            return b"xfe" + i.to_bytes(4, "little")
        elif i < 0x10000000000000000:
            return b"xff" + i.to_bytes(8, "little")
        else:
            increase ValueError(f"Integer {i} is just too massive")


and Script implementation

class Script:

    def __init__(self, cmds=None):
        if cmds is None:
            self.cmds = []
        else:
            self.cmds = cmds
    

    
    def raw_serialize(self):
        # initialize what we'll ship again
        outcome = b''
        # undergo every cmd
        for cmd in self.cmds:
            # if the cmd is an integer, it is an opcode
            if sort(cmd) == int:
                # flip the cmd right into a single byte integer utilizing int_to_little_endian
                outcome += cmd.to_bytes(1, 'little')
                # print(f"raw_serialize: {outcome}")
            else:
                # in any other case, this is a component
                # get the size in bytes
                size = len(cmd)
                # for big lengths, now we have to make use of a pushdata opcode
                if size < 75:
                    # flip the size right into a single byte integer
                    outcome += size.to_bytes(1, 'little')
                elif size > 75 and size < 0x100:
                    # 76 is pushdata1
                    outcome += int(76).to_bytes(1, 'little')
                    outcome += size.to_bytes(1, 'little')
                elif size >= 0x100 and size <= 520:
                    # 77 is pushdata2
                    outcome += int(77).to_bytes(1, 'little')
                    outcome += size.to_bytes(2, 'little')
                else:
                    increase ValueError('too lengthy an cmd')
                outcome += cmd
        return outcome



    def serialize(self):
        # get the uncooked serialization (no prepended size)
        outcome = self.raw_serialize()
        # get the size of the entire thing
        whole = len(outcome)
        # encode_varint the full size of the outcome and prepend
        return Varint.encode(whole) + outcome
        return  outcome

    @classmethod
    def parse(cls, s):
        # get the size of the complete subject
        size = Varint.decode(s)
        # print(f"size: {size}")
        # initialize the cmds array
        cmds = []
        # initialize the variety of bytes we have learn to 0
        depend = 0
        # loop till we have learn size bytes
        whereas depend < size:
            # get the present byte
            present = s.learn(1)
            # increment the bytes we have learn
            depend += 1
            # convert the present byte to an integer
            current_byte = present[0]
            # if the present byte is between 1 and 75 inclusive
            if current_byte >= 1 and current_byte <= 75:
                # now we have an cmd set n to be the present byte
                n = current_byte
                # add the following n bytes as an cmd
                cmds.append(s.learn(n))
                # enhance the depend by n
                depend += n
            elif current_byte == 76:
                # op_pushdata1
                data_length = int.from_bytes(s.learn(1), 'little')
                cmds.append(s.learn(data_length))
                depend += data_length + 1
            elif current_byte == 77:
                # op_pushdata2
                data_length = int.from_bytes(s.learn(2), 'little')
                cmds.append(s.learn(data_length))
                depend += data_length + 2
            else:
                # now we have an opcode. set the present byte to op_code
                op_code = current_byte
                # add the op_code to the checklist of cmds
                cmds.append(op_code)
        if depend != size:
            increase SyntaxError('parsing script failed')
        return cls(cmds)

Now I can parse and re-serialize the transaction

0100000001813f79011acb80925dfe69b3def355fe914bd1d96a3f5f71bf8303c6a989c7d1000000006b483045022100ed81ff192e75a3fd2304004dcadb746fa5e24c5031ccfcf21320b0277457c98f02207a986d955c6e0cb35d446a89d3f56100f4d7f67801c31967743a9c8e10615bed01210349fc4e631e3624a545de3f89f5d8684c7b8138bd94bdd531d2e213bf016b278afeffffff02a135ef01000000001976a914bc3b654dca7e56b04dca18f2566cdaf02e8d9ada88ac99c39800000000001976a9141c4bc762dd5423e332166702cb75f40df79fea1288ac19430600

with out subject, and i’m in a position confirm this by means of a blockchain explorer

once I tried to do the identical factor with this transaction as an alternative:

0100000002137c53f0fb48f83666fcfd2fe9f12d13e94ee109c5aeabbfa32bb9e02538f4cb000000006a47304402207e6009ad86367fc4b166bc80bf10cf1e78832a01e9bb491c6d126ee8aa436cb502200e29e6dd7708ed419cd5ba798981c960f0cc811b24e894bff072fea8074a7c4c012103bc9e7397f739c70f424aa7dcce9d2e521eb228b0ccba619cd6a0b9691da796a1ffffffff517472e77bc29ae59a914f55211f05024556812a2dd7d8df293265acd8330159010000006b483045022100f4bfdb0b3185c778cf28acbaf115376352f091ad9e27225e6f3f350b847579c702200d69177773cd2bb993a816a5ae08e77a6270cf46b33f8f79d45b0cd1244d9c4c0121031c0b0b95b522805ea9d0225b1946ecaeb1727c0b36c7e34165769fd8ed860bf5ffffffff027a958802000000001976a914a802fc56c704ce87c42d7c92eb75e7896bdc41ae88aca5515e00000000001976a914e82bd75c9c662c3f5700b33fec8a676b6e9391d588ac00000000

This system fails.

It is ready to extract the primary earlier transaction hash (there may be two)

cbf43825e0b92ba3bfabaec509e14ee9132df1e92ffdfc6636f848fbf0537c13

however for some purpose, the following earlier transaction hash is offset,

ac653229dfd8d72d2a81564502051f21554f919ae59ac27be7727451ffffffff

the place it’s really imagined to be:

590133d8ac653229dfd8d72d2a81564502051f21554f919ae59ac27be7727451

I suppose the trailing ffffffff on the inaccurate transaction hash are literally a part of the sequence and never the transaction hash.

I’m actually fighting de-bugging this and thought probably there could also be some consultants on right here that may see a flaw in my implementation.

Thanks in your time and even simply studying this, since i do know its lots tot soak up. However any assist what so over can be appreciated. I hope you get pleasure from the remainder of your day.

LEAVE A REPLY

Please enter your comment!
Please enter your name here