185 lines
5.0 KiB
Python
185 lines
5.0 KiB
Python
# coding=utf-8
|
|
import ujson
|
|
from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient
|
|
from binance.spot import Spot
|
|
import time
|
|
import requests
|
|
import datetime
|
|
import pymysql
|
|
import math
|
|
import pymongo
|
|
|
|
# Module-wide Spot REST client, constructed with no arguments; it is handed to
# get_depth()/handler_order() whenever a depth snapshot has to be fetched.
g_spot_client = Spot()
|
|
|
|
|
|
class Pair:
    """Local order-book state for one trading pair.

    Attributes:
        depth_u: last update id applied to the book (0 until a snapshot or
            update has been stored).
        depth_U: first update id of the most recently applied update.
        depth_ts: event timestamp of the last successful stat_order() save
            (0 means nothing has been saved yet for this book).
        bids: bid levels; starts as an empty dict and is replaced by the
            snapshot's list of levels in get_depth().
        asks: ask levels, same layout as ``bids``.
    """

    def __init__(self):
        # All state is per-instance: the containers must not be shared
        # between Pair objects, which class-level ``{}`` defaults would do.
        self.depth_u = 0
        self.depth_U = 0
        self.depth_ts = 0
        self.bids = {}
        self.asks = {}
|
|
|
|
|
|
# Current local order book (a Pair) per subscribed symbol. Both start as None
# so order_handler() can safely read them before check_order() has stored the
# first snapshot.
g_btcusdt = None
g_ethusdt = None
|
|
|
|
|
|
def init_db():
    """Open a MongoDB connection and select the ``border2`` database.

    Returns:
        A ``(client, database)`` tuple: the ``pymongo.MongoClient`` connected
        to ``mongodb://127.0.0.1:27020/`` and its ``border2`` database handle.
    """
    client = pymongo.MongoClient("mongodb://127.0.0.1:27020/")
    database = client["border2"]
    return client, database
|
|
|
|
|
|
def get_depth(client, pair):
    """Fetch a depth snapshot for *pair* and wrap it in a fresh ``Pair``.

    Args:
        client: object exposing ``depth(symbol, limit=...)`` (the Spot client).
        pair: symbol name passed straight to ``client.depth``.

    Returns:
        A new ``Pair`` whose ``bids``/``asks`` hold the snapshot's ``"bids"``
        and ``"asks"`` entries and whose ``depth_u`` is the snapshot's
        ``"lastUpdateId"``.
    """
    book = Pair()
    snapshot = client.depth(pair, limit=5000)
    book.bids, book.asks = snapshot["bids"], snapshot["asks"]
    book.depth_u = snapshot["lastUpdateId"]
    print(pair, ": get_depth: init", book.depth_u)
    return book
|
|
|
|
|
|
def dict2number(dict_in):
    """Convert a sequence of ``[price, quantity]`` levels into a dict.

    Args:
        dict_in: iterable of indexable levels; element 0 is used unchanged as
            the key and element 1 is converted with ``float``.

    Returns:
        ``{price: float(quantity)}``; when a price occurs more than once the
        last occurrence wins.
    """
    return {level[0]: float(level[1]) for level in dict_in}
|
|
|
|
|
|
def dict2save(mdb, pair, dict_in, ts):
    """Store one document built from ``(key, value)`` entries in ``mdb[pair]``.

    Args:
        mdb: mapping-like database handle; ``mdb[pair]`` must offer
            ``insert_one``.
        pair: collection name.
        dict_in: iterable of indexable entries; element 0 becomes a document
            key and element 1 its value.
        ts: timestamp; stored under ``"unixdt"`` as ``int(ts / 1000)``.
    """
    collection = mdb[pair]
    document = {"unixdt": int(ts / 1000)}
    # Entries are applied in order, so a later duplicate key overwrites an
    # earlier one (including "unixdt" itself).
    document.update((entry[0], entry[1]) for entry in dict_in)
    print("dict2save", document)
    collection.insert_one(document)
