const Times = require("../public/times");
const Decimal = require("decimal");
const fs = require("fs");
const DBPool = require("../lib/mysql");
const Cache = require("../lib/redis");
const { NEXARPCNode, GRSRPCNode, MONARPCNode, DGBRPCNode, RXDRPCNode } = require("../lib/node");

/**
 * Divisors used to express a raw hashes-per-second value in a given unit.
 */
const HASHRATE_UNIT_DIVISORS = new Map([
  ["H/s", 1],
  ["KH/s", 1_000],
  ["MH/s", 1_000_000],
  ["GH/s", 1_000_000_000],
  ["TH/s", 1_000_000_000_000],
  ["PH/s", 1_000_000_000_000_000],
  ["EH/s", 10 ** 18],
]);

/**
 * Replace the seconds field of a colon-separated time string with "00".
 * Only the first two colon-separated fields are kept, exactly like the
 * original `ymd[0] + ":" + ymd[1] + ":" + "00"` expression.
 * NOTE(review): assumes the input looks like "YYYY-MM-DD HH:MM:SS" - confirm
 * against Times.utcTime.
 * @param {String} time_string
 * @returns {String}
 */
function truncate_seconds(time_string) {
  const [head, minute] = time_string.split(":");
  return `${head}:${minute}:00`;
}

/**
 * True when `data` is null/undefined, an empty array, or an empty Map/Set.
 * The null check comes first so that a missing value never throws.
 * @param {*} data
 * @returns {Boolean}
 */
function is_empty_collection(data) {
  return !data || data.length === 0 || data.size === 0;
}

/**
 * Loads `../config/<coin>.conf` (JSON) and creates the database pools, redis
 * cache and RPC node that the requested job (`method`) needs.
 */
class Init {
  /**
   * @param {String} coin   coin identifier; used as config file name and table prefix
   * @param {String} method one of "hashrate", "report", "clear", "distribution",
   *                        "balance", "confirm", "stats"
   * @throws {Error} when `method` is not supported, or when a job that needs an
   *                 RPC node is requested for a coin without a node class
   */
  constructor(coin, method) {
    this.coin = coin;
    // NOTE(review): the path is resolved against the process working directory,
    // not against this file - confirm the script is always started from its own folder.
    const config = fs.readFileSync(`../config/${coin}.conf`, "utf-8");
    const { master, slave, redis_options, node_options, distribution_conf, MAX_MATURE, REPORT_ADDRESS } = JSON.parse(config);
    const { pooldb, sharesdb, distribution, hashrate, users_addresses, balance } = master;
    const { sharesdb_slave } = slave;
    const { node2 } = node_options;
    const { redis1 } = redis_options;
    const { POOL_FEE } = distribution_conf;
    const node_map = {
      mona: MONARPCNode,
      nexa: NEXARPCNode,
      grs: GRSRPCNode,
      dgbs: DGBRPCNode,
      dgbq: DGBRPCNode,
      dgbo: DGBRPCNode,
      rxd: RXDRPCNode,
    };
    // Fail with a readable message instead of "node_map[coin] is not a constructor".
    const create_node = () => {
      const NodeClass = node_map[coin];
      if (typeof NodeClass !== "function") {
        throw new Error(`no RPC node class configured for coin "${coin}"`);
      }
      return new NodeClass(node2);
    };

    switch (method) {
      case "hashrate":
        this.sharesdb = new DBPool(coin, sharesdb);
        this.sharesdb_slave = new DBPool(coin, sharesdb_slave);
        this.pooldb = new DBPool(coin, pooldb);
        this.redis = new Cache(redis1);
        this.hashratedb = new DBPool(coin, hashrate);
        break;
      case "report":
        this.REPORT_ADDRESS = REPORT_ADDRESS;
        this.node = create_node();
        this.distribution = new DBPool(coin, distribution);
        this.redis = new Cache(redis1);
        break;
      case "clear":
        this.pooldb = new DBPool(coin, pooldb);
        this.sharesdb = new DBPool(coin, sharesdb);
        this.hashratedb = new DBPool(coin, hashrate);
        break;
      case "distribution":
        this.pooldb = new DBPool(coin, pooldb);
        this.hashratedb = new DBPool(coin, hashrate);
        this.distributiondb = new DBPool(coin, distribution);
        this.users_addresses = new DBPool(coin, users_addresses);
        this.node = create_node();
        this.REPORT_ADDRESS = REPORT_ADDRESS;
        this.POOL_FEE = POOL_FEE;
        console.log(`当前手续费率为:${POOL_FEE}`);
        break;
      case "balance":
        this.distribution = new DBPool(coin, distribution);
        this.balancedb = new DBPool(coin, balance);
        break;
      case "confirm":
        this.MAX_MATURE = MAX_MATURE;
        this.REPORT_ADDRESS = REPORT_ADDRESS;
        this.node = create_node();
        this.distribution = new DBPool(coin, distribution);
        this.pooldb = new DBPool(coin, pooldb);
        break;
      case "stats":
        this.pooldb = new DBPool(coin, pooldb);
        this.hashratedb = new DBPool(coin, hashrate);
        this.distribution = new DBPool(coin, distribution);
        break;
      default:
        throw new Error(`暂不支持${method}方法 init`);
    }
  }

