m2pool_backend_app/test/test1.js

463 lines
18 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const Times = require("../public/times");
const Decimal = require("decimal");
const fs = require("fs");
const DBPool = require("../lib/mysql");
const Cache = require("../lib/redis");
const { NEXARPCNode, GRSRPCNode, MONARPCNode, DGBRPCNode, RXDRPCNode } = require("../lib/node");
/**
 * Base class for pool tasks: reads the coin's JSON configuration file and
 * creates only the database pools, cache client and RPC node that the
 * requested task ("method") uses.
 */
class Init {
  /**
   * @param {string} coin   Coin identifier. Selects `../config/<coin>.conf` and,
   *                        for methods that talk to a node, the RPC node class.
   * @param {string} method One of "hashrate", "report", "clear", "distribution",
   *                        "balance", "confirm", "stats".
   * @throws {Error} If `method` is not one of the supported values, or if the
   *                 method needs an RPC node and no node class is registered
   *                 for `coin`.
   */
  constructor(coin, method) {
    this.coin = coin;
    // NOTE(review): this relative path is resolved against the process working
    // directory, not against this file's location — confirm how the script is launched.
    const config = fs.readFileSync(`../config/${coin}.conf`, "utf-8");
    const { master, slave, redis_options, node_options, distribution_conf, MAX_MATURE, REPORT_ADDRESS } = JSON.parse(config);
    const { pooldb, sharesdb, distribution, hashrate, users_addresses, balance } = master;
    const { sharesdb_slave } = slave;
    const { node2 } = node_options;
    const { redis1 } = redis_options;
    const { POOL_FEE } = distribution_conf;
    // RPC node implementation per coin; the three dgb* variants share one class.
    const node_map = {
      mona: MONARPCNode,
      nexa: NEXARPCNode,
      grs: GRSRPCNode,
      dgbs: DGBRPCNode,
      dgbq: DGBRPCNode,
      dgbo: DGBRPCNode,
      rxd: RXDRPCNode,
    };
    // Builds the RPC node for `coin`, failing with a readable message instead
    // of "node_map[coin] is not a constructor" when the coin is not registered.
    const create_node = () => {
      if (!Object.prototype.hasOwnProperty.call(node_map, coin)) {
        throw new Error(`No RPC node class registered for coin "${coin}" (required by method "${method}")`);
      }
      const NodeClass = node_map[coin];
      return new NodeClass(node2);
    };
    switch (method) {
      case "hashrate":
        this.sharesdb = new DBPool(coin, sharesdb);
        this.sharesdb_slave = new DBPool(coin, sharesdb_slave);
        this.pooldb = new DBPool(coin, pooldb);
        this.redis = new Cache(redis1);
        this.hashratedb = new DBPool(coin, hashrate);
        break;
      case "report":
        this.REPORT_ADDRESS = REPORT_ADDRESS;
        this.node = create_node();
        this.distribution = new DBPool(coin, distribution);
        this.redis = new Cache(redis1);
        break;
      case "clear":
        this.pooldb = new DBPool(coin, pooldb);
        this.sharesdb = new DBPool(coin, sharesdb);
        this.hashratedb = new DBPool(coin, hashrate);
        break;
      case "distribution":
        this.pooldb = new DBPool(coin, pooldb);
        this.hashratedb = new DBPool(coin, hashrate);
        this.distributiondb = new DBPool(coin, distribution);
        this.users_addresses = new DBPool(coin, users_addresses);
        this.node = create_node();
        this.REPORT_ADDRESS = REPORT_ADDRESS;
        this.POOL_FEE = POOL_FEE;
        // Logs the configured pool fee rate.
        console.log(`当前手续费率为:${POOL_FEE}`);
        break;
      case "balance":
        this.distribution = new DBPool(coin, distribution);
        this.balancedb = new DBPool(coin, balance);
        break;
      case "confirm":
        this.MAX_MATURE = MAX_MATURE;
        this.REPORT_ADDRESS = REPORT_ADDRESS;
        this.node = create_node();
        this.distribution = new DBPool(coin, distribution);
        this.pooldb = new DBPool(coin, pooldb);
        break;
      case "stats":
        this.pooldb = new DBPool(coin, pooldb);
        this.hashratedb = new DBPool(coin, hashrate);
        this.distribution = new DBPool(coin, distribution);
        break;
      default:
        // Thrown as an Error (not a bare string) so callers get `.message` and a stack trace.
        throw new Error(`暂不支持${method}方法 init`);
    }
  }