|
|
|
|
def classify_order(dict_in):
    """Aggregate quantities into price bands that are 100 wide.

    Args:
        dict_in: mapping of price (anything ``float`` accepts) to quantity.

    Returns:
        Dict keyed by the band's lower bound as a string (the price truncated
        to an integer and then down to a multiple of 100), holding the summed
        quantity of every price that falls into that band.
    """
    totals = {}
    for price_key, quantity in dict_in.items():
        whole_price = int(float(price_key))
        band = str(int(whole_price / 100) * 100)
        if band not in totals:
            totals[band] = quantity
        else:
            totals[band] = totals[band] + quantity
    return totals
|
|
|
|
def stat_order(pair, bids_in, asks_in, ts, old_ts):
    """Aggregate the book into price bands and save it, at most every 5 min.

    Args:
        pair: symbol name; used as the prefix of the ``_bids`` / ``_asks``
            collection names.
        bids_in: iterable of ``[price, quantity]`` bid levels.
        asks_in: iterable of ``[price, quantity]`` ask levels.
        ts: timestamp of the current event.
        old_ts: timestamp of the previous successful save.

    Returns:
        False when less than ``1000 * 60 * 5`` has elapsed since ``old_ts``
        (five minutes if timestamps are in milliseconds) and nothing was
        saved; True after both sides have been written.
    """
    print(pair, ": stat_order cmp", ts, old_ts)
    if ts - old_ts < 1000 * 60 * 5:
        return False

    bids = dict2number(bids_in)
    asks = dict2number(asks_in)

    bids_classify = classify_order(bids)
    asks_classify = classify_order(asks)
    print("bids_classify", bids_classify)

    # Bands ordered by ascending aggregated quantity.
    # NOTE(review): the "top_" names suggest descending order - confirm intent.
    top_bids = sorted(bids_classify.items(), key=lambda x: x[1], reverse=False)
    top_asks = sorted(asks_classify.items(), key=lambda x: x[1], reverse=False)
    print("top_bids", top_bids)

    mc, mdb = init_db()
    try:
        dict2save(mdb, pair + "_bids", top_bids, ts)
        dict2save(mdb, pair + "_asks", top_asks, ts)
    finally:
        # A client is opened for every save; close it so connections do not
        # pile up over the lifetime of the process.
        mc.close()

    print(pair, ": stat_order OK at", ts)
    return True
|
|
|
|
|
|
def merge_order(dst, src):
    """Apply a depth update to one side of the local order book.

    Args:
        dst: current levels, an iterable of ``[price, quantity]`` entries.
        src: updated levels in the same layout.

    Returns:
        A new list: levels of ``dst`` in their original order, each replaced
        by its update from ``src`` when the price matches, followed by the
        price levels that appear only in ``src``. An update whose quantity
        parses to zero removes that level instead of storing it.

    Prices are matched by equality of element 0 and must be hashable (the
    depth feeds deliver them as strings).
    """
    # First occurrence of a price in src wins, matching the previous
    # first-match lookup.
    updates = {}
    for level in src:
        updates.setdefault(level[0], level)

    merged = []
    known_prices = set()
    for level in dst:
        price = level[0]
        known_prices.add(price)
        update = updates.get(price)
        if update is None:
            merged.append(level)
        elif float(update[1]) != 0:
            merged.append(update)
        # else: zero quantity means the level is gone - drop it.

    # Levels that are new in this update were previously lost entirely.
    for price, update in updates.items():
        if price not in known_prices and float(update[1]) != 0:
            merged.append(update)
    return merged