  /**
   * Resolve after `ms` milliseconds.
   * @param {Number} ms
   * @returns {Promise<void>}
   */
  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

/**
 * Aggregates accepted share difficulty per (user, miner) into the
 * `<coin>_minersv2` table and derives 30-minute / 24-hour hashrates from it.
 */
class HashRate extends Init {
  /**
   * @param {String} coin coin identifier, see Init
   */
  constructor(coin) {
    super(coin, "hashrate");
    // Multiplier applied to summed share difficulty to obtain a hash count.
    this.diffOneShareHashsAvg = 2 ** 32 - 1;
  }

  /**
   * Convert summed accepted difficulty into a hashrate.
   * @param {Number} accepts total accepted difficulty within the period
   * @param {Number} seconds length of the period in seconds
   * @param {String} unit    H/s, KH/s, MH/s, GH/s, TH/s, PH/s or EH/s
   * @returns {Number} hashrate expressed in `unit`
   * @throws {Error} when `unit` is not one of the known units
   */
  calculate_hashrate(accepts, seconds, unit) {
    const divisor = HASHRATE_UNIT_DIVISORS.get(unit);
    if (divisor === undefined) {
      throw new Error(`${unit}不是已知单位`);
    }
    return (accepts * this.diffOneShareHashsAvg) / seconds / divisor;
  }

  /**
   * Merge rows coming from the master and the slave share databases.
   * Rows with the same (user, miner) have their `accepts` summed and keep the
   * most recent `last_submit`.
   * @param {Array<{user: String, miner: String, accepts: *, last_submit: *}>} data
   * @returns {Array<{user: String, miner: String, accepts: Number, last_submit: *}>}
   */
  merge(data) {
    const results = new Map();
    data.forEach((item) => {
      // A JSON-encoded tuple cannot collide the way "user-miner" does when
      // either part itself contains the separator.
      const key = JSON.stringify([item.user, item.miner]);
      const existing = results.get(key);
      if (existing === undefined) {
        results.set(key, {
          user: item.user,
          miner: item.miner,
          accepts: parseFloat(item.accepts),
          last_submit: item.last_submit,
        });
        return;
      }
      existing.accepts += parseFloat(item.accepts);
      if (new Date(item.last_submit) > new Date(existing.last_submit)) {
        existing.last_submit = item.last_submit;
      }
    });
    return Array.from(results.values());
  }

  /**
   * List the share tables that may hold rows for the given period on `db`:
   * one `<coin>_block_detail_<from>_<to-1>` table per matching row of
   * `<coin>_blk_height_detail`, followed by the live `<coin>_blk_detail` table.
   * Errors are logged and reported as an empty list (best effort), which makes
   * the caller fall back to the live table only.
   * @param {Object} db         pool exposing `exec(sql, params)`
   * @param {String} start_time
   * @param {String} end_time
   * @returns {Promise<String[]>}
   */
  async list_share_tables(db, start_time, end_time) {
    try {
      const sql = `(SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? ORDER BY date LIMIT 1) UNION (SELECT date, \`from\`, \`to\` FROM ${this.coin}_blk_height_detail WHERE date >= ? AND date < ?) ORDER BY date;`;
      const rows = await db.exec(sql, [end_time, start_time, end_time]);
      const tables = [];
      for (const row of rows) {
        tables.push(`${this.coin}_block_detail_${row.from}_${Math.trunc(row.to - 1)}`);
      }
      tables.push(`${this.coin}_blk_detail`);
      return tables;
    } catch (err) {
      console.log(err);
      return [];
    }
  }