  /**
   * Resolves after `ms` milliseconds.
   * @param {number} ms Delay in milliseconds.
   * @returns {Promise<void>}
   */
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
/**
 * Hashrate task: aggregates accepted share difficulty per (user, miner) from
 * the shares databases, stores 5-minute snapshots in `<coin>_minersv2`, and
 * derives 30-minute / 24-hour hashrates written to `<coin>_mhsv2` and
 * `<coin>_mhs_realv2`.
 */
class HashRate extends Init {
  /**
   * @param {string} coin Coin identifier, forwarded to Init with method "hashrate".
   */
  constructor(coin) {
    super(coin, "hashrate");
    // Factor applied to accepted difficulty in calculate_hashrate().
    this.diffOneShareHashsAvg = 2 ** 32 - 1;
  }

  /**
   * Converts accepted difficulty over a period into a hashrate.
   * @param {Number} accepts Total accepted difficulty within the period.
   * @param {Number} seconds Length of the period in seconds.
   * @param {String} unit One of H/s, KH/s, MH/s, GH/s, TH/s, PH/s, EH/s.
   * @returns {Number} accepts * diffOneShareHashsAvg / seconds, scaled to `unit`.
   * @throws {Error} If `unit` is not one of the known units.
   */
  calculate_hashrate(accepts, seconds, unit) {
    const unit_divisors = {
      "H/s": 1,
      "KH/s": 1_000,
      "MH/s": 1_000_000,
      "GH/s": 1_000_000_000,
      "TH/s": 1_000_000_000_000,
      "PH/s": 1_000_000_000_000_000,
      "EH/s": 10 ** 18,
    };
    if (!Object.prototype.hasOwnProperty.call(unit_divisors, unit)) {
      throw new Error(`${unit}不是已知单位`);
    }
    return (accepts * this.diffOneShareHashsAvg) / seconds / unit_divisors[unit];
  }

  /**
   * Merges rows queried from the primary and the replica into one row per
   * (user, miner): `accepts` is summed and the latest `last_submit` is kept.
   * @param {Array<{user: string, miner: string, accepts: (number|string), last_submit: *}>} data
   * @returns {Array<{user: string, miner: string, accepts: number, last_submit: *}>}
   */
  merge(data) {
    const results = new Map();
    for (const item of data) {
      // A JSON-encoded pair cannot collide the way "user-miner" does when
      // either name itself contains "-" (e.g. "a-b"/"c" vs "a"/"b-c").
      const key = JSON.stringify([item.user, item.miner]);
      const accepts = parseFloat(item.accepts);
      const existing = results.get(key);
      if (existing === undefined) {
        results.set(key, {
          user: item.user,
          miner: item.miner,
          accepts,
          last_submit: item.last_submit,
        });
        continue;
      }
      existing.accepts += accepts;
      if (new Date(item.last_submit) > new Date(existing.last_submit)) {
        existing.last_submit = item.last_submit;
      }
    }
    return Array.from(results.values());
  }

  /**
   * Shared implementation of query_table / query_slave_table: lists the share
   * tables on `db` that may hold rows for the given period.
   * Best effort: on any error the error is logged and an empty list is
   * returned, which makes query_accepts fall back to `<coin>_blk_detail` only.
   * @param {Object} db Database pool exposing `exec(sql, params)`.
   * @param {String} start_time
   * @param {String} end_time
   * @returns {Promise<String[]>} Archive table names followed by `<coin>_blk_detail`, or [] on error.
   */
  async _list_share_tables(db, start_time, end_time) {
    try {
      const sql = `(SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) UNION (SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;`;
      const data = await db.exec(sql, [end_time, start_time, end_time]);
      const result = data.map((item) => `${this.coin}_block_detail_${item.from}_${Math.trunc(item.to - 1)}`);
      result.push(`${this.coin}_blk_detail`);
      return result;
    } catch (err) {
      console.log(err);
      return [];
    }
  }

  /**
   * Lists the primary-database share tables covering the period.
   * @param {String} start_time
   * @param {String} end_time
   * @returns {Promise<String[]>}
   */
  async query_table(start_time, end_time) {
    return this._list_share_tables(this.sharesdb, start_time, end_time);
  }

