app v1.0
This commit is contained in:
12
test/caculate.js
Normal file
12
test/caculate.js
Normal file
@@ -0,0 +1,12 @@
|
||||
const Times = require('../public/times')
|
||||
const str = "rxd_miners_stats_20241128"
|
||||
|
||||
let sql = `DROP TABLE IF EXISTS `
|
||||
const start = new Date("2024-11-28 00:00:00").valueOf()
|
||||
for(let i=0; i<70; i++){
|
||||
const t = Times.utcTime(start + i * 86400000)
|
||||
const ymd = t.split(" ")[0].replace(/-/g, "")
|
||||
sql += ``+ `rxd_miners_stats_${ymd},`
|
||||
}
|
||||
sql = sql.slice(0, -1)
|
||||
console.log(sql);
|
||||
340
test/hashratev2-test.js
Normal file
340
test/hashratev2-test.js
Normal file
@@ -0,0 +1,340 @@
|
||||
const Times = require("../public/times");
|
||||
// const executeWithRetry = require("./public/retry")
|
||||
const Init = require("./init");
|
||||
|
||||
class HashRate extends Init {
|
||||
constructor(coin) {
|
||||
const method = "hashrate";
|
||||
super(coin, method);
|
||||
this.count = 0;
|
||||
this.diffOneShareHashsAvg = 2 ** 32 - 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算hash
|
||||
* @param {Number} accepts 时段内接受总数
|
||||
* @param {Number} seconds 时段秒数
|
||||
* @param {String} unit H/s、KH/s、MH/s、GH/s、TH/s、PH/s、EH/s
|
||||
* @returns
|
||||
*/
|
||||
calculate_hashrate(accepts, seconds, unit) {
|
||||
let num;
|
||||
switch (unit) {
|
||||
case "H/s":
|
||||
num = 1;
|
||||
break;
|
||||
case "KH/s":
|
||||
num = 1_000;
|
||||
break;
|
||||
case "MH/s":
|
||||
num = 1_000_000;
|
||||
break;
|
||||
case "GH/s":
|
||||
num = 1_000_000_000;
|
||||
break;
|
||||
case "TH/s":
|
||||
num = 1_000_000_000_000;
|
||||
break;
|
||||
case "PH/s":
|
||||
num = 1_000_000_000_000_000;
|
||||
break;
|
||||
case "EH/s":
|
||||
num = 10 ** 18;
|
||||
break;
|
||||
default:
|
||||
throw `${unit}不是已知单位`;
|
||||
}
|
||||
const hashrate = (accepts * this.diffOneShareHashsAvg) / seconds / num;
|
||||
return hashrate;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将主、备查询出来的数据合并
|
||||
* @param {*} data [{user:"", miner:"", accepts:100},{user:"", miner:"", accepts:100}...]
|
||||
* @returns
|
||||
*/
|
||||
merge(data) {
|
||||
// 创建一个 Map 来存储 user 和 miner 组合的结果
|
||||
const results = new Map();
|
||||
|
||||
data.forEach((item) => {
|
||||
const key = `${item.user}-${item.miner}`;
|
||||
|
||||
if (results.has(key)) {
|
||||
const existing = results.get(key);
|
||||
existing.accepts += parseFloat(item.accepts);
|
||||
if (new Date(item.last_submit) > new Date(existing.last_submit)) {
|
||||
existing.last_submit = item.last_submit;
|
||||
}
|
||||
results.set(key, existing);
|
||||
} else {
|
||||
results.set(key, {
|
||||
user: item.user,
|
||||
miner: item.miner,
|
||||
accepts: parseFloat(item.accepts),
|
||||
last_submit: item.last_submit,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// 将结果转换为数组
|
||||
const resultArray = Array.from(results.values());
|
||||
return resultArray;
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询主库符合时段的表名
|
||||
* @param {String} start_time
|
||||
* @param {String} end_time
|
||||
* @returns
|
||||
*/
|
||||
async query_table(start_time, end_time) {
|
||||
try {
|
||||
const sql = `(SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) UNION (SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;`;
|
||||
const data = await this.sharesdb.exec(sql, [end_time, start_time, end_time]);
|
||||
const result = [];
|
||||
if (data.length !== 0) {
|
||||
for (let item of data) {
|
||||
result.push(`${this.coin}_block_detail_${item.from}_${Math.trunc(item.to - 1)}`);
|
||||
}
|
||||
}
|
||||
result.push(`${this.coin}_blk_detail`);
|
||||
return result;
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询从库符合时段的表名
|
||||
* @param {String} start_time
|
||||
* @param {String} end_time
|
||||
* @returns
|
||||
*/
|
||||
async query_slave_table(start_time, end_time) {
|
||||
try {
|
||||
const sql = `(SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) UNION (SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;`;
|
||||
const data = await this.sharesdb_slave.exec(sql, [end_time, start_time, end_time]);
|
||||
const result = [];
|
||||
if (data.length !== 0) {
|
||||
for (let item of data) {
|
||||
result.push(`${this.coin}_block_detail_${item.from}_${Math.trunc(item.to - 1)}`);
|
||||
}
|
||||
}
|
||||
result.push(`${this.coin}_blk_detail`);
|
||||
return result;
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
// 查询时段内accepts,主从同时查询
|
||||
async query_accepts(start_time, end_time, enable) {
|
||||
try {
|
||||
if (this.count === undefined) this.count = 0;
|
||||
|
||||
const generateUnionSql = (tables) => {
|
||||
let sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ( `;
|
||||
for (let i = 0; i < tables.length; i++) {
|
||||
sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${tables[i]} WHERE date >= ? AND date < ?`;
|
||||
if (i < tables.length - 1) {
|
||||
sql += ` \nUNION ALL\n `;
|
||||
}
|
||||
}
|
||||
sql += `) AS combined_tables GROUP BY user, miner;`;
|
||||
return sql;
|
||||
};
|
||||
|
||||
let sql, slave_sql;
|
||||
if (enable) {
|
||||
const [tables_name, slave_tables_name] = await Promise.all([this.query_table(start_time, end_time), this.query_slave_table(start_time, end_time)]);
|
||||
|
||||
sql = tables_name.length <= 1 ? `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ${this.coin}_blk_detail WHERE date >= ? AND date < ? GROUP BY user, miner;` : generateUnionSql(tables_name);
|
||||
|
||||
slave_sql = slave_tables_name.length <= 1 ? `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ${this.coin}_blk_detail WHERE date >= ? AND date < ? GROUP BY user, miner;` : generateUnionSql(slave_tables_name);
|
||||
|
||||
const [accepts_data, slave_accepts] = await Promise.all([this.sharesdb.exec(sql, [start_time, end_time]), this.sharesdb_slave.exec(slave_sql, [start_time, end_time])]);
|
||||
|
||||
return this.merge(accepts_data.concat(slave_accepts));
|
||||
} else {
|
||||
const tables_name = await this.query_table(start_time, end_time);
|
||||
|
||||
sql = tables_name.length <= 1 ? `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ${this.coin}_blk_detail WHERE date >= ? AND date < ? GROUP BY user, miner;` : generateUnionSql(tables_name);
|
||||
|
||||
const accepts_data = await this.sharesdb.exec(sql, [start_time, end_time]);
|
||||
return this.merge(accepts_data);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Error in query_accepts: ${err.message}`);
|
||||
await this.sleep(1000 * 15);
|
||||
if (this.count > 3) { // 重试4次,1分钟
|
||||
this.count = 0;
|
||||
throw err;
|
||||
}
|
||||
this.count++;
|
||||
return this.query_accepts(start_time, end_time, enable);
|
||||
}
|
||||
}
|
||||
|
||||
// 查询当天miners状态,排除掉超过1天没有提交的矿工
|
||||
async query_miners(time) {
|
||||
try {
|
||||
const sql = `SELECT date, user, miner, state, ratio, last_submit FROM ${this.coin}_miners WHERE last_submit >= DATE(?) - INTERVAL 1 DAY;`;
|
||||
const miners_state = await this.pooldb.exec(sql, [time]);
|
||||
return miners_state;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async insert_mhs(data) {
|
||||
if (data.length === 0 || !data || data.size === 0) {
|
||||
console.log(Date.now(), ":30分钟没有新增矿机提交数据");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
let sql = `INSERT INTO ${this.coin}_mhsv2 (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES `;
|
||||
const values = [];
|
||||
data.forEach((item) => {
|
||||
const { user, miner, date, mhs30m, mhs24h, state, last_submit } = item;
|
||||
sql += `(?, ?, ?, ?, ?, ?, ?), `;
|
||||
values.push(user, miner, date, mhs30m, mhs24h, state, last_submit);
|
||||
});
|
||||
sql = sql.slice(0, -2);
|
||||
await this.hashratedb.exec_transaction(sql, values);
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async insert_mhs_real(data) {
|
||||
if (data.length === 0 || !data || data.size === 0) {
|
||||
console.log(Date.now(), ":5分钟没有新增矿机提交数据");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const del_sql = `DELETE FROM ${this.coin}_mhs_realv2 WHERE id > 0;`;
|
||||
let sql = `INSERT INTO ${this.coin}_mhs_realv2 (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES `;
|
||||
const values = [];
|
||||
data.forEach((item) => {
|
||||
const { user, miner, date, mhs30m, mhs24h, state, last_submit } = item;
|
||||
sql += `(?, ?, ?, ?, ?, ? ,?), `;
|
||||
values.push(user, miner, date, mhs30m, mhs24h, state, last_submit);
|
||||
});
|
||||
sql = sql.slice(0, -2);
|
||||
// sql += ` AS new_values ON DUPLICATE KEY UPDATE date = new_values.date, mhs30m = new_values.mhs30m, mhs24h = new_values.mhs24h, state = new_values.state, last_submit = new_values.last_submit;`;
|
||||
const sqls = [{ sql: del_sql }, { sql, param: values }];
|
||||
await this.hashratedb.exec_transaction_together(sqls);
|
||||
} catch (err) {
|
||||
// 处理错误
|
||||
console.error("Transaction failed: ", err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async query_hashrate_miners_accepts(end_time) {
|
||||
try {
|
||||
const ymd_last_30m = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 30);
|
||||
const ymd_last_24h = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 60 * 24);
|
||||
const state_sql = `SELECT t1.*
|
||||
FROM ${this.coin}_minersv2 t1
|
||||
INNER JOIN (
|
||||
SELECT user, miner, MAX(date) AS max_date
|
||||
FROM ${this.coin}_minersv2
|
||||
WHERE date <= ?
|
||||
GROUP BY user, miner
|
||||
) t2
|
||||
ON t1.user = t2.user AND t1.miner = t2.miner AND t1.date = t2.max_date;`;
|
||||
const mhs30m_sql = `SELECT SUM(accepts) AS accepts_30min, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
|
||||
const mhs24h_sql = `SELECT SUM(accepts) AS accepts_24h, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
|
||||
const [state, mhs30m, mhs24h] = await Promise.all([this.hashratedb.exec(state_sql, [end_time]), this.hashratedb.exec(mhs30m_sql, [ymd_last_30m, end_time]), this.hashratedb.exec(mhs24h_sql, [ymd_last_24h, end_time])]);
|
||||
|
||||
const hashrate_map = new Map();
|
||||
|
||||
state.forEach((item) => {
|
||||
const { date, user, miner, state, last_submit } = item;
|
||||
hashrate_map.set(`${user}:${miner}`, { date: end_time, user, miner, state, last_submit, mhs30m: 0, mhs24h: 0 });
|
||||
});
|
||||
|
||||
mhs30m.forEach((item) => {
|
||||
const { accepts_30min, user, miner } = item;
|
||||
|
||||
const values = hashrate_map.get(`${user}:${miner}`);
|
||||
|
||||
values.mhs30m = this.calculate_hashrate(accepts_30min, 60 * 30, "MH/s");
|
||||
|
||||
hashrate_map.set(`${user}:${miner}`, values);
|
||||
});
|
||||
mhs24h.forEach((item) => {
|
||||
const { accepts_24h, user, miner } = item;
|
||||
const values = hashrate_map.get(`${user}:${miner}`);
|
||||
|
||||
values.mhs24h = this.calculate_hashrate(accepts_24h, 60 * 60 * 24, "MH/s");
|
||||
|
||||
hashrate_map.set(`${user}:${miner}`, values);
|
||||
});
|
||||
return hashrate_map;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async insert_hashrate_miners_table(end_time) {
|
||||
try {
|
||||
const ymd = end_time.split(":");
|
||||
const date = ymd[0] + ":" + ymd[1] + ":" + "00";
|
||||
// 计算最近5分钟accepts,最新矿机状态
|
||||
const start_time = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 5);
|
||||
let enable = (await this.redis.get(`${this.coin}:enable`)) || false;
|
||||
|
||||
let [accepts, miners_state] = await Promise.all([this.query_accepts(start_time, end_time, enable), this.query_miners(end_time)]);
|
||||
|
||||
// 创建nexa_miners表所需要的map
|
||||
const miners_map = new Map();
|
||||
// 判断各种情况
|
||||
if (accepts.length === 0 && miners_state.length === 0) {
|
||||
// 历史上没有矿工接入
|
||||
return;
|
||||
} else if (accepts.length !== 0 && miners_state.length === 0) {
|
||||
// 主库出了问题,基本不可能出现这种情况
|
||||
return;
|
||||
} else if (accepts.length === 0 && miners_state.length !== 0) {
|
||||
// 最近5分钟没有矿工接入,直接将m2pooldb-nexa_miners表中所有矿工的accepts更新为0,并放入nexa_miners表需要的map中
|
||||
miners_state.forEach((item) => {
|
||||
const { user, miner, state, last_submit } = item;
|
||||
miners_map.set(`${user}:${miner}`, { date, user, miner, state: "offline", last_submit, accepts: 0 });
|
||||
});
|
||||
} else {
|
||||
// 先找到所有最近5分钟有提交的矿机
|
||||
accepts.forEach((item) => {
|
||||
const { user, miner, accepts, last_submit } = item;
|
||||
miners_map.set(`${user}:${miner}`, { date, user, miner, accepts, last_submit, state: "online" });
|
||||
});
|
||||
// 再将stats表有记录矿机,但最近5分钟没有提交的矿机合并进去
|
||||
miners_state.forEach((item) => {
|
||||
const { user, miner, state, last_submit } = item;
|
||||
if (!miners_map.get(`${user}:${miner}`)) {
|
||||
miners_map.set(`${user}:${miner}`, { date, user, miner, accepts: 0, last_submit, state });
|
||||
}
|
||||
});
|
||||
}
|
||||
// 将指定时段内的数据插入nexa_miners表
|
||||
let insert_miners_table_sql = `INSERT INTO ${this.coin}_minersv2(user, miner, date, accepts, state, last_submit) VALUES `;
|
||||
const miners_table_values = [];
|
||||
miners_map.forEach((item) => {
|
||||
const { user, miner, date, accepts, state, last_submit } = item;
|
||||
insert_miners_table_sql += `(?, ?, ?, ?, ?, ?), `;
|
||||
miners_table_values.push(user, miner, date, accepts, state, last_submit);
|
||||
});
|
||||
insert_miners_table_sql = insert_miners_table_sql.slice(0, -2);
|
||||
await this.hashratedb.exec_transaction(insert_miners_table_sql, miners_table_values);
|
||||
return;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = HashRate;
|
||||
463
test/test1.js
Normal file
463
test/test1.js
Normal file
@@ -0,0 +1,463 @@
|
||||
const Times = require("../public/times");
|
||||
const Decimal = require("decimal");
|
||||
const fs = require("fs");
|
||||
const DBPool = require("../lib/mysql");
|
||||
const Cache = require("../lib/redis");
|
||||
const { NEXARPCNode, GRSRPCNode, MONARPCNode, DGBRPCNode, RXDRPCNode } = require("../lib/node");
|
||||
|
||||
class Init {
|
||||
constructor(coin, method) {
|
||||
this.coin = coin;
|
||||
const config = fs.readFileSync(`../config/${coin}.conf`, "utf-8");
|
||||
const { master, slave, redis_options, node_options, distribution_conf, MAX_MATURE, REPORT_ADDRESS } = JSON.parse(config);
|
||||
const { pooldb, sharesdb, distribution, hashrate, users_addresses, balance } = master;
|
||||
const { pooldb_slave, sharesdb_slave } = slave;
|
||||
const { node1, node2 } = node_options;
|
||||
const { redis1 } = redis_options;
|
||||
const { POOL_FEE } = distribution_conf;
|
||||
const node_map = {
|
||||
mona: MONARPCNode,
|
||||
nexa: NEXARPCNode,
|
||||
grs: GRSRPCNode,
|
||||
dgbs: DGBRPCNode,
|
||||
dgbq: DGBRPCNode,
|
||||
dgbo: DGBRPCNode,
|
||||
rxd: RXDRPCNode,
|
||||
};
|
||||
|
||||
switch (method) {
|
||||
case "hashrate":
|
||||
this.sharesdb = new DBPool(coin, sharesdb);
|
||||
this.sharesdb_slave = new DBPool(coin, sharesdb_slave);
|
||||
this.pooldb = new DBPool(coin, pooldb);
|
||||
this.redis = new Cache(redis1);
|
||||
// this.pooldb_slave = new DBPool(coin, pooldb_slave)
|
||||
this.hashratedb = new DBPool(coin, hashrate);
|
||||
break;
|
||||
case "report":
|
||||
this.REPORT_ADDRESS = REPORT_ADDRESS;
|
||||
this.node = new node_map[coin](node2);
|
||||
this.distribution = new DBPool(coin, distribution);
|
||||
this.redis = new Cache(redis1);
|
||||
break;
|
||||
case "clear":
|
||||
this.pooldb = new DBPool(coin, pooldb);
|
||||
this.sharesdb = new DBPool(coin, sharesdb);
|
||||
this.hashratedb = new DBPool(coin, hashrate);
|
||||
break;
|
||||
case "distribution":
|
||||
this.pooldb = new DBPool(coin, pooldb);
|
||||
this.hashratedb = new DBPool(coin, hashrate);
|
||||
this.distributiondb = new DBPool(coin, distribution);
|
||||
this.users_addresses = new DBPool(coin, users_addresses);
|
||||
this.node = new node_map[coin](node2);
|
||||
this.REPORT_ADDRESS = REPORT_ADDRESS;
|
||||
this.POOL_FEE = POOL_FEE;
|
||||
console.log(`当前手续费率为:${POOL_FEE}`);
|
||||
// this.balance = new DBPool(coin, balance)
|
||||
break;
|
||||
case "balance":
|
||||
this.distribution = new DBPool(coin, distribution);
|
||||
this.balancedb = new DBPool(coin, balance);
|
||||
break;
|
||||
case "confirm":
|
||||
this.MAX_MATURE = MAX_MATURE;
|
||||
this.REPORT_ADDRESS = REPORT_ADDRESS;
|
||||
this.node = new node_map[coin](node2);
|
||||
this.distribution = new DBPool(coin, distribution);
|
||||
this.pooldb = new DBPool(coin, pooldb);
|
||||
break;
|
||||
case "stats":
|
||||
this.pooldb = new DBPool(coin, pooldb);
|
||||
this.hashratedb = new DBPool(coin, hashrate);
|
||||
this.distribution = new DBPool(coin, distribution);
|
||||
break;
|
||||
default:
|
||||
throw `暂不支持${method}方法 init`;
|
||||
}
|
||||
}
|
||||
|
||||
sleep(ms) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
}
|
||||
|
||||
class HashRate extends Init {
|
||||
constructor(coin) {
|
||||
const method = "hashrate";
|
||||
super(coin, method);
|
||||
this.diffOneShareHashsAvg = 2 ** 32 - 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算hash
|
||||
* @param {Number} accepts 时段内接受总数
|
||||
* @param {Number} seconds 时段秒数
|
||||
* @param {String} unit H/s、KH/s、MH/s、GH/s、TH/s、PH/s、EH/s
|
||||
* @returns
|
||||
*/
|
||||
calculate_hashrate(accepts, seconds, unit) {
|
||||
let num;
|
||||
switch (unit) {
|
||||
case "H/s":
|
||||
num = 1;
|
||||
break;
|
||||
case "KH/s":
|
||||
num = 1_000;
|
||||
break;
|
||||
case "MH/s":
|
||||
num = 1_000_000;
|
||||
break;
|
||||
case "GH/s":
|
||||
num = 1_000_000_000;
|
||||
break;
|
||||
case "TH/s":
|
||||
num = 1_000_000_000_000;
|
||||
break;
|
||||
case "PH/s":
|
||||
num = 1_000_000_000_000_000;
|
||||
break;
|
||||
case "EH/s":
|
||||
num = 10 ** 18;
|
||||
break;
|
||||
default:
|
||||
throw `${unit}不是已知单位`;
|
||||
}
|
||||
const hashrate = (accepts * this.diffOneShareHashsAvg) / seconds / num;
|
||||
return hashrate;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将主、备查询出来的数据合并
|
||||
* @param {*} data [{user:"", miner:"", accepts:100},{user:"", miner:"", accepts:100}...]
|
||||
* @returns
|
||||
*/
|
||||
merge(data) {
|
||||
// 创建一个 Map 来存储 user 和 miner 组合的结果
|
||||
const results = new Map();
|
||||
|
||||
data.forEach((item) => {
|
||||
const key = `${item.user}-${item.miner}`;
|
||||
|
||||
if (results.has(key)) {
|
||||
const existing = results.get(key);
|
||||
existing.accepts += parseFloat(item.accepts);
|
||||
if (new Date(item.last_submit) > new Date(existing.last_submit)) {
|
||||
existing.last_submit = item.last_submit;
|
||||
}
|
||||
results.set(key, existing);
|
||||
} else {
|
||||
results.set(key, {
|
||||
user: item.user,
|
||||
miner: item.miner,
|
||||
accepts: parseFloat(item.accepts),
|
||||
last_submit: item.last_submit,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// 将结果转换为数组
|
||||
const resultArray = Array.from(results.values());
|
||||
return resultArray;
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询主库符合时段的表名
|
||||
* @param {String} start_time
|
||||
* @param {String} end_time
|
||||
* @returns
|
||||
*/
|
||||
async query_table(start_time, end_time) {
|
||||
try {
|
||||
const sql = `(SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) UNION (SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;`;
|
||||
const data = await this.sharesdb.exec(sql, [end_time, start_time, end_time])
|
||||
const result = [];
|
||||
if (data.length !== 0) {
|
||||
for (let item of data) {
|
||||
result.push(`${this.coin}_block_detail_${item.from}_${Math.trunc(item.to - 1)}`);
|
||||
}
|
||||
}
|
||||
result.push(`${this.coin}_blk_detail`);
|
||||
return result;
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询从库符合时段的表名
|
||||
* @param {String} start_time
|
||||
* @param {String} end_time
|
||||
* @returns
|
||||
*/
|
||||
async query_slave_table(start_time, end_time) {
|
||||
try {
|
||||
const sql = `(SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) UNION (SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;`;
|
||||
const data = await this.sharesdb_slave.exec(sql, [end_time, start_time, end_time])
|
||||
const result = [];
|
||||
if (data.length !== 0) {
|
||||
for (let item of data) {
|
||||
result.push(`${this.coin}_block_detail_${item.from}_${Math.trunc(item.to - 1)}`);
|
||||
}
|
||||
}
|
||||
result.push(`${this.coin}_blk_detail`);
|
||||
return result;
|
||||
} catch (err) {
|
||||
console.log(err);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
// 查询时段内accepts,主从同时查询
|
||||
async query_accepts(start_time, end_time, enable) {
|
||||
try {
|
||||
if (this.count === undefined) this.count = 0;
|
||||
if (enable) {
|
||||
const [tables_name, slave_tables_name] = await Promise.all([this.query_table(start_time, end_time), this.query_slave_table(start_time, end_time)]);
|
||||
|
||||
// 查询主库符合条件的数据
|
||||
let sql = ``;
|
||||
if (tables_name.length <= 1) {
|
||||
sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ${this.coin}_blk_detail WHERE date >= "${start_time}" AND date < "${end_time}" GROUP BY user, miner;`;
|
||||
} else {
|
||||
sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ( `;
|
||||
for (let i = 0; i < tables_name.length; i++) {
|
||||
if (i < tables_name.length - 1) {
|
||||
sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${tables_name[i]} WHERE date >= "${start_time}" AND date < "${end_time}" \nUNION ALL\n`;
|
||||
} else {
|
||||
sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${tables_name[i]} WHERE date >= "${start_time}" AND date < "${end_time}") AS combined_tables GROUP BY user, miner;`;
|
||||
}
|
||||
}
|
||||
}
|
||||
let slave_sql = ``;
|
||||
if (slave_tables_name.length <= 1) {
|
||||
slave_sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ${this.coin}_blk_detail WHERE date >= "${start_time}" AND date < "${end_time}" GROUP BY user, miner;`;
|
||||
} else {
|
||||
slave_sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ( `;
|
||||
for (let i = 0; i < slave_tables_name.length; i++) {
|
||||
if (i < slave_tables_name.length - 1) {
|
||||
slave_sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${slave_tables_name[i]} WHERE date >= "${start_time}" AND date < "${end_time}" \nUNION ALL\n`;
|
||||
} else {
|
||||
slave_sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${slave_tables_name[i]} WHERE date >= "${start_time}" AND date < "${end_time}") AS combined_tables GROUP BY user, miner;`;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // 执行查询,并将结果合并
|
||||
const [accepts_data, slave_accepts] = await Promise.all([this.sharesdb.exec(sql), this.sharesdb_slave.exec(slave_sql)]);
|
||||
const accepts = this.merge(accepts_data.concat(slave_accepts)); // 合并主备accepts
|
||||
return accepts;
|
||||
} else {
|
||||
const tables_name = await this.query_table(start_time, end_time);
|
||||
let sql = ``;
|
||||
if (tables_name.length <= 1) {
|
||||
sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ${this.coin}_blk_detail WHERE date >= "${start_time}" AND date < "${end_time}" GROUP BY user, miner;`;
|
||||
} else {
|
||||
sql = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM ( `;
|
||||
for (let i = 0; i < tables_name.length; i++) {
|
||||
if (i < tables_name.length - 1) {
|
||||
sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${tables_name[i]} WHERE date >= "${start_time}" AND date < "${end_time}" \nUNION ALL\n`;
|
||||
} else {
|
||||
sql += `SELECT date, user, miner, miner_diff, pool_diff FROM ${tables_name[i]} WHERE date >= "${start_time}" AND date < "${end_time}") AS combined_tables GROUP BY user, miner;`;
|
||||
}
|
||||
}
|
||||
}
|
||||
const accepts_data = await this.sharesdb.exec(sql);
|
||||
const slave_accepts = [];
|
||||
const accepts = this.merge(accepts_data.concat(slave_accepts)); // 合并主备accepts
|
||||
return accepts;
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Error in query_accepts: ${err.message}`);
|
||||
await this.sleep(1000 * 15);
|
||||
if (this.count > 3) { // 重试4次,1分钟
|
||||
this.count = 0;
|
||||
throw err;
|
||||
}
|
||||
this.count++;
|
||||
return this.query_accepts(start_time, end_time, enable);
|
||||
}
|
||||
}
|
||||
|
||||
// 查询当天miners状态,排除掉超过1天没有提交的矿工
|
||||
async query_miners(time) {
|
||||
try {
|
||||
const sql = `SELECT date, user, miner, state, ratio, last_submit FROM ${this.coin}_miners WHERE last_submit >= DATE(?) - INTERVAL 1 DAY;`;
|
||||
const miners_state = await this.pooldb.exec(sql, [time]);
|
||||
return miners_state;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async insert_mhs(data) {
|
||||
if (data.length === 0 || !data || data.size === 0) {
|
||||
console.log(Date.now(), ":30分钟没有新增矿机提交数据");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
let sql = `INSERT INTO ${this.coin}_mhsv2 (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES `;
|
||||
const values = [];
|
||||
data.forEach((item) => {
|
||||
const { user, miner, date, mhs30m, mhs24h, state, last_submit } = item;
|
||||
sql += `(?, ?, ?, ?, ?, ?, ?), `;
|
||||
values.push(user, miner, date, mhs30m, mhs24h, state, last_submit);
|
||||
});
|
||||
sql = sql.slice(0, -2);
|
||||
await this.hashratedb.exec_transaction(sql, values);
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async insert_mhs_real(data) {
|
||||
if (data.length === 0 || !data || data.size === 0) {
|
||||
console.log(Date.now(), ":5分钟没有新增矿机提交数据");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const del_sql = `DELETE FROM ${this.coin}_mhs_realv2 WHERE id > 0;`;
|
||||
let sql = `INSERT INTO ${this.coin}_mhs_realv2 (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES `;
|
||||
const values = [];
|
||||
data.forEach((item) => {
|
||||
const { user, miner, date, mhs30m, mhs24h, state, last_submit } = item;
|
||||
sql += `(?, ?, ?, ?, ?, ? ,?), `;
|
||||
values.push(user, miner, date, mhs30m, mhs24h, state, last_submit);
|
||||
});
|
||||
sql = sql.slice(0, -2);
|
||||
// sql += ` AS new_values ON DUPLICATE KEY UPDATE date = new_values.date, mhs30m = new_values.mhs30m, mhs24h = new_values.mhs24h, state = new_values.state, last_submit = new_values.last_submit;`;
|
||||
const sqls = [{ sql: del_sql }, { sql, param: values }];
|
||||
await this.hashratedb.exec_transaction_together(sqls);
|
||||
} catch (err) {
|
||||
// 处理错误
|
||||
console.error("Transaction failed: ", err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async query_hashrate_miners_accepts(end_time) {
|
||||
try {
|
||||
const ymd_last_30m = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 30);
|
||||
const ymd_last_24h = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 60 * 24);
|
||||
const state_sql = `SELECT t1.*
|
||||
FROM ${this.coin}_minersv2 t1
|
||||
INNER JOIN (
|
||||
SELECT user, miner, MAX(date) AS max_date
|
||||
FROM ${this.coin}_minersv2
|
||||
WHERE date <= ?
|
||||
GROUP BY user, miner
|
||||
) t2
|
||||
ON t1.user = t2.user AND t1.miner = t2.miner AND t1.date = t2.max_date;`;
|
||||
const mhs30m_sql = `SELECT SUM(accepts) AS accepts_30min, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
|
||||
const mhs24h_sql = `SELECT SUM(accepts) AS accepts_24h, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
|
||||
const [state, mhs30m, mhs24h] = await Promise.all([this.hashratedb.exec(state_sql, [end_time]), this.hashratedb.exec(mhs30m_sql, [ymd_last_30m, end_time]), this.hashratedb.exec(mhs24h_sql, [ymd_last_24h, end_time])]);
|
||||
|
||||
const hashrate_map = new Map();
|
||||
|
||||
state.forEach((item) => {
|
||||
const { date, user, miner, state, last_submit } = item;
|
||||
hashrate_map.set(`${user}:${miner}`, { date: end_time, user, miner, state, last_submit, mhs30m: 0, mhs24h: 0 });
|
||||
});
|
||||
|
||||
mhs30m.forEach((item) => {
|
||||
const { accepts_30min, user, miner } = item;
|
||||
|
||||
const values = hashrate_map.get(`${user}:${miner}`);
|
||||
|
||||
values.mhs30m = this.calculate_hashrate(accepts_30min, 60 * 30, "MH/s");
|
||||
|
||||
hashrate_map.set(`${user}:${miner}`, values);
|
||||
});
|
||||
mhs24h.forEach((item) => {
|
||||
const { accepts_24h, user, miner } = item;
|
||||
const values = hashrate_map.get(`${user}:${miner}`);
|
||||
|
||||
values.mhs24h = this.calculate_hashrate(accepts_24h, 60 * 60 * 24, "MH/s");
|
||||
|
||||
hashrate_map.set(`${user}:${miner}`, values);
|
||||
});
|
||||
return hashrate_map;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async insert_hashrate_miners_table(end_time) {
|
||||
try {
|
||||
const ymd = end_time.split(":");
|
||||
const date = ymd[0] + ":" + ymd[1] + ":" + "00";
|
||||
// 计算最近5分钟accepts,最新矿机状态
|
||||
const start_time = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 5);
|
||||
let enable = (await this.redis.get(`${this.coin}:enable`)) || false;
|
||||
|
||||
let [accepts, miners_state] = await Promise.all([this.query_accepts(start_time, end_time, enable), this.query_miners(end_time)]);
|
||||
|
||||
// 创建nexa_miners表所需要的map
|
||||
const miners_map = new Map();
|
||||
// 判断各种情况
|
||||
if (accepts.length === 0 && miners_state.length === 0) {
|
||||
// 历史上没有矿工接入
|
||||
return;
|
||||
} else if (accepts.length !== 0 && miners_state.length === 0) {
|
||||
// 主库出了问题,基本不可能出现这种情况
|
||||
return;
|
||||
} else if (accepts.length === 0 && miners_state.length !== 0) {
|
||||
// 最近5分钟没有矿工接入,直接将m2pooldb-nexa_miners表中所有矿工的accepts更新为0,并放入nexa_miners表需要的map中
|
||||
miners_state.forEach((item) => {
|
||||
const { user, miner, state, last_submit } = item;
|
||||
miners_map.set(`${user}:${miner}`, { date, user, miner, state: "offline", last_submit, accepts: 0 });
|
||||
});
|
||||
} else {
|
||||
// 先找到所有最近5分钟有提交的矿机
|
||||
accepts.forEach((item) => {
|
||||
const { user, miner, accepts, last_submit } = item;
|
||||
miners_map.set(`${user}:${miner}`, { date, user, miner, accepts, last_submit, state: "online" });
|
||||
});
|
||||
// 再将stats表有记录矿机,但最近5分钟没有提交的矿机合并进去
|
||||
miners_state.forEach((item) => {
|
||||
const { user, miner, state, last_submit } = item;
|
||||
if (!miners_map.get(`${user}:${miner}`)) {
|
||||
miners_map.set(`${user}:${miner}`, { date, user, miner, accepts: 0, last_submit, state });
|
||||
}
|
||||
});
|
||||
}
|
||||
// 将指定时段内的数据插入nexa_miners表
|
||||
let insert_miners_table_sql = `INSERT INTO ${this.coin}_minersv2(user, miner, date, accepts, state, last_submit) VALUES `;
|
||||
const miners_table_values = [];
|
||||
miners_map.forEach((item) => {
|
||||
const { user, miner, date, accepts, state, last_submit } = item;
|
||||
insert_miners_table_sql += `(?, ?, ?, ?, ?, ?), `;
|
||||
miners_table_values.push(user, miner, date, accepts, state, last_submit);
|
||||
});
|
||||
insert_miners_table_sql = insert_miners_table_sql.slice(0, -2);
|
||||
await this.hashratedb.exec_transaction(insert_miners_table_sql, miners_table_values);
|
||||
return;
|
||||
} catch (err) {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
const coin = "enx";
|
||||
const hashrate = new HashRate(coin);
|
||||
// schedule.scheduleJob({ minute: [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55], second: [30] }, async () => {
|
||||
|
||||
// });
|
||||
async function main(){
|
||||
const ymd_now = Times.utcTime(Date.now().valueOf());
|
||||
const ymd = ymd_now.split(":");
|
||||
const end_time = ymd[0] + ":" + ymd[1] + ":" + "00";
|
||||
await hashrate.insert_hashrate_miners_table(end_time);
|
||||
const currentMinute = new Date().getMinutes();
|
||||
|
||||
const data = await hashrate.query_hashrate_miners_accepts(end_time);
|
||||
if (currentMinute === 0 || currentMinute === 30) {
|
||||
await hashrate.insert_mhs(data);
|
||||
await hashrate.insert_mhs_real(data);
|
||||
} else {
|
||||
await hashrate.insert_mhs_real(data);
|
||||
}
|
||||
}
|
||||
main()
|
||||
Reference in New Issue
Block a user