  /**
   * Table names on the master share database that cover the period.
   * @param {String} start_time
   * @param {String} end_time
   * @returns {Promise<String[]>}
   */
  async query_table(start_time, end_time) {
    return this.list_share_tables(this.sharesdb, start_time, end_time);
  }

  /**
   * Table names on the slave share database that cover the period.
   * @param {String} start_time
   * @param {String} end_time
   * @returns {Promise<String[]>}
   */
  async query_slave_table(start_time, end_time) {
    return this.list_share_tables(this.sharesdb_slave, start_time, end_time);
  }

  /**
   * Build the statement that sums `miner_diff` per (user, miner) for the period.
   * With at most one table name only the live `<coin>_blk_detail` table is read;
   * otherwise all listed tables are combined with UNION ALL.
   * The time bounds are interpolated as string literals; they are timestamps
   * generated by this script, not external input.
   * @param {String[]} tables
   * @param {String} start_time
   * @param {String} end_time
   * @returns {String}
   */
  build_accepts_sql(tables, start_time, end_time) {
    const select_head = `SELECT MAX(date) AS last_submit, user, miner, SUM(miner_diff) AS accepts FROM`;
    const time_range = `WHERE date >= "${start_time}" AND date < "${end_time}"`;
    if (tables.length <= 1) {
      return `${select_head} ${this.coin}_blk_detail ${time_range} GROUP BY user, miner;`;
    }
    const selects = tables.map((table) => `SELECT date, user, miner, miner_diff, pool_diff FROM ${table} ${time_range}`);
    return `${select_head} ( ${selects.join(" \nUNION ALL\n")}) AS combined_tables GROUP BY user, miner;`;
  }

  /**
   * One attempt at reading the accepted difficulty for the period.
   * @param {String} start_time
   * @param {String} end_time
   * @param {*} enable truthy: read master and slave in parallel; falsy: master only
   * @returns {Promise<Array>} merged rows, see merge()
   */
  async fetch_accepts_once(start_time, end_time, enable) {
    if (!enable) {
      const tables = await this.query_table(start_time, end_time);
      const rows = await this.sharesdb.exec(this.build_accepts_sql(tables, start_time, end_time));
      return this.merge(rows.concat([]));
    }
    const [tables, slave_tables] = await Promise.all([
      this.query_table(start_time, end_time),
      this.query_slave_table(start_time, end_time),
    ]);
    const [rows, slave_rows] = await Promise.all([
      this.sharesdb.exec(this.build_accepts_sql(tables, start_time, end_time)),
      this.sharesdb_slave.exec(this.build_accepts_sql(slave_tables, start_time, end_time)),
    ]);
    return this.merge(rows.concat(slave_rows));
  }

  /**
   * Accepted difficulty per (user, miner) within [start_time, end_time),
   * retried on failure.
   * The retry counter is local to each call, so failures of an earlier call can
   * no longer reduce the retry budget of a later one.
   * @param {String} start_time
   * @param {String} end_time
   * @param {*} enable truthy to query master and slave, falsy for master only
   * @param {{retries?: Number, delay_ms?: Number}} [options] number of retries
   *        after the first attempt (default 4) and pause between attempts
   *        (default 15 seconds)
   * @returns {Promise<Array>} merged rows, see merge()
   * @throws the last error once all attempts have failed
   */
  async query_accepts(start_time, end_time, enable, { retries = 4, delay_ms = 1000 * 15 } = {}) {
    for (let attempt = 0; ; attempt++) {
      try {
        return await this.fetch_accepts_once(start_time, end_time, enable);
      } catch (err) {
        console.error(`Error in query_accepts: ${err.message}`);
        if (attempt >= retries) {
          throw err;
        }
        await this.sleep(delay_ms);
      }
    }
  }

  /**
   * Miner states from `<coin>_miners`, excluding miners whose last submit is
   * older than one day before the date of `time`.
   * @param {String} time
   * @returns {Promise<Array>}
   */
  async query_miners(time) {
    const sql = `SELECT date, user, miner, state, ratio, last_submit FROM ${this.coin}_miners WHERE last_submit >= DATE(?) - INTERVAL 1 DAY;`;
    return this.pooldb.exec(sql, [time]);
  }