  /**
   * Lists the replica-database share tables covering the period.
   * @param {String} start_time
   * @param {String} end_time
   * @returns {Promise<String[]>}
   */
  async query_slave_table(start_time, end_time) {
    return this._list_share_tables(this.sharesdb_slave, start_time, end_time);
  }

  /**
   * Builds the per-(user, miner) aggregation query over the given tables.
   * Timestamps are bound as `?` parameters; table names cannot be bound and
   * are interpolated (they come from the coin name and the height-detail table).
   * @param {String[]} tables_name Result of query_table / query_slave_table.
   * @param {String} start_time Inclusive lower bound.
   * @param {String} end_time Exclusive upper bound.
   * @returns {{sql: String, params: String[]}}
   */
  _build_accepts_query(tables_name, start_time, end_time) {
    const aggregate = "SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM";
    // With zero or one table name only the live table is queried.
    if (tables_name.length <= 1) {
      return {
        sql: `${aggregate} ${this.coin}_blk_detail WHERE date >= ? AND date < ? GROUP BY user, miner;`,
        params: [start_time, end_time],
      };
    }
    const selects = tables_name.map((table) => `SELECT date, user, miner, miner_diff, pool_diff FROM ${table} WHERE date >= ? AND date < ?`);
    return {
      sql: `${aggregate} ( ${selects.join(" \nUNION ALL\n")}) AS combined_tables GROUP BY user, miner;`,
      params: tables_name.flatMap(() => [start_time, end_time]),
    };
  }

  /**
   * Queries accepted difficulty per (user, miner) for the period. When
   * `enable` is truthy the primary and the replica are queried concurrently
   * and merged; otherwise only the primary is queried.
   * A failed attempt is logged and retried after 15 seconds, up to 4 retries
   * (about one minute); the last error is then rethrown. The retry budget is
   * local to each call, so it always starts fresh and concurrent calls do not
   * share it.
   * @param {String} start_time Inclusive lower bound.
   * @param {String} end_time Exclusive upper bound.
   * @param {*} enable Truthy to include the replica database.
   * @returns {Promise<Array<{user: string, miner: string, accepts: number, last_submit: *}>>}
   */
  async query_accepts(start_time, end_time, enable) {
    const MAX_RETRIES = 4;
    const RETRY_DELAY_MS = 1000 * 15;
    // Resolves the table list for one database, then runs the aggregation on it.
    const run = async (db, tables_promise) => {
      const { sql, params } = this._build_accepts_query(await tables_promise, start_time, end_time);
      return db.exec(sql, params);
    };
    for (let attempt = 0; ; attempt++) {
      try {
        const jobs = [run(this.sharesdb, this.query_table(start_time, end_time))];
        if (enable) {
          jobs.push(run(this.sharesdb_slave, this.query_slave_table(start_time, end_time)));
        }
        const rows = await Promise.all(jobs);
        return this.merge(rows.flat());
      } catch (err) {
        console.error(`Error in query_accepts: ${err.message}`);
        if (attempt >= MAX_RETRIES) {
          throw err;
        }
        await this.sleep(RETRY_DELAY_MS);
      }
    }
  }

  /**
   * Reads miner states from `<coin>_miners`, excluding miners whose last
   * submit is older than one day before the date of `time`.
   * @param {String} time
   * @returns {Promise<Array>} Rows with date, user, miner, state, ratio, last_submit.
   */
  async query_miners(time) {
    const sql = `SELECT date, user, miner, state, ratio, last_submit FROM ${this.coin}_miners WHERE last_submit >= DATE(?) - INTERVAL 1 DAY;`;
    return this.pooldb.exec(sql, [time]);
  }

  /**
   * True when `data` is missing or is an empty Array/Map. The null check comes
   * first so that null/undefined input does not throw.
   * @param {*} data
   * @returns {boolean}
   */
  _has_no_rows(data) {
    return !data || data.length === 0 || data.size === 0;
  }

  /**
   * Builds a multi-row INSERT for one of the mhs tables.
   * @param {String} table Target table name.
   * @param {Map|Array} data Collection whose values carry user, miner, date, mhs30m, mhs24h, state, last_submit.
   * @returns {{sql: String, values: Array}}
   */
  _build_mhs_insert(table, data) {
    const placeholders = [];
    const values = [];
    data.forEach(({ user, miner, date, mhs30m, mhs24h, state, last_submit }) => {
      placeholders.push("(?, ?, ?, ?, ?, ?, ?)");
      values.push(user, miner, date, mhs30m, mhs24h, state, last_submit);
    });
    const sql = `INSERT INTO ${table} (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES ${placeholders.join(", ")}`;
    return { sql, values };
  }