|
|
|
|
|
|
def handler_order(pair, pair_name, msg_in, client):
    """Apply one depth-update event to a local order book.

    Args:
        pair: the current ``Pair`` for this symbol, or None when no snapshot
            has been stored yet (the event is then ignored).
        pair_name: symbol name, used for logging, saving and re-fetching.
        msg_in: event dict with keys ``"E"`` (event time), ``"U"`` / ``"u"``
            (first / last update id) and ``"b"`` / ``"a"`` (bid / ask levels).
        client: REST client handed to ``get_depth`` when the book must be
            rebuilt.

    Returns:
        The ``Pair`` to keep using: the same object after a merge or an
        ignored event, a fresh snapshot after a detected gap, or None when
        ``pair`` was None.
    """
    ts = msg_in["E"]
    first_id = msg_in["U"]
    last_id = msg_in["u"]

    if pair is None:
        return pair

    # Either the event continues exactly where the book left off, or it is
    # the first event accepted after a snapshot (nothing saved yet).
    # NOTE(review): the second rule does not check first_id <= depth_u + 1,
    # so a gap right after a snapshot is tolerated - confirm this is wanted.
    in_sequence = first_id == pair.depth_u + 1
    first_after_snapshot = (
        (last_id > pair.depth_u) and (pair.depth_ts == 0) and (pair.depth_u != 0)
    )

    if in_sequence or first_after_snapshot:
        pair.bids = merge_order(pair.bids, msg_in["b"])
        pair.asks = merge_order(pair.asks, msg_in["a"])
        pair.depth_U = first_id
        pair.depth_u = last_id
        if stat_order(pair_name, pair.bids, pair.asks, ts, pair.depth_ts):
            pair.depth_ts = ts
        print(pair_name, ": append", last_id)
    elif (pair.depth_u != 0) and (last_id > pair.depth_u):
        # Updates were missed: rebuild the book from a fresh snapshot.
        pair = get_depth(client, pair_name)
        print(pair_name, ": reinit", pair.depth_u, first_id, pair.depth_ts)
    # Otherwise the event is not newer than the book (or the book holds no
    # snapshot yet): ignore it instead of paying for another snapshot fetch.

    return pair
|
|
|
|
|
|
def order_handler(message):
    """Websocket callback: route depth updates to the matching order book.

    Messages for the ``btcusdt@depth`` and ``ethusdt@depth`` streams whose
    payload has ``"e" == "depthUpdate"`` are passed to ``handler_order`` and
    the returned book replaces the corresponding module global. Anything else
    (other streams, other event types, messages without a ``"stream"`` or
    ``"data"`` entry, non-dict messages) is ignored.

    Args:
        message: the decoded websocket message.
    """
    global g_spot_client
    global g_btcusdt
    global g_ethusdt

    # Not every frame is a stream payload; skip those that do not carry the
    # expected keys rather than raising inside the callback.
    if not isinstance(message, dict):
        return
    stream = message.get("stream")
    ddata = message.get("data")
    if not isinstance(ddata, dict) or ddata.get("e") != "depthUpdate":
        return

    if stream == "btcusdt@depth":
        g_btcusdt = handler_order(g_btcusdt, "BTCUSDT", ddata, g_spot_client)
    elif stream == "ethusdt@depth":
        g_ethusdt = handler_order(g_ethusdt, "ETHUSDT", ddata, g_spot_client)
|
|
|
|
def check_order():
    """Start the depth websocket subscription and load the initial snapshots.

    Creates and starts a ``WebsocketClient``, subscribes ``order_handler`` to
    the BTCUSDT and ETHUSDT depth streams, then stores a snapshot fetched via
    ``get_depth`` in ``g_btcusdt`` and ``g_ethusdt`` (in that order).
    """
    global g_spot_client
    global g_btcusdt
    global g_ethusdt

    stream_client = WebsocketClient()
    stream_client.start()

    depth_streams = ['btcusdt@depth', 'ethusdt@depth']
    stream_client.instant_subscribe(stream=depth_streams, callback=order_handler)

    # The subscription is opened before the snapshots are requested.
    g_btcusdt = get_depth(g_spot_client, "BTCUSDT")
    g_ethusdt = get_depth(g_spot_client, "ETHUSDT")
|
|
|
|
|
|
# Runs at import time: starts the websocket subscription and loads the
# initial depth snapshots.
check_order()
|