  /**
   * Build a multi-row INSERT for one of the hashrate tables.
   * @param {String} table
   * @param {Array|Map} data collection of {user, miner, date, mhs30m, mhs24h, state, last_submit}
   * @returns {{sql: String, values: Array}}
   */
  build_mhs_insert(table, data) {
    const placeholders = [];
    const values = [];
    data.forEach((item) => {
      const { user, miner, date, mhs30m, mhs24h, state, last_submit } = item;
      placeholders.push("(?, ?, ?, ?, ?, ?, ?)");
      values.push(user, miner, date, mhs30m, mhs24h, state, last_submit);
    });
    const sql = `INSERT INTO ${table} (user, miner, date, mhs30m, mhs24h, state, last_submit) VALUES ${placeholders.join(", ")}`;
    return { sql, values };
  }

  /**
   * Append the half-hourly hashrate snapshot to `<coin>_mhsv2`.
   * Does nothing (apart from logging) when `data` is missing or empty.
   * @param {Array|Map} data see build_mhs_insert()
   * @returns {Promise<void>}
   */
  async insert_mhs(data) {
    if (is_empty_collection(data)) {
      console.log(Date.now(), ":30分钟没有新增矿机提交数据");
      return;
    }
    const { sql, values } = this.build_mhs_insert(`${this.coin}_mhsv2`, data);
    await this.hashratedb.exec_transaction(sql, values);
  }

  /**
   * Replace the contents of `<coin>_mhs_realv2` with the current snapshot.
   * The DELETE and the INSERT are handed to `exec_transaction_together` in one call.
   * Does nothing (apart from logging) when `data` is missing or empty, so the
   * previous snapshot stays in place in that case.
   * @param {Array|Map} data see build_mhs_insert()
   * @returns {Promise<void>}
   */
  async insert_mhs_real(data) {
    if (is_empty_collection(data)) {
      console.log(Date.now(), ":5分钟没有新增矿机提交数据");
      return;
    }
    try {
      const del_sql = `DELETE FROM ${this.coin}_mhs_realv2 WHERE id > 0;`;
      const { sql, values } = this.build_mhs_insert(`${this.coin}_mhs_realv2`, data);
      await this.hashratedb.exec_transaction_together([{ sql: del_sql }, { sql, param: values }]);
    } catch (err) {
      console.error("Transaction failed: ", err);
      throw err;
    }
  }