  /**
   * Appends the given hashrate rows to `<coin>_mhsv2`. Logs and returns when
   * there is nothing to insert.
   * @param {Map|Array} data Result of query_hashrate_miners_accepts.
   * @returns {Promise<void>}
   */
  async insert_mhs(data) {
    if (this._has_no_rows(data)) {
      console.log(Date.now(), "30分钟没有新增矿机提交数据");
      return;
    }
    const { sql, values } = this._build_mhs_insert(`${this.coin}_mhsv2`, data);
    await this.hashratedb.exec_transaction(sql, values);
  }

  /**
   * Replaces the contents of `<coin>_mhs_realv2` with the given hashrate rows:
   * the DELETE and the INSERT are handed together to exec_transaction_together.
   * Logs and returns when there is nothing to insert.
   * @param {Map|Array} data Result of query_hashrate_miners_accepts.
   * @returns {Promise<void>}
   */
  async insert_mhs_real(data) {
    if (this._has_no_rows(data)) {
      console.log(Date.now(), "5分钟没有新增矿机提交数据");
      return;
    }
    try {
      const del_sql = `DELETE FROM ${this.coin}_mhs_realv2 WHERE id > 0;`;
      const { sql, values } = this._build_mhs_insert(`${this.coin}_mhs_realv2`, data);
      const sqls = [{ sql: del_sql }, { sql, param: values }];
      await this.hashratedb.exec_transaction_together(sqls);
    } catch (err) {
      console.error("Transaction failed: ", err);
      throw err;
    }
  }

  /**
   * Builds, per (user, miner), the latest known state plus the 30-minute and
   * 24-hour hashrates (in MH/s) ending at `end_time`, from `<coin>_minersv2`.
   * @param {String} end_time Exclusive upper bound of both windows.
   * @returns {Promise<Map<string, {date: String, user: string, miner: string, state: *, last_submit: *, mhs30m: number, mhs24h: number}>>}
   *          Map keyed by "user:miner".
   */
  async query_hashrate_miners_accepts(end_time) {
    // NOTE(review): `new Date(end_time)` parses a non-ISO string in local time,
    // then Times.utcTime formats the result — confirm the process runs in UTC.
    const end_ms = new Date(end_time).valueOf();
    const ymd_last_30m = Times.utcTime(end_ms - 1000 * 60 * 30);
    const ymd_last_24h = Times.utcTime(end_ms - 1000 * 60 * 60 * 24);
    // Latest snapshot row per (user, miner) at or before end_time.
    const state_sql = `SELECT t1.*
FROM ${this.coin}_minersv2 t1
INNER JOIN (
SELECT user, miner, MAX(date) AS max_date
FROM ${this.coin}_minersv2
WHERE date <= ?
GROUP BY user, miner
) t2
ON t1.user = t2.user AND t1.miner = t2.miner AND t1.date = t2.max_date;`;
    const mhs30m_sql = `SELECT SUM(accepts) AS accepts_30min, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
    const mhs24h_sql = `SELECT SUM(accepts) AS accepts_24h, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
    const [state, mhs30m, mhs24h] = await Promise.all([
      this.hashratedb.exec(state_sql, [end_time]),
      this.hashratedb.exec(mhs30m_sql, [ymd_last_30m, end_time]),
      this.hashratedb.exec(mhs24h_sql, [ymd_last_24h, end_time]),
    ]);
    const hashrate_map = new Map();
    for (const { user, miner, state: miner_state, last_submit } of state) {
      hashrate_map.set(`${user}:${miner}`, { date: end_time, user, miner, state: miner_state, last_submit, mhs30m: 0, mhs24h: 0 });
    }
    for (const { accepts_30min, user, miner } of mhs30m) {
      const entry = hashrate_map.get(`${user}:${miner}`);
      // The three queries run concurrently; skip a miner that has no state row
      // instead of failing on an undefined entry.
      if (entry === undefined) continue;
      entry.mhs30m = this.calculate_hashrate(accepts_30min, 60 * 30, "MH/s");
    }
    for (const { accepts_24h, user, miner } of mhs24h) {
      const entry = hashrate_map.get(`${user}:${miner}`);
      if (entry === undefined) continue;
      entry.mhs24h = this.calculate_hashrate(accepts_24h, 60 * 60 * 24, "MH/s");
    }
    return hashrate_map;
  }

