From aad77378ab8228e40bc3c62de30c251f8e669683 Mon Sep 17 00:00:00 2001 From: fengche <1158629543@qq.com> Date: Fri, 30 Jan 2026 17:47:21 +0800 Subject: [PATCH] =?UTF-8?q?js=E4=BB=A3=E7=A0=81=E6=94=B9=E5=86=99py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- m2pool_backend_app/README.md | 0 m2pool_backend_app/app.py | 187 +++++++ m2pool_backend_app/config/example.txt | 130 +++++ m2pool_backend_app/lib/mysql.py | 120 ++++ m2pool_backend_app/lib/node.py | 298 ++++++++++ m2pool_backend_app/lib/redis.py | 40 ++ m2pool_backend_app/public/baseResponse.py | 29 + .../public/distribution-hashrate.py | 11 + .../public/distribution-shares.py | 45 ++ m2pool_backend_app/public/endian.py | 11 + m2pool_backend_app/public/index.sql | 513 ++++++++++++++++++ m2pool_backend_app/public/retry.py | 22 + m2pool_backend_app/public/score.py | 35 ++ m2pool_backend_app/public/times.py | 66 +++ m2pool_backend_app/src/balance.py | 207 +++++++ m2pool_backend_app/src/confirm.py | 111 ++++ m2pool_backend_app/src/distribution.py | 265 +++++++++ m2pool_backend_app/src/hashrate.py | 280 ++++++++++ m2pool_backend_app/src/init.py | 112 ++++ m2pool_backend_app/src/report.py | 167 ++++++ 20 files changed, 2649 insertions(+) create mode 100644 m2pool_backend_app/README.md create mode 100644 m2pool_backend_app/app.py create mode 100644 m2pool_backend_app/config/example.txt create mode 100644 m2pool_backend_app/lib/mysql.py create mode 100644 m2pool_backend_app/lib/node.py create mode 100644 m2pool_backend_app/lib/redis.py create mode 100644 m2pool_backend_app/public/baseResponse.py create mode 100644 m2pool_backend_app/public/distribution-hashrate.py create mode 100644 m2pool_backend_app/public/distribution-shares.py create mode 100644 m2pool_backend_app/public/endian.py create mode 100644 m2pool_backend_app/public/index.sql create mode 100644 m2pool_backend_app/public/retry.py create mode 100644 m2pool_backend_app/public/score.py create mode 100644 m2pool_backend_app/public/times.py create mode 100644 m2pool_backend_app/src/balance.py create mode 100644 m2pool_backend_app/src/confirm.py create mode 100644 m2pool_backend_app/src/distribution.py create mode 100644 m2pool_backend_app/src/hashrate.py create mode 100644 m2pool_backend_app/src/init.py create mode 100644 m2pool_backend_app/src/report.py diff --git a/m2pool_backend_app/README.md b/m2pool_backend_app/README.md new file mode 100644 index 0000000..e69de29 diff --git a/m2pool_backend_app/app.py b/m2pool_backend_app/app.py new file mode 100644 index 0000000..08b58d2 --- /dev/null +++ b/m2pool_backend_app/app.py @@ -0,0 +1,187 @@ +import schedule +import time +import asyncio +import sys +from datetime import datetime, timedelta + +from public.times import Times +from src.hashrate import HashRate +from src.report import Report, ReportEnx +from src.confirm import Confirm +from src.distribution import Distribution +from src.blanace import Balance, DGBBlance +from src.clear import ClearDBData +from src.notice import Notice + + +def main(): + # 获取命令行参数 + if len(sys.argv) < 3: + raise Exception("请提供方法和币种参数") + + method = sys.argv[1] + methods = ["hashrate", "report", "clear", "distribution", "confirm", "balance", "stats", "notice"] + + if method not in methods: + raise Exception(f"暂不支持{method}方法") + + coin = sys.argv[2] + coins = ["nexa", "mona", "grs", "dgbq", "dgbs", "dgbo", "rxd", "enx", "alph"] + + if coin not in coins: + raise Exception(f"暂不支持{coin}") + + # hashrate 任务 + if method == "hashrate": + hashrate = HashRate(coin) + + def hashrate_job(): + asyncio.run(hashrate_task(hashrate)) + + # 每5分钟的第30秒执行 + schedule.every().hour.at(":00:30").do(hashrate_job) + schedule.every().hour.at(":05:30").do(hashrate_job) + schedule.every().hour.at(":10:30").do(hashrate_job) + schedule.every().hour.at(":15:30").do(hashrate_job) + schedule.every().hour.at(":20:30").do(hashrate_job) + schedule.every().hour.at(":25:30").do(hashrate_job) + schedule.every().hour.at(":30:30").do(hashrate_job) + schedule.every().hour.at(":35:30").do(hashrate_job) + schedule.every().hour.at(":40:30").do(hashrate_job) + schedule.every().hour.at(":45:30").do(hashrate_job) + schedule.every().hour.at(":50:30").do(hashrate_job) + schedule.every().hour.at(":55:30").do(hashrate_job) + + # report 任务 + elif method == "report": + if coin == "enx": + report = ReportEnx(coin) + else: + report = Report(coin) + + interval = 60 # 秒 + if coin == "rxd": + interval = 300 + elif coin == "nexa": + interval = 120 + + def report_job(): + report.main() + + schedule.every(interval).seconds.do(report_job) + + # confirm 任务 + elif method == "confirm": + interval = 60 # 秒 + if coin == "rxd": + interval = 300 + elif coin == "nexa": + interval = 120 + + confirm = Confirm(coin) + + def confirm_job(): + confirm.main() + + schedule.every(interval).seconds.do(confirm_job) + + # distribution 任务 + elif method == "distribution": + distribution = Distribution(coin) + + now_ts = datetime.now() + last_ts = now_ts - timedelta(hours=24) + + ymd_now = Times.utc_time(now_ts.isoformat()) + ymd_last = Times.utc_time(last_ts.isoformat()) + + end_time = ymd_now.split(" ")[0] + " 00:00:00" + start_time = ymd_last.split(" ")[0] + " 00:00:00" + + distribution.main(start_time, end_time) + + # balance 任务 + elif method == "balance": + special_coins = ["dgbo", "dgbs", "dgbq"] + + if coin in special_coins: + balance = DGBBlance(coin) + else: + balance = Balance(coin) + + hour = 4 + if coin in ["rxd", "alph"]: + hour = 9 + + async def balance_task_async(balance_obj): + count = 0 + last_height = await balance_obj.node.getblockcount() + + while count < 36: # 最多执行 36 次 (6小时) + enable = await balance_obj.query_now_height(last_height) + if enable: + result = await balance_obj.main() + if not result: + print(f"{coin}转账已完成") + return # 成功执行后退出循环 + + print(f"等待中... (已等待 {count * 10} 分钟)") + await asyncio.sleep(1000 * 60 * 10) # 休眠 10 分钟 + count += 1 + + print("等待超时,任务结束!") + + def balance_job(): + asyncio.run(balance_task_async(balance)) + + schedule.every().day.at(f"{hour:02d}:10:00").do(balance_job) + + # clear 任务 + elif method == "clear": + clear = ClearDBData(coin) + + try: + clear.clear_shares_db(72) + print("sharesdb:ok") + clear.clear_hashrate_db() + print("hashratedb:ok") + except Exception as err: + print(err) + finally: + sys.exit(0) + + # notice 任务 + elif method == "notice": + notice = Notice(coin) + + def notice_job(): + notice.main() + + schedule.every().day.at("09:30:00").do(notice_job) + + # 运行调度器 + while True: + schedule.run_pending() + time.sleep(1) + + +async def hashrate_task(hashrate): + """hashrate 异步任务""" + ymd_now = Times.utc_time(datetime.now().isoformat()) + ymd = ymd_now.split(":") + end_time = ymd[0] + ":" + ymd[1] + ":00" + + await hashrate.insert_hashrate_miners_table(end_time) + + current_minute = datetime.now().minute + data = await hashrate.query_hashrate_miners_accepts(end_time) + + if current_minute == 0 or current_minute == 30: + await hashrate.insert_mhs(data) + await hashrate.insert_mhs_real(data) + else: + await hashrate.insert_mhs_real(data) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/m2pool_backend_app/config/example.txt b/m2pool_backend_app/config/example.txt new file mode 100644 index 0000000..32b85d7 --- /dev/null +++ b/m2pool_backend_app/config/example.txt @@ -0,0 +1,130 @@ +// this file please use .conf +{ + "master":{ + "pooldb":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + }, + "sharesdb":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + }, + "distribution":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + }, + "hashrate":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + }, + "users_addresses":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + }, + "balance":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + } + }, + "slave":{ + "pooldb_slave":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + }, + "sharesdb_slave":{ + "host": "", + "user": "", + "password": "", + "database": "", + "port":, + "waitForConnections": true, + "connectionLimit": 20, + "queueLimit": 0 + } + }, + "redis_options":{ + "redis1":{ + "host":"", + "port":, + "db":, + "connectTimeout": + } + }, + "node_options":{ + "node1":{ + "rpcUser":"", + "rpcPassword":"", + "rpcPort":, + "rpcHost":"" + }, + "node2":{ + "rpcUser":"", + "rpcPassword":"", + "rpcPort":, + "rpcHost":"" + } + }, + "retry_options":{ + "node":{ + "max_retries":, + "retry_delay": + } + }, + "REPORT_ADDRESS":"", + "MAX_MATURE":, + "distribution_conf":{ + "PPLNS_SIZE":, + "MODEL_PERCENT":{ + "SCORE":, + "PPLNS":, + "PROPDIF": + }, + "SCORE_PERCENT":{ + "HASHRATE":, + "STDDEVS": + }, + "POOL_FEE": + } +} \ No newline at end of file diff --git a/m2pool_backend_app/lib/mysql.py b/m2pool_backend_app/lib/mysql.py new file mode 100644 index 0000000..3a9c96d --- /dev/null +++ b/m2pool_backend_app/lib/mysql.py @@ -0,0 +1,120 @@ +import aiomysql + +class DBPool: + def __init__(self, coin, options): + self.coin = coin + self.pool = None + self.options = options + + async def _get_pool(self): + if self.pool is None: + self.pool = await aiomysql.create_pool(**self.options) + return self.pool + + # ------------------------- + # 工具:判断是否 SELECT + # ------------------------- + def _is_select(self, sql: str) -> bool: + return sql.lstrip().lower().startswith("select") + + # ------------------------- + # 非事务 SQL + # ------------------------- + async def exec(self, sql, values=None): + if values is None: + values = [] + + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, values) + + if self._is_select(sql): + # JS: SELECT 返回 rows + return await cur.fetchall() + else: + # JS: 非 SELECT 返回 OkPacket + return { + "affectedRows": cur.rowcount, + "insertId": cur.lastrowid, + } + + # ------------------------- + # 单 SQL 事务 + # ------------------------- + async def exec_transaction(self, sql, values=None): + if values is None: + values = [] + + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await conn.begin() + try: + await cur.execute(sql, values) + + if self._is_select(sql): + result = await cur.fetchall() + else: + result = { + "affectedRows": cur.rowcount, + "insertId": cur.lastrowid, + } + + await conn.commit() + return result + except Exception: + await conn.rollback() + raise + + # ------------------------- + # 多 SQL 合并事务 + # ------------------------- + async def exec_transaction_together(self, params): + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await conn.begin() + try: + for item in params: + sql = item["sql"] + param = item.get("param", []) + await cur.execute(sql, param) + await conn.commit() + except Exception: + await conn.rollback() + raise + + # ------------------------- + # 写锁事务 + # ------------------------- + async def exec_write_lock(self, sql, values=None, table_name=None): + if values is None: + values = [] + + pool = await self._get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await conn.begin() + try: + await cur.execute(f"LOCK TABLES {table_name} WRITE") + await cur.execute(sql, values) + + if self._is_select(sql): + result = await cur.fetchall() + else: + result = { + "affectedRows": cur.rowcount, + "insertId": cur.lastrowid, + } + + await conn.commit() + return result + except Exception: + await conn.rollback() + raise + finally: + # 对齐 JS:始终解锁 + await conn.cursor().execute("UNLOCK TABLES") + +__all__ = ["DBPool"] \ No newline at end of file diff --git a/m2pool_backend_app/lib/node.py b/m2pool_backend_app/lib/node.py new file mode 100644 index 0000000..cc32ab5 --- /dev/null +++ b/m2pool_backend_app/lib/node.py @@ -0,0 +1,298 @@ +import httpx + + +class BaseRPCNode: + def __init__(self, NODE_OPTION): + self.rpcUser = NODE_OPTION.get("rpcUser") + self.rpcHost = NODE_OPTION.get("rpcHost") + self.rpcPassword = NODE_OPTION.get("rpcPassword") + self.rpcPort = NODE_OPTION.get("rpcPort") + + self.base_url = f"http://{self.rpcHost}:{self.rpcPort}" + + self.client = httpx.AsyncClient( + base_url=self.base_url, + auth=(self.rpcUser, self.rpcPassword), + timeout=5.0 + ) + + async def callRpcMethod(self, method, params=None): + if params is None: + params = [] + try: + response = await self.client.post("/", json={ + "jsonrpc": "1.0", + "id": "testnet", + "method": method, + "params": params + }) + response.raise_for_status() + return response.json()["result"] + + except httpx.HTTPStatusError as e: + print("RPC Error:", e.response.text) + raise + except Exception as e: + print("RPC Error:", str(e)) + raise + + async def getblockcount(self): + return await self.callRpcMethod("getblockcount", []) + + async def getblockhash(self, height): + return await self.callRpcMethod("getblockhash", [height]) + + async def getblock(self, param): + if isinstance(param, str): + return await self.callRpcMethod("getblock", [param, 2]) + elif isinstance(param, int): + hash_ = await self.getblockhash(param) + return await self.callRpcMethod("getblock", [hash_, 2]) + else: + raise ValueError("param must be str or int") + + +# ================= NEXA ================= +class NEXARPCNode(BaseRPCNode): + + async def verify_wallet(self, address): + try: + await self.callRpcMethod("getreceivedbyaddress", [address]) + return True + except Exception: + return False + + async def getblock(self, height): + return await self.callRpcMethod("getblock", [height, 2]) + + async def verify_block(self, height, address): + block_data = await self.getblock(height) + + for item in block_data["tx"]: + if len(item["vin"]) == 0: + addresses = item["vout"][0]["scriptPubKey"]["addresses"] + if address == addresses[0]: + return block_data + else: + return False + + def block(self, block_data): + tx = block_data["tx"] + time = block_data["time"] + hash_ = block_data["hash"] + height = block_data["height"] + + block_fees = 0 + block_reward = 0 + + for item in tx: + if len(item["vin"]) == 0: + block_reward = item["sends"] + else: + block_fees += item.get("fee", 0) + + return { + "height": height, + "hash": hash_, + "time": time, + "block_reward": block_reward, + "block_fees": block_fees + } + + +# ================= GRS ================= +class GRSRPCNode(BaseRPCNode): + + async def verify_block(self, height, REPORT_ADDRESS): + block_data = await self.getblock(height) + + for item in block_data["tx"]: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + addr = value["scriptPubKey"].get("address") + if addr: + if addr == REPORT_ADDRESS: + return block_data + else: + return False + + return False + + def block(self, data): + hash_ = data["hash"] + tx = data["tx"] + height = data["height"] + time = data["time"] + + reward = 0 + fees = 0 + + for item in tx: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + reward += value["value"] + else: + fees += item.get("fee", 0) + + return { + "height": height, + "hash": hash_, + "time": time, + "block_reward": reward, + "block_fees": fees + } + + +# ================= MONA ================= +class MONARPCNode(BaseRPCNode): + + async def verify_block(self, height, REPORT_ADDRESS): + block_data = await self.getblock(height) + + for item in block_data["tx"]: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + addresses = value["scriptPubKey"].get("addresses") + if addresses: + first = addresses.get("0") if isinstance(addresses, dict) else addresses[0] + + if first == REPORT_ADDRESS: + return block_data + else: + return False + + return False + + def block(self, data): + hash_ = data["hash"] + tx = data["tx"] + height = data["height"] + time = data["time"] + + reward = 0 + + for item in tx: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + if value["scriptPubKey"].get("addresses"): + reward += value["value"] + + return { + "height": height, + "hash": hash_, + "time": time, + "block_reward": reward, + "block_fees": None + } + + +# ================= DGB ================= +class DGBRPCNode(BaseRPCNode): + + async def verify_block(self, height, REPORT_ADDRESS): + block_data = await self.getblock(height) + + for item in block_data["tx"]: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + addresses = value["scriptPubKey"].get("addresses") + if addresses and addresses[0] == REPORT_ADDRESS: + return block_data + + return False + + async def verify_block_with_algo(self, height, REPORT_ADDRESS, algorithm): + block_data = await self.getblock(height) + + if block_data.get("pow_algo") == algorithm: + for item in block_data["tx"]: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + addresses = value["scriptPubKey"].get("addresses") + if addresses and addresses[0] == REPORT_ADDRESS: + return block_data + + return False + + def block(self, data): + hash_ = data["hash"] + tx = data["tx"] + height = data["height"] + time = data["time"] + + reward = 0 + + for item in tx: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + if value["scriptPubKey"].get("addresses"): + reward += value["value"] + + return { + "height": height, + "hash": hash_, + "time": time, + "block_reward": reward, + "block_fees": None + } + + +# ================= RXD ================= +class RXDRPCNode(BaseRPCNode): + + async def verify_block(self, height, REPORT_ADDRESS): + block_data = await self.getblock(height) + + for item in block_data["tx"]: + vin = item["vin"] + vout = item["vout"] + + if vin[0].get("coinbase"): + for value in vout: + addresses = value["scriptPubKey"].get("addresses") + if addresses and addresses[0] == REPORT_ADDRESS: + hash_ = await self.getblockhash(height) + blockstats = await self.callRpcMethod("getblockstats", [hash_]) + + return { + "height": blockstats["height"], + "hash": blockstats["blockhash"], + "time": blockstats["time"], + "block_reward": blockstats["subsidy"] + blockstats["totalfee"], + "block_fees": blockstats["totalfee"] + } + + return False + + def block(self, data): + return data + + +__all__ = [ + "NEXARPCNode", + "GRSRPCNode", + "MONARPCNode", + "DGBRPCNode", + "RXDRPCNode" +] \ No newline at end of file diff --git a/m2pool_backend_app/lib/redis.py b/m2pool_backend_app/lib/redis.py new file mode 100644 index 0000000..c287829 --- /dev/null +++ b/m2pool_backend_app/lib/redis.py @@ -0,0 +1,40 @@ +import json +import redis.asyncio as redis_async + + +class Cache: + def __init__(self, options): + if isinstance(options, str): + self.redis = redis_async.from_url(options, decode_responses=True) + + elif isinstance(options, dict): + host = options.get("host", "localhost") + port = options.get("port", 6379) + db = options.get("db", 0) + password = options.get("password") + + if password: + url = f"redis://:{password}@{host}:{port}/{db}" + else: + url = f"redis://{host}:{port}/{db}" + + self.redis = redis_async.from_url(url, decode_responses=True) + + else: + raise TypeError("options must be dict or redis url string") + + async def set(self, key, value): + try: + await self.redis.set(key, value) + except Exception as err: + raise err + + async def get(self, key): + try: + data = await self.redis.get(key) + return json.loads(data) + except Exception as err: + raise err + + +__all__ = ["Cache"] \ No newline at end of file diff --git a/m2pool_backend_app/public/baseResponse.py b/m2pool_backend_app/public/baseResponse.py new file mode 100644 index 0000000..1c6713a --- /dev/null +++ b/m2pool_backend_app/public/baseResponse.py @@ -0,0 +1,29 @@ +class SuccessResponse: + def __init__(self, data=None): + self.code = 0 + self.msg = "Success" + if data: + self.data = data + + def to_dict(self): + result = {"code": self.code, "msg": self.msg} + if hasattr(self, "data"): + result["data"] = self.data + return result + + +class ErrorResponse: + def __init__(self, data=None): + self.code = -1 + self.msg = "Error" + if data: + self.data = data + + def to_dict(self): + result = {"code": self.code, "msg": self.msg} + if hasattr(self, "data"): + result["data"] = self.data + return result + + +__all__ = ["SuccessResponse", "ErrorResponse"] diff --git a/m2pool_backend_app/public/distribution-hashrate.py b/m2pool_backend_app/public/distribution-hashrate.py new file mode 100644 index 0000000..0f5de04 --- /dev/null +++ b/m2pool_backend_app/public/distribution-hashrate.py @@ -0,0 +1,11 @@ +def compute_weights(lst): + total_hashrate = sum(user["mhs24h"] for user in lst) + users_weight = [ + { + "user": user["user"], + "mhs24h": user["mhs24h"], + "weight": user["mhs24h"] / total_hashrate + } + for user in lst + ] + return users_weight \ No newline at end of file diff --git a/m2pool_backend_app/public/distribution-shares.py b/m2pool_backend_app/public/distribution-shares.py new file mode 100644 index 0000000..7992259 --- /dev/null +++ b/m2pool_backend_app/public/distribution-shares.py @@ -0,0 +1,45 @@ +from typing import List, Dict, Any +from decimal import Decimal, getcontext, ROUND_DOWN + +getcontext().prec = 50 # 高精度,避免极端值丢失 + +def truncate_to_decimals_js(number: float, dots: int) -> float: + dec = Decimal(str(number)) + # 转成字符串,避免科学计数法 + s = format(dec, "f") + if "." not in s: + return float(s) + integer_part, frac_part = s.split(".") + truncated_frac = frac_part[:dots] # 截取小数点后 dots 位 + truncated_str = f"{integer_part}.{truncated_frac}" if truncated_frac else integer_part + return float(truncated_str) + + +def calculate_shares_weight_js(data: List[Dict[str, Any]]) -> Dict[str, float]: + user_weights: Dict[str, Dict[str, Any]] = {} + for item in data: + user = item.get("user") + pool_diff = Decimal(str(item.get("pool_diff", 0))) + miner_diff = Decimal(str(item.get("miner_diff", 0))) + weight = Decimal("0") + if pool_diff != 0: + weight = miner_diff / pool_diff + if user not in user_weights: + user_weights[user] = {"totalWeight": Decimal("0"), "data": []} + user_weights[user]["totalWeight"] += weight + user_weights[user]["data"].append(item) + + total_weight = sum(u["totalWeight"] for u in user_weights.values()) + + if total_weight == 0: + return {user: 0.0 for user in user_weights.keys()} + + result: Dict[str, float] = {} + for user, v in user_weights.items(): + ratio = v["totalWeight"] / total_weight + result[user] = truncate_to_decimals_js(float(ratio), 10) + + return result + + +__all__ = ["calculate_shares_weight_js", "truncate_to_decimals_js"] \ No newline at end of file diff --git a/m2pool_backend_app/public/endian.py b/m2pool_backend_app/public/endian.py new file mode 100644 index 0000000..acd88a1 --- /dev/null +++ b/m2pool_backend_app/public/endian.py @@ -0,0 +1,11 @@ +def change_endian(hex_str: str) -> str: + # Convert hex string to bytes + buffer = bytes.fromhex(hex_str) + # Reverse the byte order + endian = buffer[::-1] + # Convert back to hex string + result = endian.hex() + return result + + +__all__ = ["change_endian"] diff --git a/m2pool_backend_app/public/index.sql b/m2pool_backend_app/public/index.sql new file mode 100644 index 0000000..64ba8dc --- /dev/null +++ b/m2pool_backend_app/public/index.sql @@ -0,0 +1,513 @@ +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS nexa_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS nexa_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS nexa_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS grs_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS grs_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS grs_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS mona_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS mona_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS mona_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS dgbs_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbs_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbs_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS dgbq_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbq_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbq_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS dgbo_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbo_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbo_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS nexa_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +CREATE TABLE IF NOT EXISTS mona_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +CREATE TABLE IF NOT EXISTS grs_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbq_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbo_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +CREATE TABLE IF NOT EXISTS dgbs_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + + + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS rxd_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS rxd_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS rxd_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + + +CREATE TABLE IF NOT EXISTS rxd_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS enx_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS enx_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS enx_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + + +CREATE TABLE IF NOT EXISTS enx_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(18,8) NOT NULL, + fees DECIMAL(18,8), + state TINYINT NOT NULL +); + +-- 矿工历史算力表 +CREATE TABLE IF NOT EXISTS alph_mhsv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS alph_mhs_realv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + mhs30m DECIMAL(32, 6) NOT NULL, + mhs24h DECIMAL(32, 6) NOT NULL, + state VARCHAR(15) NOT NULL, + last_submit DATETIME NOT NULL +); + +CREATE TABLE IF NOT EXISTS alph_minersv2( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, + user VARCHAR(64) NOT NULL, + miner VARCHAR(64) NOT NULL, + date DATETIME NOT NULL, + accepts DECIMAL(16,8) NOT NULL, + state VARCHAR(10) NOT NULL, + last_submit DATETIME NOT NULL +); + + +CREATE TABLE IF NOT EXISTS alph_blkreportprofitv2( + date DATETIME NOT NULL, + height INT NOT NULL PRIMARY KEY, + hash VARCHAR(255) NOT NULL, + reward DECIMAL(32,8) NOT NULL, + fees DECIMAL(32,8), + state TINYINT NOT NULL +); + + +CREATE TABLE IF NOT EXISTS `alph_pool_blkstats` ( + `id` INT(10) NOT NULL AUTO_INCREMENT, + `date` DATETIME NOT NULL, + `height` INT(10), + `hash` VARCHAR(128), + `pow` VARCHAR(128), + `net_target` VARCHAR(128), + `submit` VARCHAR(64), + `success` TINYINT(1), + `accepts` DECIMAL(32,6), + `rejects` DECIMAL(32,6), + `reward` DECIMAL(32,6), + `fee` DECIMAL(32,6), + `nonce` VARCHAR(64), + `subidx` INT(10), + PRIMARY KEY (`id`) +); + +CREATE TABLE `alph_miners` ( + `id` INT(10) NOT NULL AUTO_INCREMENT, + `date` DATETIME NOT NULL, + `fromip` VARCHAR(64), + `state` VARCHAR(64), + `online` DATETIME, + `offline` DATETIME, + `retry` INT(10), + `duration` DECIMAL(12,6), + `protocol` VARCHAR(64), + `user` VARCHAR(128), + `miner` VARCHAR(128), + `refindex` VARCHAR(128), + `diff` DECIMAL(32,6), + `height` INT(10), + `accepts` DECIMAL(32,6), + `rejects` DECIMAL(32,6), + `ratio` DECIMAL(32,6), + `staleds` DECIMAL(32,6), + `lows` DECIMAL(32,6), + `duplicates` DECIMAL(32,6), + `formats` DECIMAL(32,6), + `others` DECIMAL(32,6), + `is_disabled` TINYINT(1), + `last_submit` DATETIME, + `submits` INT(10), + `blocks` INT(10), + `orphans` INT(10), + `orphan_ratio` DECIMAL(32,6), + PRIMARY KEY (`id`) +); + +CREATE TABLE `alph_miners_stats` ( + `id` INT(10) NOT NULL AUTO_INCREMENT, + `date` DATETIME NOT NULL, + `user` VARCHAR(128), + `miner` VARCHAR(128), + `refindex` VARCHAR(128), + `shares5m` DECIMAL(32,6), + `shares15m` DECIMAL(32,6), + `shares30m` DECIMAL(32,6), + `shares1h` DECIMAL(32,6), + `shares3h` DECIMAL(32,6), + `shares6h` DECIMAL(32,6), + `shares12h` DECIMAL(32,6), + `shares24h` DECIMAL(32,6), + `shares48h` DECIMAL(32,6), + `rejects5m` DECIMAL(32,6), + `rejects15m` DECIMAL(32,6), + `rejects30m` DECIMAL(32,6), + `rejects1h` DECIMAL(32,6), + `rejects3h` DECIMAL(32,6), + `rejects6h` DECIMAL(32,6), + `rejects12h` DECIMAL(32,6), + `rejects24h` DECIMAL(32,6), + `rejects48h` DECIMAL(32,6), + `mhs5m` DECIMAL(32,6), + `mhs15m` DECIMAL(32,6), + `mhs30m` DECIMAL(32,6), + `mhs1h` DECIMAL(32,6), + `mhs3h` DECIMAL(32,6), + `mhs6h` DECIMAL(32,6), + `mhs12h` DECIMAL(32,6), + `mhs24h` DECIMAL(32,6), + `mhs48h` DECIMAL(32,6), + `ratio5m` DECIMAL(32,6), + `ratio15m` DECIMAL(32,6), + `ratio30m` DECIMAL(32,6), + `ratio1h` DECIMAL(32,6), + `ratio3h` DECIMAL(32,6), + `ratio6h` DECIMAL(32,6), + `ratio12h` DECIMAL(32,6), + `ratio24h` DECIMAL(32,6), + `ratio48h` DECIMAL(32,6), + PRIMARY KEY (`id`) +); + +CREATE TABLE `alph_blk_height_detail` ( + `id` INT(10) NOT NULL AUTO_INCREMENT, + `date` DATETIME NOT NULL, + `from` INT(10), + `to` INT(10), + PRIMARY KEY (`id`) +); + +CREATE TABLE `alph_blk_detail` ( + `id` INT(10) NOT NULL AUTO_INCREMENT, + `date` DATETIME NOT NULL, + `height` INT(10), + `hash` VARCHAR(128), + `user` VARCHAR(128), + `miner` VARCHAR(128), + `refindex` VARCHAR(128), + `success` TINYINT(1), + `miner_diff` DECIMAL(32,6), + `pool_diff` DECIMAL(32,6), + `nonce` VARCHAR(64), + `subidx` INT(10), + PRIMARY KEY (`id`) +); + +CREATE TABLE `alph_blk_new` ( + `id` INT(10) NOT NULL AUTO_INCREMENT, + `date` DATETIME NOT NULL, + `height` INT(10), + `hash` VARCHAR(128), + `success` TINYINT(1), + `nonce` VARCHAR(64), + `subidx` INT(10), + PRIMARY KEY (`id`) +); \ No newline at end of file diff --git a/m2pool_backend_app/public/retry.py b/m2pool_backend_app/public/retry.py new file mode 100644 index 0000000..58143c5 --- /dev/null +++ b/m2pool_backend_app/public/retry.py @@ -0,0 +1,22 @@ +import asyncio +import sys + +async def execute_with_retry(task, max_retries, delay): + attempts = 0 + + while attempts < max_retries: + try: + return await task() + except Exception as error: + attempts += 1 + print(f"尝试 {attempts} 失败: {error}", file=sys.stderr) + + if attempts >= max_retries: + print("已达最大重试次数,任务失败。", file=sys.stderr) + raise + + print(f"等待 {delay} 秒后重试...") + await asyncio.sleep(delay) + + +__all__ = ["execute_with_retry"] \ No newline at end of file diff --git a/m2pool_backend_app/public/score.py b/m2pool_backend_app/public/score.py new file mode 100644 index 0000000..1427dc2 --- /dev/null +++ b/m2pool_backend_app/public/score.py @@ -0,0 +1,35 @@ +def caculate_standar_deviation(data): + def calculate_mean(values): + total = sum(float(value) for value in values) + return total / len(values) + + def calculate_standard_deviation(values, mean): + variance = sum((float(value) - mean) ** 2 for value in values) / len(values) + return variance ** 0.5 + + results = {} + for user in data: + values = data[user] + mean = calculate_mean(values) + stddev = calculate_standard_deviation(values, mean) + results[user] = stddev + + return results + + +def score(alluser_mhs24h, hash_percent=1): + hashrate_values = [obj["mhs24h"] for obj in alluser_mhs24h] + + total_hashrate = sum(hashrate_values) + + result = {} + + for item in alluser_mhs24h: + user = item["user"] + mhs24h = item["mhs24h"] + result[user] = (mhs24h / total_hashrate) * hash_percent + + return result + + +__all__ = ["caculate_standar_deviation", "score"] \ No newline at end of file diff --git a/m2pool_backend_app/public/times.py b/m2pool_backend_app/public/times.py new file mode 100644 index 0000000..afecd19 --- /dev/null +++ b/m2pool_backend_app/public/times.py @@ -0,0 +1,66 @@ +from datetime import datetime, timezone, timedelta +from email.utils import parsedate_to_datetime + + +class Times: + @staticmethod + def _js_date_parse(date_form: str) -> datetime: + if date_form == "": + raise ValueError("Invalid Date") + try: + if date_form.endswith("Z"): + return datetime.fromisoformat(date_form.replace("Z", "+00:00")) + return datetime.fromisoformat(date_form) + except Exception: + pass + try: + return datetime.strptime(date_form, "%Y-%m-%d %H:%M:%S") + except Exception: + pass + try: + return datetime.strptime(date_form, "%Y/%m/%d %H:%M:%S") + except Exception: + pass + try: + return parsedate_to_datetime(date_form) + except Exception: + pass + raise ValueError("Invalid Date") + + @staticmethod + def bj_time(date_form: str) -> str: + if date_form == "": + return "" + dt = Times._js_date_parse(date_form) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + bj_dt = dt + timedelta(hours=8) + return bj_dt.strftime("%Y-%m-%d %H:%M:%S") + + @staticmethod + def utc_time(date_form: str) -> str: + if date_form == "": + return "" + dt = Times._js_date_parse(date_form) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + utc_dt = dt.astimezone(timezone.utc) + return utc_dt.strftime("%Y-%m-%d %H:%M:%S") + + @staticmethod + def times(): + now = datetime.now() + y = now.strftime("%Y") + M = now.strftime("%m") + d = now.strftime("%d") + h = now.strftime("%H") + m = now.strftime("%M") + s = now.strftime("%S") + return [ + f"{y}-{M}-{d} {h}:{m}", + f"{y}-{M}-{d} {h}:{m}:{s}", + f"{y}-{M}-{d}", + f"{m}", + f"{h}", + f"{d}", + ] \ No newline at end of file diff --git a/m2pool_backend_app/src/balance.py b/m2pool_backend_app/src/balance.py new file mode 100644 index 0000000..099f85a --- /dev/null +++ b/m2pool_backend_app/src/balance.py @@ -0,0 +1,207 @@ +import asyncio +from datetime import datetime +import time + +# Assuming Times is a module with utcTime and bjTime functions +from public.times import Times +# Assuming Init is a base class +from init import Init + +class Balance(Init): + def __init__(self, coin): + method = "balance" + super().__init__(coin, method) + + async def query_min_height(self): + try: + sql = "SELECT MIN(max_height) AS min_height FROM wallet_in WHERE coin = ? AND state = ?;" + result = await self.distribution.exec(sql, [self.coin, 0]) + return result + except Exception as err: + raise err + + async def query_transaction_state(self, height): + try: + sql = "SELECT * FROM sendinfo WHERE coin = ? AND height >= ?;" + result = await self.balancedb.exec(sql, [self.coin, height]) + return result + except Exception as err: + raise err + + async def query_now_height(self, last_height): + try: + chain_height, db_height = await asyncio.gather( + self.node.getblockcount(), + self.query_min_height() + ) + return chain_height > db_height[0]['min_height'] + self.MAX_MATURE and chain_height > last_height + 5 + except Exception as err: + raise err + + async def update_wallet_in(self, ids): + try: + sql = "UPDATE wallet_in SET state = 1 WHERE id IN (?);" + await self.distribution.exec_transaction(sql, [ids]) + except Exception as err: + raise err + + async def insert_wallet_out_AND_update_wallet_in(self, data): + try: + sql = "INSERT INTO wallet_outv2(coin, user, address, date, max_height, tx_id, amount, tx_fee) VALUES " + values = [] + id_list = [] + for item in data: + username, qty, fee, txid, time_val, userid, height, ids = item['username'], item['qty'], item['fee'], item['txid'], item['time'], item['userid'], item['height'], item['ids'] + if txid: + id_list.extend(ids.split(",")) + values.extend([self.coin, userid, username, Times.utcTime(time_val * 1000), height, txid, qty, fee]) + sql += "(?,?,?,?,?,?,?,?), " + if not values: + print(f"{int(time.time() * 1000)}: {self.coin}无新增转账") + return + sql = sql[:-2] + wallet_in_sql = "UPDATE wallet_in SET state = 1 WHERE id IN (?);" + await asyncio.gather( + self.distribution.exec_transaction(sql, values), + self.distribution.exec_transaction(wallet_in_sql, [id_list]) + ) + except Exception as err: + raise err + + async def main(self): + try: + min_height = await self.query_min_height() + if not min_height or not min_height[0]['min_height']: + print(f"{Times.bjTime(int(time.time() * 1000))}: {self.coin}无需要更新的数据") + return True + need_update_data = await self.query_transaction_state(min_height[0]['min_height']) + if not need_update_data: + print(f"{Times.bjTime(int(time.time() * 1000))}: {self.coin}钱包暂无转账信息") + return True + await self.insert_wallet_out_AND_update_wallet_in(need_update_data) + return False + except Exception as err: + raise err + +class DGBBlance(Init): + def __init__(self, coin): + method = "balance" + super().__init__(coin, method) + + async def query_min_height(self): + try: + sql = "SELECT MIN(max_height) AS min_height FROM wallet_in WHERE coin LIKE 'dgb%' AND state = ?;" + result = await self.distribution.exec(sql, [0]) + return result + except Exception as err: + raise err + + async def check_height(self): + try: + my_sql = """ + SELECT height FROM dgbs_blkreportprofitv2 WHERE date >= "2024-11-26 00:00:00" AND date < "2024-11-27 00:00:00" + UNION + SELECT height FROM dgbo_blkreportprofitv2 WHERE date >= "2024-11-26 00:00:00" AND date < "2024-11-27 00:00:00" + UNION + SELECT height FROM dgbq_blkreportprofitv2 WHERE date >= "2024-11-26 00:00:00" AND date < "2024-11-27 00:00:00"; + """ + balance_sql = "SELECT height FROM balanceinfo WHERE height > 20402916 AND height <= 20408691;" + my_data, balance_data = await asyncio.gather( + self.distribution.exec(my_sql), + self.balancedb.exec(balance_sql) + ) + my_result = [item['height'] for item in my_data] + balance_result = [item['height'] for item in balance_data] + only_in_array1 = [item for item in my_result if item not in balance_result] + only_in_array2 = [item for item in balance_result if item not in my_result] + difference = only_in_array1 + only_in_array2 + return difference + except Exception as err: + raise err + + async def query_now_height(self, last_height): + try: + sql = "SELECT MAX(max_height) AS max_height FROM wallet_in WHERE coin LIKE 'dgb%' AND state = ?;" + chain_height, db_height = await asyncio.gather( + self.node.getblockcount(), + self.distribution.exec(sql, [0]) + ) + return chain_height > db_height[0]['max_height'] + self.MAX_MATURE and chain_height > last_height + 5 + except Exception as err: + raise err + + async def query_transaction_state(self, height): + try: + sql = "SELECT * FROM sendinfo WHERE coin LIKE 'dgb%' AND height >= ?;" + result = await self.balancedb.exec(sql, [height]) + return result + except Exception as err: + raise err + + async def query_coin(self, id_val): + try: + sql = "SELECT coin FROM wallet_in WHERE id = ?;" + result = await self.distribution.exec(sql, [id_val]) + return result + except Exception as err: + raise err + + async def update_wallet_in(self, ids): + try: + sql = "UPDATE wallet_in SET state = 1 WHERE id IN (?);" + await self.distribution.exec_transaction(sql, [ids]) + except Exception as err: + raise err + + async def insert_wallet_out_AND_update_wallet_in(self, data): + try: + sql = "INSERT INTO wallet_outv2(coin, user, address, date, max_height, tx_id, amount, tx_fee) VALUES " + values = [] + id_list = [] + for item in data: + coin, username, qty, fee, txid, time_val, userid, height, ids = item['coin'], item['username'], item['qty'], item['fee'], item['txid'], item['time'], item['userid'], item['height'], item['ids'] + if txid: + id_list.extend(ids.split(",")) + values.extend([coin, userid, username, Times.utcTime(time_val * 1000), height, txid, qty, fee]) + sql += "(?,?,?,?,?,?,?,?), " + sql = sql[:-2] + wallet_in_sql = "UPDATE wallet_in SET state = 1 WHERE id IN (?);" + await asyncio.gather( + self.distribution.exec_transaction(wallet_in_sql, [id_list]), + self.distribution.exec_transaction(sql, values) + ) + except Exception as err: + raise err + + async def main(self): + try: + min_height = await self.query_min_height() + if not min_height or not min_height[0]['min_height']: + print(f"{int(time.time() * 1000)}: dgb无需要更新的数据") + return True + need_update_data = await self.query_transaction_state(min_height[0]['min_height']) + if not need_update_data: + print(f"{int(time.time() * 1000)}: dgb转账未完成") + return True + data = [] + for item in need_update_data: + username, qty, fee, txid, time_val, userid, height, ids = item['username'], item['qty'], item['fee'], item['txid'], item['time'], item['userid'], item['height'], item['ids'] + coin = await self.query_coin(ids.split(",")[0]) + data.append({ + 'coin': coin[0]['coin'], + 'username': username, + 'qty': qty, + 'fee': fee, + 'txid': txid, + 'time': time_val, + 'userid': userid, + 'height': height, + 'ids': ids + }) + await self.insert_wallet_out_AND_update_wallet_in(data) + return False + except Exception as err: + raise err + +# Assuming module export +__all__ = ['Balance', 'DGBBlance'] \ No newline at end of file diff --git a/m2pool_backend_app/src/confirm.py b/m2pool_backend_app/src/confirm.py new file mode 100644 index 0000000..30e503b --- /dev/null +++ b/m2pool_backend_app/src/confirm.py @@ -0,0 +1,111 @@ +import asyncio +import time +from datetime import datetime + +# Assuming Init is a base class +from init import Init +# Assuming Times is a module with utcTime function +from public.times import Times + +class Confirm(Init): + def __init__(self, coin): + method = "confirm" + super().__init__(coin, method) + + def is_positive_integer(self, num): + return isinstance(num, int) and num > 0 + + # 查询当前所有满足查验要求的报块,即 <= 最高成熟高度 且 状态为待定(0)的报块 + async def query_need_update_data(self, mature_height): + try: + sql = f"SELECT height FROM {self.coin}_blkreportprofitv2 WHERE height <= ? AND state = ?;" + data = await self.distribution.exec(sql, [mature_height, 0]) + if not data or len(data) == 0: + print(f"{mature_height}高度之前暂无需要更新的报块") + return False + return [item['height'] for item in data] + except Exception as err: + raise err + + # 查询当前最高成熟高度,即当前最新高度 - MAX_MATURE + async def query_maxture_height(self): + try: + if self.coin != "alph": + now_height = await self.node.getblockcount() + max_height = int(now_height) - self.MAX_MATURE + if not self.is_positive_integer(max_height): + print(f"当前节点最大高度为{now_height}, 当前成熟高度为{max_height}") + return False + else: + return max_height + else: + now = int(time.time() * 1000) + max_mature_time = now - self.MAX_MATURE * 60 * 1000 + ymd = Times.utcTime(max_mature_time) + sql = "SELECT MAX(height) AS max_height FROM alph_blkreportprofitv2 WHERE date <= ? AND state = ?;" + data = await self.distribution.exec(sql, [ymd, 0]) + if not data[0]: + print("alph当前时间没有需要更新的成熟区块") + return False + else: + return data[0]['max_height'] + except Exception as err: + raise err + + # 通过报块地址,校验高度是否为本矿池报块 + async def verify_block(self, height): + try: + return await self.node.verify_block(height, self.REPORT_ADDRESS) + except Exception as err: + raise err + + # 根据校验通过和校验失败的高度更新blkreportprofit表,1为成功,2为失败 + async def update_blkreporprofit_state(self, suc_heights, err_heights): + try: + sql = f"UPDATE {self.coin}_blkreportprofitv2 SET state = ? WHERE height IN (?);" + if len(err_heights) == 0 and len(suc_heights) != 0: + # 只有 suc_heights 更新 + await self.distribution.exec_transaction(sql, [1, suc_heights]) + elif len(err_heights) != 0 and len(suc_heights) == 0: + # 只有 err_heights 更新 + await self.distribution.exec_transaction(sql, [2, err_heights]) + elif len(err_heights) != 0 and len(suc_heights) != 0: + # 同时更新 suc_heights 和 err_heights + await asyncio.gather( + self.distribution.exec_transaction(sql, [1, suc_heights]), + self.distribution.exec_transaction(sql, [2, err_heights]) + ) + return + except Exception as err: + raise err + + """ + 1,查验区块链最高成熟高度 + 2,查验blk表中所有<=最高成熟高度 且 状态为待定(0)的区块 + 3,遍历这些区块高度,并通过node.verify方法校验 + 4,将校验通过(正常报块)和校验不通过(孤块)分为两组 + 5,将正常报块组的状态改为1,将孤块组状态改为2 + """ + async def main(self): + try: + mature_max_height = await self.query_maxture_height() + if not mature_max_height: + return + need_update_heights = await self.query_need_update_data(mature_max_height) + if not need_update_heights: + return + suc_heights = [] + err_heights = [] + for item in need_update_heights: + verify_result = await self.verify_block(item) + if not verify_result: + err_heights.append(item) + else: + suc_heights.append(item) + await self.update_blkreporprofit_state(suc_heights, err_heights) + return + except Exception as err: + raise err + +# Assuming module export +__all__ = ['Confirm'] \ No newline at end of file diff --git a/m2pool_backend_app/src/distribution.py b/m2pool_backend_app/src/distribution.py new file mode 100644 index 0000000..9a4362c --- /dev/null +++ b/m2pool_backend_app/src/distribution.py @@ -0,0 +1,265 @@ +import asyncio +from datetime import datetime +from decimal import Decimal, ROUND_DOWN + +from public.times import Times +from init import Init + +class Distribution(Init): + def __init__(self, coin): + method = "distribution" + super().__init__(coin, method) + + # ------------------ 核心方法 ------------------ + + def score(self, alluser_mhs24h, hash_percent=1): + total_hashrate = sum(obj['mhs24h'] for obj in alluser_mhs24h) + result = {} + for item in alluser_mhs24h: + user = item['user'] + mhs24h = item['mhs24h'] + result[user] = round((mhs24h / total_hashrate) * hash_percent, 4) + return result + + async def query_last_day_hashrate(self, date): + try: + sql = f"SELECT user, SUM(mhs24h) AS mhs24h FROM {self.coin}_mhsv2 WHERE date = ? GROUP BY user;" + data = await self.hashratedb.exec(sql, [date + " 00:00:00"]) + if not data: + return False + return [{'user': item['user'], 'mhs24h': float(item['mhs24h'])} for item in data] + except Exception as err: + raise err + + async def query_last_day_reward(self, start_time, end_time): + try: + sql = f""" + SELECT MAX(height) AS max_height, SUM(reward) AS reward + FROM {self.coin}_blkreportprofitv2 + WHERE date >= ? AND date < ? AND state = ?; + """ + data = await self.distributiondb.exec(sql, [start_time, end_time, 1]) + if not data or not data[0]['reward']: + return 0 + # 返回整数,完全对齐 JS BigInt + data[0]['reward'] = int(data[0]['reward']) + return data + except Exception as err: + raise err + + async def query_last_day_if_mature(self, start_time, end_time): + try: + sql = f"SELECT count(*) AS count FROM {self.coin}_blkreportprofitv2 WHERE date >= ? AND date < ? AND state = ?;" + while True: + await asyncio.sleep(60 * 15) + data = await self.distributiondb.exec(sql, [start_time, end_time, 0]) + current_hour = int(Times.times()[4]) + if (self.coin == "rxd" and current_hour >= 9) or (self.coin != "rxd" and current_hour >= 4): + return False + if data[0]['count'] == 0: + break + return True + except Exception as err: + raise err + + async def query_users_address(self): + try: + sql = """ + SELECT + a.miner_user AS 'user', + b.balance AS 'address', + b.amount AS 'amount', + b.active AS 'state', + b.min_amount AS 'min_amount' + FROM user_account_balance b + LEFT JOIN user_miner_account a ON b.ma_id = a.id + WHERE a.coin = ? AND b.status = 0; + """ + data = await self.users_addresses.exec(sql, [self.coin]) + return data if data else False + except Exception as err: + raise err + + async def verify_block(self, height): + try: + data = await self.node.verify_block(height, self.REPORT_ADDRESS) + if data and isinstance(data, dict): + return await self.node.block(data) + return False + except Exception as err: + raise err + + async def insert_blkreportprofit(self, data): + try: + sql = f"INSERT INTO {self.coin}_blkreportprofitv2 (date, height, hash, reward, fees, state) VALUES " + values = [] + for item in data: + sql += "(?,?,?,?,?,?), " + values.extend([ + Times.utcTime(item['time'] * 1000), + item['height'], + item['hash'], + item['block_reward'], + item['block_fees'], + 1 + ]) + sql = sql[:-2] + await self.distributiondb.exec_transaction(sql, values) + except Exception as err: + raise err + + async def check_last_data_blk(self, date): + try: + ts = int(datetime.fromisoformat(date).timestamp() * 1000) - 86400000 + yMd = Times.utcTime(ts).split(" ")[0] + ymd = yMd.split("-") + table_name = f"{self.coin}_pool_blkstats_{ymd[0]}{ymd[1]}{ymd[2]}" + confirm_result = await self.pooldb.exec(f"SHOW TABLES LIKE '{table_name}';") + if not confirm_result: + print("pool_blkstats表未更新,退出本次执行,请手动校验") + return False + pool_data = await self.pooldb.exec(f"SELECT height FROM {table_name} WHERE DATE(date) >= ?;", [yMd]) + heights = [item['height'] for item in pool_data] + blkreport_data = await self.distributiondb.exec(f"SELECT height FROM {self.coin}_blkreportprofitv2 WHERE DATE(date)=? AND state=?;", [yMd, 1]) + blkreport_heights = [item['height'] for item in blkreport_data] + need_check_heights = [h for h in heights if h not in set(blkreport_heights)] + if not need_check_heights: + print(f"{self.coin}check 完成,没有需要重新校验的区块") + return True + need_insert_data = [] + for height in need_check_heights: + result = await self.verify_block(height) + if result: + need_insert_data.append(result) + if need_insert_data: + await self.insert_blkreportprofit(need_insert_data) + print(f"{self.coin}check 完成,已将{self.coin}漏掉的报块全部插入blk表中!") + else: + print(f"{self.coin}check 完成,没有需要insert的区块") + return True + except Exception as err: + raise err + + async def update_state(self, min_amount): + try: + data = await self.distributiondb.exec("SELECT user, SUM(amount) AS profit FROM wallet_in WHERE coin=? AND state=? GROUP BY user;", [self.coin, 2]) + if not data: + return + for item in data: + user = item['user'] + if item['profit'] >= min_amount[user]: + await self.distributiondb.exec("UPDATE wallet_in SET state=? WHERE coin=? AND user=?", [0, self.coin, user]) + except Exception as err: + raise err + + async def insert_wallet_in(self, data): + try: + sql = "INSERT INTO wallet_in(coin, user, address, create_date, should_out_date, max_height, amount, state) VALUES " + values = [] + for item in data: + sql += "(?,?,?,?,?,?,?,?), " + values.extend([ + item['coin'], + item['user'], + item['address'], + item['create_date'], + item['should_out_date'], + item['max_height'], + item['amount'], + item['state'] + ]) + sql = sql[:-2] + await self.distributiondb.exec_transaction(sql, values) + except Exception as err: + raise err + + async def main(self, start_time, end_time): + try: + if not await self.query_last_day_if_mature(start_time, end_time): + return + if not await self.check_last_data_blk(end_time): + return + + last_day_mhs24h, last_day_reward, users_address = await asyncio.gather( + self.query_last_day_hashrate(end_time), + self.query_last_day_reward(start_time, end_time), + self.query_users_address() + ) + if not last_day_mhs24h or not last_day_reward or not users_address: + print("查询错误", last_day_mhs24h, last_day_reward, users_address) + return + + reward = (int(last_day_reward[0]['reward']) * int((1 - self.POOL_FEE) * 10000)) // 10000 + score_ratio = self.score(last_day_mhs24h, 1) + max_height = last_day_reward[0]['max_height'] + + should_out_date = end_time + accuracy = 100000000 + count = 8 + if self.coin == "nexa": + should_out_date = Times.utcTime(int(datetime.fromisoformat(end_time).timestamp() * 1000) + 1000 * 60 * 60 * 24 * 7) + accuracy = 100 + count = 2 + elif self.coin == "rxd": + accuracy = 100 + count = 2 + elif self.coin == "alph": + accuracy = 0 + count = 0 + + user_profit = 0 + result = [] + pool_account_address = None + min_amount = {} + + for user, ratio in score_ratio.items(): + ratio_int = round(ratio * 10000) + profit = reward * ratio_int // 10000 + if profit == 0: + continue + user_profit += profit + for item in users_address: + if item['user'] == "pool_account": + pool_account_address = item['address'] + if item['user'] == user: + min_amount[user] = item['min_amount'] + state = 0 + if profit >= item['amount'] and item['state'] == 0: + state = 0 + elif profit < item['amount'] and item['state'] == 0: + state = 2 + elif profit >= item['amount'] and item['state'] == 1: + state = 3 + else: + state = 4 + result.append({ + 'coin': self.coin, + 'user': user, + 'address': item['address'], + 'create_date': end_time, + 'should_out_date': should_out_date, + 'max_height': max_height, + 'amount': profit, + 'state': state + }) + + pool_account_amount = Decimal(last_day_reward[0]['reward']) - Decimal(user_profit) + pool_account_amount = pool_account_amount.quantize(Decimal(f'1e-{count}'), rounding=ROUND_DOWN) + result.append({ + 'coin': self.coin, + 'user': "pool_account", + 'address': pool_account_address, + 'create_date': end_time, + 'should_out_date': should_out_date, + 'max_height': max_height, + 'amount': float(pool_account_amount), + 'state': 0 + }) + + print(result) + await self.insert_wallet_in(result) + await self.update_state(min_amount) + except Exception as err: + raise err + +__all__ = ['Distribution'] \ No newline at end of file diff --git a/m2pool_backend_app/src/hashrate.py b/m2pool_backend_app/src/hashrate.py new file mode 100644 index 0000000..4af4a5f --- /dev/null +++ b/m2pool_backend_app/src/hashrate.py @@ -0,0 +1,280 @@ +import asyncio +from datetime import datetime +from decimal import Decimal +import time + +from public.times import Times +from init import Init + +class HashRate(Init): + def __init__(self, coin): + method = "hashrate" + super().__init__(coin, method) + self.diff_one_share_hashs_avg = 2 ** 32 - 1 + self.count = 0 + + def calculate_hashrate(self, accepts, seconds, unit): + num_map = { + "H/s": 1, + "KH/s": 1_000, + "MH/s": 1_000_000, + "GH/s": 1_000_000_000, + "TH/s": 1_000_000_000_000, + "PH/s": 1_000_000_000_000_000, + "EH/s": 10 ** 18 + } + if unit not in num_map: + raise ValueError(f"{unit}不是已知单位") + num = Decimal(num_map[unit]) + hashrate = (Decimal(accepts) * Decimal(self.diff_one_share_hashs_avg)) / Decimal(seconds) / num + if self.coin == "alph": + return hashrate * 4 + return hashrate + + def merge(self, data): + results = {} + for item in data: + key = f"{item['user']}-{item['miner']}" + if key in results: + existing = results[key] + existing['accepts'] += float(item['accepts']) + if datetime.fromisoformat(item['last_submit']) > datetime.fromisoformat(existing['last_submit']): + existing['last_submit'] = item['last_submit'] + results[key] = existing + else: + results[key] = { + 'user': item['user'], + 'miner': item['miner'], + 'accepts': float(item['accepts']), + 'last_submit': item['last_submit'] + } + return list(results.values()) + + async def query_table(self, start_time, end_time): + try: + sql = f"(SELECT date, `from`, `to` FROM {self.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) " \ + f"UNION (SELECT date, `from`, `to` FROM {self.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;" + data = await self.sharesdb.exec(sql, [end_time, start_time, end_time]) + result = [f"{self.coin}_block_detail_{item['from']}_{int(item['to'] - 1)}" for item in data] if data else [] + result.append(f"{self.coin}_blk_detail") + return result + except Exception as err: + print(err) + return [] + + async def query_slave_table(self, start_time, end_time): + try: + sql = f"(SELECT date, `from`, `to` FROM {self.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) " \ + f"UNION (SELECT date, `from`, `to` FROM {self.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;" + data = await self.sharesdb_slave.exec(sql, [end_time, start_time, end_time]) + result = [f"{self.coin}_block_detail_{item['from']}_{int(item['to'] - 1)}" for item in data] if data else [] + result.append(f"{self.coin}_blk_detail") + return result + except Exception as err: + print(err) + return [] + + async def query_accepts(self, start_time, end_time, enable): + try: + if enable: + tables_name, slave_tables_name = await asyncio.gather( + self.query_table(start_time, end_time), + self.query_slave_table(start_time, end_time) + ) + + def build_sql(table_list): + if len(table_list) <= 1: + return f'SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts ' \ + f'FROM {self.coin}_blk_detail WHERE date >= "{start_time}" AND date < "{end_time}" GROUP BY user, miner;' + sql = 'SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ( ' + for i, tbl in enumerate(table_list): + if i < len(table_list) - 1: + sql += f'SELECT date, user, miner, miner_diff, pool_diff FROM {tbl} WHERE date >= "{start_time}" AND date < "{end_time}" \nUNION ALL\n' + else: + sql += f'SELECT date, user, miner, miner_diff, pool_diff FROM {tbl} WHERE date >= "{start_time}" AND date < "{end_time}") AS combined_tables GROUP BY user, miner;' + return sql + + sql = build_sql(tables_name) + slave_sql = build_sql(slave_tables_name) + + accepts_data, slave_accepts = await asyncio.gather( + self.sharesdb.exec(sql), + self.sharesdb_slave.exec(slave_sql) + ) + return self.merge(accepts_data + slave_accepts) + else: + tables_name = await self.query_table(start_time, end_time) + sql = 'SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ( ' + for i, tbl in enumerate(tables_name): + if i < len(tables_name) - 1: + sql += f'SELECT date, user, miner, miner_diff, pool_diff FROM {tbl} WHERE date >= "{start_time}" AND date < "{end_time}" \nUNION ALL\n' + else: + sql += f'SELECT date, user, miner, miner_diff, pool_diff FROM {tbl} WHERE date >= "{start_time}" AND date < "{end_time}") AS combined_tables GROUP BY user, miner;' + accepts_data = await self.sharesdb.exec(sql) + return self.merge(accepts_data) + except Exception as err: + print(f"Retry {self.count} for query_accepts due to {err}") + await asyncio.sleep(15) + if self.count > 3: + self.count = 0 + raise err + self.count += 1 + return await self.query_accepts(start_time, end_time, enable) + + async def query_miners(self, time_str): + try: + sql = f"SELECT date, user, miner, state, ratio, last_submit FROM {self.coin}_miners WHERE last_submit >= DATE_SUB(?, INTERVAL 1 DAY);" + return await self.pooldb.exec(sql, [time_str]) + except Exception as err: + raise err + + async def insert_mhs(self, data): + if not data: + print(int(time.time() * 1000), ":30分钟没有新增矿机提交数据") + return + sql = f"INSERT INTO {self.coin}_mhsv2 (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES " + values = [] + for item in data: + sql += "(?, ?, ?, ?, ?, ?, ?), " + values.extend([item['user'], item['miner'], item['date'], item['mhs30m'], item['mhs24h'], item['state'], item['last_submit']]) + sql = sql[:-2] + await self.hashratedb.exec_transaction(sql, values) + + async def insert_mhs_real(self, data): + if not data: + print(int(time.time() * 1000), ":5分钟没有新增矿机提交数据") + return + del_sql = f"DELETE FROM {self.coin}_mhs_realv2 WHERE id > 0;" + sql = f"INSERT INTO {self.coin}_mhs_realv2 (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES " + values = [] + for item in data: + sql += "(?, ?, ?, ?, ?, ?, ?), " + values.extend([item['user'], item['miner'], item['date'], item['mhs30m'], item['mhs24h'], item['state'], item['last_submit']]) + sql = sql[:-2] + sqls = [{"sql": del_sql}, {"sql": sql, "param": values}] + await self.hashratedb.exec_transaction_together(sqls) + + async def query_hashrate_miners_accepts(self, end_time): + ymd_last_30m = Times.utcTime(int(datetime.fromisoformat(end_time).timestamp() * 1000) - 1000 * 60 * 30) + ymd_last_24h = Times.utcTime(int(datetime.fromisoformat(end_time).timestamp() * 1000) - 1000 * 60 * 60 * 24) + state_sql = f"""SELECT t1.* +FROM {self.coin}_minersv2 t1 +INNER JOIN ( + SELECT user, miner, MAX(date) AS max_date + FROM {self.coin}_minersv2 + WHERE date <= ? + GROUP BY user, miner +) t2 +ON t1.user = t2.user AND t1.miner = t2.miner AND t1.date = t2.max_date;""" + mhs30m_sql = f"SELECT SUM(accepts) AS accepts_30min, user, miner FROM {self.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;" + mhs24h_sql = f"SELECT SUM(accepts) AS accepts_24h, user, miner FROM {self.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;" + + state, mhs30m, mhs24h = await asyncio.gather( + self.hashratedb.exec(state_sql, [end_time]), + self.hashratedb.exec(mhs30m_sql, [ymd_last_30m, end_time]), + self.hashratedb.exec(mhs24h_sql, [ymd_last_24h, end_time]) + ) + + hashrate_map = {} + for item in state: + hashrate_map[f"{item['user']}:{item['miner']}"] = { + 'date': end_time, + 'user': item['user'], + 'miner': item['miner'], + 'state': item['state'], + 'last_submit': item['last_submit'], + 'mhs30m': 0, + 'mhs24h': 0 + } + for item in mhs30m: + key = f"{item['user']}:{item['miner']}" + if key in hashrate_map: + hashrate_map[key]['mhs30m'] = self.calculate_hashrate(item['accepts_30min'], 60*30, "MH/s") + for item in mhs24h: + key = f"{item['user']}:{item['miner']}" + if key in hashrate_map: + hashrate_map[key]['mhs24h'] = self.calculate_hashrate(item['accepts_24h'], 60*60*24, "MH/s") + return hashrate_map + + async def insert_hashrate_miners_table(self, end_time): + ymd = end_time.split(":") + date = f"{ymd[0]}:{ymd[1]}:00" + start_time = Times.utcTime(int(datetime.fromisoformat(end_time).timestamp() * 1000) - 1000*60*5) + enable = (await self.redis.get(f"{self.coin}:enable")) or False + + accepts, miners_state = await asyncio.gather( + self.query_accepts(start_time, end_time, enable), + self.query_miners(end_time) + ) + + miners_map = {} + if not accepts and not miners_state: + return + elif accepts and not miners_state: + return + elif not accepts and miners_state: + for item in miners_state: + miners_map[f"{item['user']}:{item['miner']}"] = { + 'date': date, + 'user': item['user'], + 'miner': item['miner'], + 'accepts': 0, + 'state': "offline", + 'last_submit': item['last_submit'] + } + else: + for item in accepts: + miners_map[f"{item['user']}:{item['miner']}"] = { + 'date': date, + 'user': item['user'], + 'miner': item['miner'], + 'accepts': item['accepts'], + 'last_submit': item['last_submit'], + 'state': "online" + } + for item in miners_state: + key = f"{item['user']}:{item['miner']}" + if key not in miners_map: + miners_map[key] = { + 'date': date, + 'user': item['user'], + 'miner': item['miner'], + 'accepts': 0, + 'state': item['state'], + 'last_submit': item['last_submit'] + } + + insert_sql = f"INSERT INTO {self.coin}_minersv2(user, miner, date, accepts, state, last_submit) VALUES " + values = [] + for item in miners_map.values(): + insert_sql += "(?, ?, ?, ?, ?, ?), " + values.extend([item['user'], item['miner'], item['date'], item['accepts'], item['state'], item['last_submit']]) + insert_sql = insert_sql[:-2] + await self.hashratedb.exec_transaction(insert_sql, values) + + +class HashRateNew(Init): + def __init__(self, coin): + method = "distribution" + super().__init__(coin, method) + self.count = 0 + + async def query_blk_detail_table_name(self): + try: + sql = f"SELECT `from`, `to` FROM {self.coin}_blk_height_detail WHERE date < NOW() - INTERVAL 5 MINUTE;" + data = await self.sharesdb.exec(sql) + tables = [f"{self.coin}_blk_detail"] + if data: + for item in data: + tables.append(f"{self.coin}_block_detail_{item['from']}_{int(item['to'] - 1)}") + return tables + except Exception as err: + raise err + + async def query_blk_detail(self): + try: + pass + except Exception as err: + raise err + +__all__ = ['HashRate', 'HashRateNew'] \ No newline at end of file diff --git a/m2pool_backend_app/src/init.py b/m2pool_backend_app/src/init.py new file mode 100644 index 0000000..792b853 --- /dev/null +++ b/m2pool_backend_app/src/init.py @@ -0,0 +1,112 @@ +import asyncio +import json +import os + +# 假设这些是 Python 等效库 +from lib.mysql import DBPool +from lib.redis import Cache +from lib.node import NEXARPCNode, GRSRPCNode, MONARPCNode, DGBRPCNode, RXDRPCNode, ENXNode, ALPHRPCNode + +class Init: + def __init__(self, coin, method): + self.coin = coin + self.method = method + # 配置加载 + config_path = f"./config/{coin}.conf" + with open(config_path, "r", encoding="utf-8") as f: + config = json.load(f) + + self.master = config["master"] + self.slave = config["slave"] + self.redis_options = config["redis_options"] + self.node_options = config["node_options"] + self.distribution_conf = config["distribution_conf"] + self.MAX_MATURE = config["MAX_MATURE"] + self.REPORT_ADDRESS = config["REPORT_ADDRESS"] + + # 节点映射 + self.node_map = { + "mona": MONARPCNode, + "nexa": NEXARPCNode, + "grs": GRSRPCNode, + "dgbs": DGBRPCNode, + "dgbq": DGBRPCNode, + "dgbo": DGBRPCNode, + "rxd": RXDRPCNode, + "enx": ENXNode, + "alph": ALPHRPCNode + } + + # 初始化分支 + self._init_method(method) + + def _init_method(self, method): + master = self.master + slave = self.slave + redis1 = self.redis_options["redis1"] + node2 = self.node_options["node2"] + POOL_FEE = self.distribution_conf.get("POOL_FEE", 0) + + if method == "hashrate": + self.sharesdb = DBPool(self.coin, master["sharesdb"]) + self.sharesdb_slave = DBPool(self.coin, slave["sharesdb_slave"]) + self.pooldb = DBPool(self.coin, master["pooldb"]) + self.redis = Cache(redis1) + self.hashratedb = DBPool(self.coin, master["hashrate"]) + + elif method == "distribution": + self.pooldb = DBPool(self.coin, master["pooldb"]) + self.pooldb_slave = DBPool(self.coin, slave["pooldb_slave"]) + self.hashratedb = DBPool(self.coin, master["hashrate"]) + self.distributiondb = DBPool(self.coin, master["distribution"]) + self.users_addresses = DBPool(self.coin, master["users_addresses"]) + self.node = self.node_map[self.coin](node2) + self.REPORT_ADDRESS = self.REPORT_ADDRESS + self.POOL_FEE = POOL_FEE + print(f"当前手续费率为: {POOL_FEE}") + + elif method == "confirm": + self.node = self.node_map[self.coin](node2) + self.distribution = DBPool(self.coin, master["distribution"]) + self.pooldb = DBPool(self.coin, master["pooldb"]) + self.MAX_MATURE = self.MAX_MATURE + self.REPORT_ADDRESS = self.REPORT_ADDRESS + + elif method == "balance": + self.distribution = DBPool(self.coin, master["distribution"]) + self.balancedb = DBPool(self.coin, master["balance"]) + self.node = self.node_map[self.coin](node2) + self.MAX_MATURE = self.MAX_MATURE + + elif method == "report": + if self.coin == "enx": + self.distribution = DBPool(self.coin, master["distribution"]) + self.pooldb = DBPool(self.coin, master["pooldb"]) + else: + self.node = self.node_map[self.coin](node2) + self.distribution = DBPool(self.coin, master["distribution"]) + self.redis = Cache(redis1) + self.REPORT_ADDRESS = self.REPORT_ADDRESS + + elif method == "clear": + self.pooldb = DBPool(self.coin, master["pooldb"]) + self.sharesdb = DBPool(self.coin, master["sharesdb"]) + self.hashratedb = DBPool(self.coin, master["hashrate"]) + + elif method == "stats": + self.pooldb = DBPool(self.coin, master["pooldb"]) + self.hashratedb = DBPool(self.coin, master["hashrate"]) + self.distribution = DBPool(self.coin, master["distribution"]) + + elif method == "notice": + self.distribution = DBPool(self.coin, master["distribution"]) + + else: + raise ValueError(f"暂不支持 {method} 方法 init") + + async def sleep(self, ms): + """异步睡眠""" + await asyncio.sleep(ms / 1000) + +# 模块导出 +__all__ = ['Init'] \ No newline at end of file diff --git a/m2pool_backend_app/src/report.py b/m2pool_backend_app/src/report.py new file mode 100644 index 0000000..ee45679 --- /dev/null +++ b/m2pool_backend_app/src/report.py @@ -0,0 +1,167 @@ +import asyncio +import inspect +from datetime import datetime + +from public.times import Times +from init import Init + + +class Report(Init): + def __init__(self, coin): + super().__init__(coin, "report") + + async def query_mysql_last_height(self): + sql = f"SELECT MAX(height) AS max_height FROM {self.coin}_blkreportprofitv2;" + data = await self.distribution.exec(sql) + if not data or data[0]["max_height"] is None: + raise ValueError(f"{self.coin}当前无报块记录") + return data[0]["max_height"] + + async def query_redis_last_height(self): + data = await self.redis.get(f"{self.coin}:last_check") + if not data: + result = await self.query_mysql_last_height() + await self.redis.set(f"{self.coin}:last_check", result) + print(f"redis中无{self.coin} last_check数据,采用最后一个报块高度!") + return result + return int(data) + + async def query_chain_last_height(self): + return int(await self.node.getblockcount()) + + async def insert_blkreportprofit(self, data): + sql = f""" + INSERT INTO {self.coin}_blkreportprofitv2 + (date, height, hash, reward, fees, state) + VALUES + """ + values = [] + for item in data: + sql += "(?,?,?,?,?,?)," + values.extend([ + item["date"], + item["height"], + item["hash"], + item["reward"], + item["fees"], + 0 + ]) + sql = sql.rstrip(",") + await self.distribution.exec_transaction(sql, values) + + async def main(self): + redis_height, chain_height = await asyncio.gather( + self.query_redis_last_height(), + self.query_chain_last_height() + ) + + if chain_height < redis_height: + print(f"{self.coin}节点同步出错,节点高度{chain_height},last_check{redis_height}") + return + + if chain_height == redis_height: + print(f"{self.coin}当前节点和last_check高度一致,无需校验") + return + + suc_data = [] + + for height in range(redis_height + 1, chain_height + 1): + check_result = await self.node.verify_block(height, self.REPORT_ADDRESS) + if not check_result: + continue + + block_ret = self.node.block(check_result) + if inspect.isawaitable(block_ret): + block = await block_ret + else: + block = block_ret + + suc_data.append({ + "date": Times.utcTime(block["time"] * 1000), + "height": block["height"], + "hash": block["hash"], + "reward": str(block["block_reward"]), + "fees": block["block_fees"] + }) + + if not suc_data: + print(f"{redis_height} - {chain_height} 无报块") + await self.redis.set(f"{self.coin}:last_check", chain_height) + return + + await self.insert_blkreportprofit(suc_data) + await self.redis.set(f"{self.coin}:last_check", chain_height) + + +class ReportEnx(Report): + async def query_blkstats(self, height): + yesterday = Times.utcTime( + int(datetime.now().timestamp() * 1000) - 86400000 + ).split(" ")[0].replace("-", "") + + table1 = "enx_pool_blkstats" + table2 = f"enx_pool_blkstats_{yesterday}" + + exist_sql = "SHOW TABLES LIKE %s;" + exist = await self.pooldb.exec(exist_sql, [table2]) + + if not exist: + sql = f"SELECT date, height, hash FROM {table1} WHERE height > %s;" + return await self.pooldb.exec(sql, [height]) + + sql = f""" + SELECT date, height, hash FROM {table1} WHERE height > %s + UNION ALL + SELECT date, height, hash FROM {table2} WHERE height > %s; + """ + return await self.pooldb.exec(sql, [height, height]) + + async def query_blkreportprofit(self): + sql = f"SELECT MAX(height) AS max_height FROM {self.coin}_blkreportprofitv2;" + data = await self.distribution.exec(sql) + if not data or data[0]["max_height"] is None: + return 0 + return data[0]["max_height"] + + async def insertinto_blkreportprofit(self, data): + sql = f""" + INSERT INTO {self.coin}_blkreportprofitv2 + (date, height, hash, reward, fees, state) + VALUES + """ + values = [] + for item in data: + sql += "(?,?,?,?,?,?)," + values.extend([ + item["date"], + item["height"], + item["hash"], + item["reward"], + 0, + 1 + ]) + sql = sql.rstrip(",") + await self.distribution.exec_transaction(sql, values) + + async def main(self): + last_height = await self.query_blkreportprofit() + data = await self.query_blkstats(last_height) + + if not data: + print(f"{self.coin} 无报块") + return + + block_data = [ + { + "date": item["date"], + "height": item["height"], + "hash": item["hash"], + "reward": 333 + } + for item in data + ] + + await self.insertinto_blkreportprofit(block_data) + + +__all__ = ["Report", "ReportEnx"] \ No newline at end of file