confirmations – How to scan future transactions of a specified amount on a blockchain address?

0
3
confirmations – How to scan future transactions of a specified amount on a blockchain address?


I think you can use ZeroMQ in a Python script to listen to the daemon for new transactions involving your desired address. Then, when you see a transaction with your address, you can filter by the output amount, perhaps using an API to convert BTC to USDT. You will need a running Bitcoin full node, with bitcoind launched with the flag -zmqpubrawtx=tcp://127.0.0.1:28332. I think something like the following should work.

import zmq
import json
import subprocess
import requests


ZMQ_ADDRESS = "tcp://127.0.0.1:28332"  # Ensure bitcoind has been launched with flag `-zmqpubrawtx=tcp://127.0.0.1:28332`

# Target Bitcoin address to monitor
TARGET_ADDRESS = "your-bitcoin-address"

# Minimum and maximum USDT equivalent of an output's BTC amount (inclusive bounds)
MIN_USDT = 100
MAX_USDT = 115

# API to fetch BTC/USDT price
PRICE_API = "your-preferred-API"

def get_btc_price():
    """Fetch the latest BTC price in USD.

    Sends a GET request to ``PRICE_API`` and reads the price from the
    ``bpi -> USD -> rate`` field of the JSON response, which is expected to
    be a string that may contain thousands separators (e.g. ``"97,123.45"``).

    Returns:
        The price as a float, or None if the request, the JSON parsing, or
        the field lookup fails (the error is printed).
    """
    try:
        # A timeout keeps a stalled price API from blocking the listener forever.
        response = requests.get(PRICE_API, timeout=10)
        # Surface HTTP errors explicitly instead of failing later on a
        # confusing JSON/KeyError from an error page.
        response.raise_for_status()
        payload = response.json()
        # Strip thousands separators before converting to float.
        return float(payload["bpi"]["USD"]["rate"].replace(",", ""))
    except Exception as e:
        print(f"Error fetching BTC value: {e}")
        return None

def decode_tx(raw_tx):
    """Decode a raw transaction using bitcoin-cli.

    Args:
        raw_tx: The serialized transaction as a hex string.

    Returns:
        The parsed JSON output of ``bitcoin-cli decoderawtransaction`` as a
        dict, or None if the command cannot be run, exits with an error,
        produces no output, or produces output that is not valid JSON.
    """
    try:
        result = subprocess.run(
            ["bitcoin-cli", "decoderawtransaction", raw_tx],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # Report the failure instead of silently returning None.
            print(f"Error decoding transaction: {result.stderr.strip()}")
            return None
        return json.loads(result.stdout) if result.stdout else None
    except Exception as e:
        # Covers a missing bitcoin-cli binary and malformed JSON output.
        print(f"Error decoding transaction: {e}")
        return None

def process_transaction(raw_tx):
    """Process a raw transaction and report outputs paying TARGET_ADDRESS.

    The transaction is decoded with ``decode_tx``; every output whose
    scriptPubKey names ``TARGET_ADDRESS`` is collected. Only if at least one
    output matches is the BTC price fetched, so the price API is not called
    for unrelated transactions. Matching outputs whose USDT equivalent lies
    within ``[MIN_USDT, MAX_USDT]`` are printed.

    Args:
        raw_tx: The serialized transaction as a hex string.

    Returns:
        None. Results are printed to stdout.
    """
    tx_data = decode_tx(raw_tx)
    if not tx_data:
        return

    matching_amounts = []
    for vout in tx_data.get("vout", []):
        script = vout.get("scriptPubKey", {})
        # Older Bitcoin Core versions emit a list under "addresses"; newer
        # versions emit a single string under "address". Accept both.
        addresses = list(script.get("addresses", []))
        if "address" in script:
            addresses.append(script["address"])
        if TARGET_ADDRESS in addresses:
            matching_amounts.append(vout["value"])

    if not matching_amounts:
        return

    # Fetch the price only after a match to avoid one API call per transaction.
    btc_price = get_btc_price()
    if not btc_price:
        return

    for amount_btc in matching_amounts:
        amount_usdt = amount_btc * btc_price
        if MIN_USDT <= amount_usdt <= MAX_USDT:
            print("New Transaction Detected!")
            print(f"TXID: {tx_data['txid']}")
            print(f"Quantity: {amount_btc} BTC (~{amount_usdt:.2f} USDT)")
            print(f"From: Unknown (examine inputs), To: {TARGET_ADDRESS}")

def listen_zmq():
    """Listen on ZMQ_ADDRESS for raw transactions and process each one.

    Connects a SUB socket to ``ZMQ_ADDRESS``, subscribes to the ``rawtx``
    topic, and loops forever handing each received transaction (hex-encoded)
    to ``process_transaction``. Errors while receiving or processing a
    message are printed and the loop continues. The socket and context are
    released when the loop is left (e.g. on Ctrl-C).
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(ZMQ_ADDRESS)
    # Only raw transactions are of interest; other topics published on the
    # same endpoint (e.g. raw blocks) must not reach the transaction decoder.
    socket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")

    print("Listening for new transactions...")

    try:
        while True:
            try:
                # bitcoind publishes multipart messages:
                # [topic, body, sequence]. The body is the binary
                # serialized transaction, not text.
                frames = socket.recv_multipart()
                if len(frames) < 2 or frames[0] != b"rawtx":
                    continue
                # bitcoin-cli expects the transaction as a hex string.
                raw_tx = frames[1].hex()
                process_transaction(raw_tx)
            except Exception as e:
                print(f"Error receiving transaction: {e}")
    finally:
        # linger=0 so closing does not block on undelivered messages.
        socket.close(linger=0)
        context.term()

# Start the listener only when this file is run as a script, not when imported.
if __name__ == "__main__":
    listen_zmq()

LEAVE A REPLY

Please enter your comment!
Please enter your name here