  /**
   * Records one 5-minute snapshot per miner in `<coin>_minersv2`: accepted
   * difficulty over the 5 minutes before `end_time`, plus online/offline state.
   * Nothing is written when `<coin>_miners` returns no rows.
   * @param {String} end_time Timestamp containing ":" separators; its seconds are replaced with "00" for the stored date.
   * @returns {Promise<void>}
   */
  async insert_hashrate_miners_table(end_time) {
    const [hour_part, minute_part] = end_time.split(":");
    const date = `${hour_part}:${minute_part}:00`;
    const start_time = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 5);
    // NOTE(review): if the cache returns strings, "false"/"0" are truthy here —
    // confirm what Cache.get yields for this key.
    const enable = (await this.redis.get(`${this.coin}:enable`)) || false;
    const [accepts, miners_state] = await Promise.all([
      this.query_accepts(start_time, end_time, enable),
      this.query_miners(end_time),
    ]);
    // No known miners: either none ever connected, or (with shares present)
    // the miners table is out of step with the shares; nothing to record.
    if (miners_state.length === 0) {
      return;
    }
    const miners_map = new Map();
    if (accepts.length === 0) {
      // Nobody submitted in the last 5 minutes: every known miner is recorded
      // as offline with zero accepts.
      for (const { user, miner, last_submit } of miners_state) {
        miners_map.set(`${user}:${miner}`, { date, user, miner, state: "offline", last_submit, accepts: 0 });
      }
    } else {
      // Miners that submitted in the last 5 minutes are online.
      for (const { user, miner, accepts: miner_accepts, last_submit } of accepts) {
        miners_map.set(`${user}:${miner}`, { date, user, miner, accepts: miner_accepts, last_submit, state: "online" });
      }
      // Known miners without recent submits keep their stored state, with zero accepts.
      for (const { user, miner, state, last_submit } of miners_state) {
        const key = `${user}:${miner}`;
        if (!miners_map.has(key)) {
          miners_map.set(key, { date, user, miner, accepts: 0, last_submit, state });
        }
      }
    }
    const placeholders = [];
    const miners_table_values = [];
    miners_map.forEach(({ user, miner, date: row_date, accepts: row_accepts, state, last_submit }) => {
      placeholders.push("(?, ?, ?, ?, ?, ?)");
      miners_table_values.push(user, miner, row_date, row_accepts, state, last_submit);
    });
    const insert_miners_table_sql = `INSERT INTO ${this.coin}_minersv2(user, miner, date, accepts, state, last_submit) VALUES ${placeholders.join(", ")}`;
    await this.hashratedb.exec_transaction(insert_miners_table_sql, miners_table_values);
  }
}
const coin = "enx";
const hashrate = new HashRate(coin);
// Scheduled form of this job, kept for reference (every 5 minutes at second 30):
// schedule.scheduleJob({ minute: [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55], second: [30] }, async () => {
// });

/**
 * Runs one hashrate cycle: stores the 5-minute miner snapshot, recomputes the
 * 30-minute / 24-hour hashrates, always refreshes the real-time table, and on
 * minute 0 or 30 also appends to the history table.
 * @returns {Promise<void>}
 */
async function main() {
  // Take the clock once so that end_time and the minute check below describe
  // the same instant, even if the database calls run across a minute boundary.
  const now = Date.now();
  const ymd_now = Times.utcTime(now);
  const [hour_part, minute_part] = ymd_now.split(":");
  const end_time = `${hour_part}:${minute_part}:00`;
  await hashrate.insert_hashrate_miners_table(end_time);
  const data = await hashrate.query_hashrate_miners_accepts(end_time);
  const currentMinute = new Date(now).getMinutes();
  if (currentMinute === 0 || currentMinute === 30) {
    await hashrate.insert_mhs(data);
  }
  await hashrate.insert_mhs_real(data);
}

// Handle failures explicitly instead of leaving a floating promise: log the
// error and exit non-zero regardless of the Node version's unhandled-rejection default.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});