m2pool-core/internal/gbt/monero/monore.go

496 lines
14 KiB
Go
Raw Normal View History

2025-09-03 08:00:42 +00:00
package monero
import (
"crypto/rand"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math"
"pool/internal/db"
"pool/internal/gbt/coin"
"pool/internal/gbt/dbif"
"pool/internal/msg"
"pool/internal/utility"
"sync/atomic"
"time"
"go.uber.org/zap"
)
const GBT_MONERO_VERSION string = "monero v1.0"
type MoneroAddrConfig struct {
Addr string `json:"addr"`
}
type MoneroConfig struct {
Monero MoneroAddrConfig `json:"nexa"`
}
type GetBlockTemplateResponse struct {
ID string `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Result msg.MoneroStratumJob `json:"result"`
Error *RpcError `json:"error,omitempty"`
}
type RpcError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// RPCError 用于描述 RPC 返回的错误
type RPCError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// 通用 RPC 响应结构
type MoneroRPCResponse[T any] struct {
ID string `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Result *T `json:"result,omitempty"` // 成功时使用
Error *RPCError `json:"error,omitempty"` // 失败时使用
}
type SubmitSuccess struct {
BlockId string `json:"block_id"`
Status string `json:"status"`
Untrusted bool `json:"untrusted"`
}
type GbtMoneroContext struct {
Config MoneroConfig
GbtCtx *coin.GbtContext
last_time time.Time
last_gbt msg.MoneroStratumJob
last_blockhash string
last_height uint32
Submits float64
addressIndex int
Target []byte
Header []byte
last_body string
new_block_chan chan int
new_block_index int
}
var logg *zap.Logger
var GbtMoneroCtx GbtMoneroContext
func configInit(config *MoneroConfig) {
data, err := ioutil.ReadFile("gbt.conf")
if err != nil {
panic(err.Error())
}
if err = json.Unmarshal(data, &config); err != nil {
panic(err.Error())
}
}
func Init(GbtCtx *coin.GbtContext, DbCtx *db.DbContext) {
GbtMoneroCtx.GbtCtx = GbtCtx
GbtMoneroCtx.last_height = 0
configInit(&GbtMoneroCtx.Config)
GbtMoneroCtx.Target = make([]byte, 32)
GbtMoneroCtx.Header = make([]byte, 49)
GbtMoneroCtx.last_time = time.Now()
logg = GbtCtx.Log
GbtMoneroCtx.new_block_chan = make(chan int, 256)
GbtMoneroCtx.new_block_index = 0
logg.Info("[gbt]", zap.String("gbt_monero_version", GBT_MONERO_VERSION))
}
func Start() {
go gbt_running(&GbtMoneroCtx)
go gbt_notify_running(&GbtMoneroCtx)
go submit_block_running(&GbtMoneroCtx)
}
func Stop() {
defer close(GbtMoneroCtx.new_block_chan)
}
type BlockCheckData struct {
Height int
Nonce string
User string
Miner string
MinerId string
Hash string
SubIdx int
}
type GetBlockHeaderResp struct {
Id string `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Error any `json:"error"`
Result GetBlockHeaderMsg `json:"result"`
}
type GetBlockHeaderMsg struct {
Result struct {
BlockHeader struct {
Nonce uint64 `json:"nonce"`
PowHash string `json:"pow_hash"`
Reward uint64 `json:"reward"`
} `json:"block_header"`
Status string `json:"status"`
Untrusted bool `json:"untrusted"`
} `json:"result"`
}
func update_block_confirm(gbt *GbtMoneroContext) {
db, err := sql.Open("sqlite3", "./blocks.db")
if err != nil {
//log.Printf("Error opening database: %v", err)
logg.Error("[gbt]", zap.String("Error opening database", err.Error()))
return
}
defer db.Close()
query := "SELECT user,miner,minerid,height,nonce,hash,subidx FROM blocks WHERE checked=0 AND created_at >= datetime('now', '-30 minutes') order by id desc limit 2"
rows, err := db.Query(query)
if err != nil {
//log.Printf("Error executing query from blocks: %v", err)
logg.Error("[gbt]", zap.String("Error executing query from blocks:", err.Error()))
return
}
defer rows.Close()
var blocks []BlockCheckData
for rows.Next() {
var height int
var nonce string
var user string
var miner string
var minerid string
var hash string
var subidx int
if err := rows.Scan(&user, &miner, &minerid, &height, &nonce, &hash, &subidx); err != nil {
//log.Printf("Error scanning row in blocks: %v", err)
logg.Error("[gbt]", zap.String("Error scanning row in blocks:", err.Error()))
return
}
var blockdata BlockCheckData
blockdata.Height = height
blockdata.Nonce = nonce
blockdata.User = user
blockdata.Miner = miner
blockdata.MinerId = minerid
blockdata.Hash = hash
blockdata.SubIdx = subidx
blocks = append(blocks, blockdata)
//fmt.Printf("blocks - Height: %d, Nonce: %d\n", height, nonce)
//log.Printf("update block height %d nonce %s, subidx %d, user %s", height, nonce, subidx, user+"."+miner+"_"+minerid)
}
for _, block := range blocks {
var blockHeaderResp GetBlockHeaderResp
resp, err := gbt.GbtCtx.MoneroClinet.GetBlockByHash(block.Hash)
err = json.Unmarshal(resp, &blockHeaderResp)
if err != nil {
logg.Error("[gbt]", zap.String("getblockheader Unmarshal ", fmt.Sprint(block.Height)+" "+err.Error()))
continue
}
if blockHeaderResp.Error != nil {
fmt.Println("[check block]:", blockHeaderResp.Error)
update_sql := `UPDATE blocks SET checked = 2 WHERE height = ? AND nonce = ? AND checked = 0`
_, err = db.Exec(update_sql, block.Height, block.Nonce)
if err != nil {
//log.Printf("Error updating blk_new: %v", err)
logg.Error("[gbt]", zap.String("Error updating blk_new:", err.Error()))
}
return
}
blockHeader := blockHeaderResp.Result
block_height := int64(block.Height)
// nonceHex := fmt.Sprintf("%x", block.Nonce)
dbif.NotifyPoolBlkStatsSuccess(gbt.GbtCtx, block_height, "", block.Nonce, int64(block.SubIdx), float64(blockHeader.Result.BlockHeader.Reward)/math.Pow(10, 12), 0)
dbif.NotifyBlkDetailSuccess(gbt.GbtCtx, block_height, "", block.Nonce, int64(block.SubIdx))
dbif.NotifyBlkNewDb(gbt.GbtCtx, block_height, block.Hash, true, block.Nonce, int64(block.SubIdx))
updateSQL := `UPDATE blocks SET checked = 1 WHERE height = ? AND nonce = ? AND checked = 0`
_, err = db.Exec(updateSQL, block.Height, block.Nonce)
if err != nil {
//log.Printf("Error updating blk_new: %v", err)
logg.Error("[gbt]", zap.String("Error updating blk_new:", err.Error()))
continue
}
logg.Warn("[gbt]", zap.String("update block success:", fmt.Sprint(block.Height)+" "+block.Nonce))
}
}
func randomxJobId() string {
// 生成4个字节
bytes := make([]byte, 4)
_, err := rand.Read(bytes)
if err != nil {
panic(err)
}
// 转成 hex 字符串
hexStr := hex.EncodeToString(bytes)
return hexStr
}
var start_count int = 0
var start_height uint64 = 0
func gbt_running(gbt *GbtMoneroContext) {
ticker := time.NewTicker(1000 * time.Millisecond)
defer ticker.Stop()
for gbt.GbtCtx.Started {
select {
case <-ticker.C:
resp, err := gbt.GbtCtx.MoneroClinet.GetBlockTemplate(gbt.GbtCtx.MoneroAddr, 0)
if err != nil {
fmt.Println("调用失败:", err)
continue
}
if resp != nil {
if gbt.GbtCtx.PubCh == nil {
gbt.GbtCtx.PubCh = utility.InitZmqPub(gbt.GbtCtx.Config.Zmq.Pub)
}
if gbt.GbtCtx.PubCh != nil {
var responseJson GetBlockTemplateResponse
err = json.Unmarshal(resp, &responseJson)
if err != nil {
fmt.Println("[gbt]:get block template response to json error\n", err)
return
}
sendJobMonero := func(job msg.MoneroStratumJob) {
for trycnt := 0; trycnt < 3; trycnt++ {
job.JobId = randomxJobId()
bt, err := json.Marshal(job)
if err != nil {
fmt.Println(err)
return
}
err = gbt.GbtCtx.PubCh.SendMessage([][]byte{[]byte("jobmonero"), bt})
if err != nil {
if !gbt.GbtCtx.Started {
return
}
logg.Warn("[gbt]", zap.String("job", err.Error()))
} else {
logg.Warn("[gbt]", zap.String("job", "sent"))
start_height = job.Height
start_count = 0
break
}
}
atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
}
if responseJson.Result.Height != start_height {
// 高度变化,先发 hashblock
if start_height != 0 {
topic := []byte("hashblock")
_msg := []byte("")
_ = gbt.GbtCtx.MoneroNewBlockPubCh.SendMessage([][]byte{topic, _msg})
}
// 再发 jobmonero
sendJobMonero(responseJson.Result)
} else if start_count >= 30 {
// 高度未变,但计数达到阈值,只发 jobmonero
sendJobMonero(responseJson.Result)
} else {
start_count += 1
}
} else {
logg.Warn("[gbt]", zap.String("job ", "sent failed! PubCh nil"))
}
} else {
atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
}
case blkIdx := <-gbt.new_block_chan:
log.Println("new block chan", blkIdx)
update_block_confirm(gbt)
case <-gbt.GbtCtx.ExitGbtChan:
logg.Error("[gbt]", zap.String("gbt", "exit"))
return
}
}
}
func gbt_notify_running(gbt *GbtMoneroContext) {
for {
if !gbt.GbtCtx.Started {
break
}
if gbt.GbtCtx.MoneroNewBlockSubCh == nil {
gbt.GbtCtx.MoneroNewBlockSubCh = utility.InitZmqSub(gbt.GbtCtx.Config.Rpc.ZmqSub, utility.BITCOIND_ZMQ_HASHBLOCK)
}
if gbt.GbtCtx.MoneroNewBlockSubCh != nil {
// fmt.Println("gbt_notify_running 开始接收消息")
cmsg_sub, err := gbt.GbtCtx.MoneroNewBlockSubCh.RecvMessage()
if err != nil {
if !gbt.GbtCtx.Started {
break
}
gbt.GbtCtx.MoneroNewBlockSubCh.SetSubscribe(utility.BITCOIND_ZMQ_HASHBLOCK)
gbt.GbtCtx.MoneroNewBlockSubCh.Connect(gbt.GbtCtx.Config.Rpc.ZmqSub)
continue
}
if len(cmsg_sub) >= 2 {
if string(cmsg_sub[0]) == "hashblock" {
GbtMoneroCtx.new_block_index = GbtMoneroCtx.new_block_index + 1
//log.Println("gbt_notify_running", hex.EncodeToString(cmsg_sub[1]), GbtNexaCtx.new_block_index)
gbt.new_block_chan <- GbtMoneroCtx.new_block_index
}
}
} else {
logg.Error("[gbt]", zap.String("notify", "NodeSubCh fail!"))
time.Sleep(time.Duration(1) * time.Second)
}
}
}
func submit_block_running(block *GbtMoneroContext) {
logg.Info("[block]", zap.String("submit_block_running", "Start."))
for {
if !block.GbtCtx.Started {
break
}
if block.GbtCtx.SubCh == nil {
block.GbtCtx.SubCh = utility.InitZmqSub(block.GbtCtx.Config.Zmq.Sub, "blk"+block.GbtCtx.Coin)
}
if block.GbtCtx.SubCh != nil {
cmsg_sub, err := block.GbtCtx.SubCh.RecvMessage()
if err != nil {
if !block.GbtCtx.Started {
break
}
time.Sleep(time.Duration(1) * time.Second)
block.GbtCtx.SubCh.SetSubscribe("blk" + block.GbtCtx.Coin)
block.GbtCtx.SubCh.Connect(block.GbtCtx.Config.Zmq.Sub)
continue
}
if len(cmsg_sub) >= 2 {
if string(cmsg_sub[0]) == "blkmonero" {
cmsg := cmsg_sub[1]
//block data
msgb := make([]byte, len(cmsg)-16)
copy(msgb, cmsg)
var moneroblock msg.BlockMoneroMsg
if err := json.Unmarshal(msgb, &moneroblock); err != nil {
//block.Consumer.MarkOffset(cmsg, "")
logg.Error("[block]", zap.String("failed to Unmarshal job", err.Error()))
continue
}
// heightb, err := hex.DecodeString(string(cmsg[len(msgb) : len(msgb)+8]))
// if err != nil {
// //block.Consumer.MarkOffset(cmsg, "")
// logg.Error("[block]", zap.String("failed to decode height", err.Error()))
// continue
// }
var height uint32 = uint32(moneroblock.Height)
logg.Warn("[block]", zap.Uint32("height", height))
if height <= block.last_height {
continue
}
block.last_height = height
indexb, err1 := hex.DecodeString(string(cmsg[len(msgb)+8:]))
if err1 != nil {
logg.Error("[block]", zap.String("failed to decode index", err1.Error()))
continue
}
var index uint32 = utility.ByteToUint32(indexb)
logg.Warn("[block]", zap.Uint32("index", index))
logg.Debug("[block]", zap.String("msg", string(cmsg)), zap.String("blk", string(msgb)))
result, _ := block.GbtCtx.MoneroClinet.SubmitBlock(moneroblock.Header)
var submitResp MoneroRPCResponse[SubmitSuccess]
if err2 := json.Unmarshal(result, &submitResp); err2 != nil {
logg.Error("[submit block]", zap.String("unmarshal error", err2.Error()))
return
}
if submitResp.Error != nil {
logg.Error("[submit block]", zap.String("submit failed reason", submitResp.Error.Message))
return
}
if submitResp.Result != nil {
logg.Info("[submit block]", zap.String("submit status", submitResp.Result.Status))
}
logg.Info("[block]", zap.String("result", string(result)))
//}
blockHash, success_msg := submitResp.Result.BlockId, submitResp.Result.Status
// nonceHex := fmt.Sprintf("%x", moneroblock.Nonce)
dbif.NotifyPoolBlkStatsSubmitResult(block.GbtCtx, int64(height), blockHash, success_msg, moneroblock.Nonce, moneroblock.SubIdx)
block.Submits += 1
//log.Printf("[block] height %d subidx %d nonce %s\n", height, nexablock.SubIdx, nexablock.Nonce)
logg.Warn("[block]", zap.Float64("total submits", block.Submits), zap.Int64("SubIdx", moneroblock.SubIdx))
// nonce, err := strconv.ParseUint(moneroblock.Nonce, 16, 32)
// if err != nil {
// return
// }
new_block_into_db(block, moneroblock.User, moneroblock.Miner, moneroblock.Index, int64(height), moneroblock.Nonce, blockHash, moneroblock.SubIdx)
}
}
} else {
logg.Error("[block]", zap.String("block", "SubCh failed! retry"))
time.Sleep(time.Duration(1) * time.Second)
}
}
}
func new_block_into_db(block *GbtMoneroContext, user string, miner string, minerid string, height int64, nonce string, hash string, subidx int64) bool {
db, err := sql.Open("sqlite3", "./blocks.db")
if err != nil {
log.Printf("Error opening database: %v", err)
return false
}
defer db.Close()
createTableSQL := `
CREATE TABLE IF NOT EXISTS blocks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user TEXT NOT NULL,
miner TEXT NOT NULL,
minerid TEXT NOT NULL,
height INTEGER,
nonce TEXT NOT NULL,
hash TEXT NOT NULL,
subidx INTEGER,
checked INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`
_, err = db.Exec(createTableSQL)
if err != nil {
log.Printf("Error creating table: %v", err)
return false
}
insertSQL := `INSERT INTO blocks (user, miner, minerid, height, nonce, checked, hash, subidx) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
_, err = db.Exec(insertSQL, user, miner, minerid, height, nonce, 0, hash, subidx)
if err != nil {
log.Printf("Error inserting data from blocks %s: %v", fmt.Sprint(height), err)
return false
}
return true
}