I'll just throw this out here: there's a better way to do it. Above I was using the Huobi SDK, but I find that using a plain websocket connection, without any extra SDK libraries, works better. A good template for the websocket connection is below:
import gzip
import json
import pprint
import threading
import time
import websocket
def send_message(ws, message_dict: dict) -> None:
    """Serialize *message_dict* to JSON and send it over *ws*.

    The outgoing message is pretty-printed to stdout before it is sent.

    Args:
        ws: Connection object exposing a ``send(data)`` method.
        message_dict: JSON-serializable payload to transmit.
    """
    payload = json.dumps(message_dict).encode()
    print("Sending Message:")
    pprint.pprint(message_dict)
    # The websocket API method is ``send``; ``ship`` does not exist.
    ws.send(payload)
def on_message(ws, message) -> None:
    """Handle one incoming frame: gunzip it, parse the JSON and print it.

    If the decoded message contains a ``"ping"`` key, a ``{"pong": <value>}``
    reply echoing that value is sent back via ``send_message``.

    Args:
        ws: Connection the frame arrived on; used for the pong reply.
        message: Gzip-compressed, UTF-8 encoded JSON document (bytes).
    """
    unzipped_data = gzip.decompress(message).decode()
    # ``json.loads`` is the stdlib parser; ``json.masses`` does not exist.
    msg_dict = json.loads(unzipped_data)
    print("Recieved Message: ")
    pprint.pprint(msg_dict)
    if 'ping' in msg_dict:
        # Echo the ping value back in a pong message.
        pong = {
            "pong": msg_dict['ping'],
        }
        send_message(ws, pong)
def on_error(ws, error) -> None:
    """Report a connection error on stdout.

    The error is always printed via ``str(error)``. If it is a bytes payload,
    it is additionally gunzipped and printed as text. Exception objects are
    not bytes, so they are never passed to ``gzip.decompress`` (doing so
    would raise ``TypeError`` inside the error handler itself).

    Args:
        ws: Connection the error occurred on (unused).
        error: Exception instance or raw (possibly gzip-compressed) bytes.
    """
    import zlib  # local import: only needed to name zlib.error below

    print("Error: " + str(error))
    if isinstance(error, (bytes, bytearray)):
        try:
            print(gzip.decompress(error).decode())
        except (OSError, EOFError, zlib.error, UnicodeDecodeError):
            # Not valid gzip/UTF-8; the raw form was already printed above.
            pass
def on_close(ws, close_status_code=None, close_msg=None) -> None:
    """Print a marker line when the connection closes.

    The two optional parameters let the callback be invoked either with the
    connection alone or with a close status code and message as well; both
    extra values are ignored.

    Args:
        ws: Connection that was closed (unused).
        close_status_code: Close status code, if supplied by the caller.
        close_msg: Close reason, if supplied by the caller.
    """
    print("### closed ###")
def on_open(ws) -> None:
    """Start a background thread that requests kline data, then closes *ws*.

    The worker sends the same ``market.btcusdt.kline.1min`` request five
    times, sleeping two seconds before each send, and then closes the
    connection.

    Args:
        ws: The freshly opened connection.
    """
    def run(*args):
        # The original template had non-numeric placeholders here, which made
        # int() raise ValueError. Default to the last hour instead.
        # NOTE(review): assumes "from"/"to" are epoch seconds - confirm
        # against the exchange API docs and adjust the window as needed.
        to_ts = int(time.time())
        from_ts = to_ts - 60 * 60
        request = {
            "req": "market.btcusdt.kline.1min",
            "id": "id1",
            "from": from_ts,
            "to": to_ts,
        }
        for _ in range(5):
            time.sleep(2)
            send_message(ws, request)
        ws.close()
        print("thread terminating...")

    # Run in a separate thread so the sleeps do not block this callback.
    worker = threading.Thread(target=run, args=())
    worker.start()
if __name__ == "__main__":
    # Uncomment to have the websocket library trace traffic for debugging.
    # websocket.enableTrace(True)
    # Host corrected to api.huobi.pro; "api.huobi.professional" was a mangled
    # form of it. NOTE(review): confirm the current endpoint in the API docs.
    ws = websocket.WebSocketApp(
        "wss://api.huobi.pro/ws",
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Blocks, dispatching the callbacks above, until the connection closes.
    ws.run_forever()