  /**
   * Compute the 30-minute and 24-hour hashrate (in MH/s) of every miner that
   * has a row in `<coin>_minersv2` dated at or before `end_time`.
   * NOTE(review): `new Date(end_time)` parses a non-ISO string and the result is
   * passed to Times.utcTime - confirm both agree on the timezone.
   * @param {String} end_time
   * @returns {Promise<Map<String, Object>>} keyed by "user:miner"; values are
   *          {date, user, miner, state, last_submit, mhs30m, mhs24h}
   */
  async query_hashrate_miners_accepts(end_time) {
    const end_ms = new Date(end_time).valueOf();
    const ymd_last_30m = Times.utcTime(end_ms - 1000 * 60 * 30);
    const ymd_last_24h = Times.utcTime(end_ms - 1000 * 60 * 60 * 24);
    // Latest row per (user, miner) at or before end_time.
    const state_sql = `SELECT t1.* FROM ${this.coin}_minersv2 t1 INNER JOIN ( SELECT user, miner, MAX(date) AS max_date FROM ${this.coin}_minersv2 WHERE date <= ? GROUP BY user, miner ) t2 ON t1.user = t2.user AND t1.miner = t2.miner AND t1.date = t2.max_date;`;
    const mhs30m_sql = `SELECT SUM(accepts) AS accepts_30min, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
    const mhs24h_sql = `SELECT SUM(accepts) AS accepts_24h, user, miner FROM ${this.coin}_minersv2 WHERE date >= ? AND date < ? GROUP BY user, miner;`;
    const [state, mhs30m, mhs24h] = await Promise.all([
      this.hashratedb.exec(state_sql, [end_time]),
      this.hashratedb.exec(mhs30m_sql, [ymd_last_30m, end_time]),
      this.hashratedb.exec(mhs24h_sql, [ymd_last_24h, end_time]),
    ]);

    const hashrate_map = new Map();
    state.forEach(({ user, miner, state: miner_state, last_submit }) => {
      hashrate_map.set(`${user}:${miner}`, { date: end_time, user, miner, state: miner_state, last_submit, mhs30m: 0, mhs24h: 0 });
    });
    // Entries are mutated in place; every miner in the sum queries is expected
    // to have a state row because all three queries read the same table.
    mhs30m.forEach(({ accepts_30min, user, miner }) => {
      const entry = hashrate_map.get(`${user}:${miner}`);
      entry.mhs30m = this.calculate_hashrate(accepts_30min, 60 * 30, "MH/s");
    });
    mhs24h.forEach(({ accepts_24h, user, miner }) => {
      const entry = hashrate_map.get(`${user}:${miner}`);
      entry.mhs24h = this.calculate_hashrate(accepts_24h, 60 * 60 * 24, "MH/s");
    });
    return hashrate_map;
  }

  /**
   * Record the last five minutes of accepted difficulty, together with each
   * miner's state, as one batch of rows in `<coin>_minersv2`.
   * Nothing is written when `<coin>_miners` returns no rows.
   * @param {String} end_time end of the five-minute window
   * @returns {Promise<void>}
   */
  async insert_hashrate_miners_table(end_time) {
    const date = truncate_seconds(end_time);
    const start_time = Times.utcTime(new Date(end_time).valueOf() - 1000 * 60 * 5);
    // NOTE(review): any truthy value returned by redis enables the slave query,
    // including non-empty strings such as "false" - confirm what Cache.get returns.
    const enable = (await this.redis.get(`${this.coin}:enable`)) || false;
    const [accepts, miners_state] = await Promise.all([
      this.query_accepts(start_time, end_time, enable),
      this.query_miners(end_time),
    ]);

    // No known miners: either nobody ever connected, or (when shares exist)
    // the pool database is inconsistent. Both cases write nothing.
    if (miners_state.length === 0) {
      return;
    }

    const miners_map = new Map();
    if (accepts.length === 0) {
      // Nobody submitted during the window: every known miner is recorded as
      // offline with zero accepts.
      miners_state.forEach(({ user, miner, last_submit }) => {
        miners_map.set(`${user}:${miner}`, { date, user, miner, state: "offline", last_submit, accepts: 0 });
      });
    } else {
      // Miners that submitted during the window are online ...
      accepts.forEach(({ user, miner, accepts: miner_accepts, last_submit }) => {
        miners_map.set(`${user}:${miner}`, { date, user, miner, accepts: miner_accepts, last_submit, state: "online" });
      });
      // ... and known miners without submissions keep their stored state.
      miners_state.forEach(({ user, miner, state, last_submit }) => {
        const key = `${user}:${miner}`;
        if (!miners_map.has(key)) {
          miners_map.set(key, { date, user, miner, accepts: 0, last_submit, state });
        }
      });
    }

    const placeholders = [];
    const values = [];
    miners_map.forEach((item) => {
      placeholders.push("(?, ?, ?, ?, ?, ?)");
      values.push(item.user, item.miner, item.date, item.accepts, item.state, item.last_submit);
    });
    const sql = `INSERT INTO ${this.coin}_minersv2(user, miner, date, accepts, state, last_submit) VALUES ${placeholders.join(", ")}`;
    await this.hashratedb.exec_transaction(sql, values);
  }
}

const coin = "enx";
const hashrate = new HashRate(coin);

/**
 * One run of the job: store the last five minutes of accepts, refresh the
 * real-time hashrate table and, on the hour and half hour, also append to the
 * historical hashrate table.
 * NOTE(review): presumably meant to be triggered every five minutes by an
 * external scheduler - confirm.
 */
async function main() {
  // Read the clock once so that end_time and the half-hour decision cannot
  // disagree when the first insert takes long (it may retry for over a minute).
  const now = new Date();
  const end_time = truncate_seconds(Times.utcTime(now.valueOf()));
  const currentMinute = now.getMinutes();

  await hashrate.insert_hashrate_miners_table(end_time);
  const data = await hashrate.query_hashrate_miners_accepts(end_time);
  if (currentMinute === 0 || currentMinute === 30) {
    await hashrate.insert_mhs(data);
  }
  await hashrate.insert_mhs_real(data);
}

main().catch((err) => {
  // Report the failure explicitly and exit non-zero instead of leaving an
  // unhandled rejection behind.
  console.error("hashrate job failed: ", err);
  process.exit(1);
});