m2pool_core/internal/gbt/nexa/nexa.go

626 lines
19 KiB
Go

// nexa.go
package nexa
import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"sync/atomic"
//"pool/internal/cache"
"pool/internal/db"
"pool/internal/gbt/coin"
"pool/internal/gbt/dbif"
"pool/internal/msg"
"pool/internal/utility"
"time"
"database/sql"
//"github.com/btcsuite/btcd/rpcclient"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
)
// GBT_NEXA_VERSION is the version string of the Nexa GBT module; Init writes
// it to the log under the "gbt_nexa_version" key.
const GBT_NEXA_VERSION string = "nexa v2.0m"
// NexaAddrConfig is the "nexa" section of the configuration file.
type NexaAddrConfig struct {
// Addr is sent as the address argument of getminingcandidate when
// GbtCtx.MinerAddrs is empty (see get_gbt_msg).
Addr string `json:"addr"`
}
// NexaConfig is the structure that nexaInit decodes the JSON contents of
// gbt.conf into.
type NexaConfig struct {
// Nexa holds the Nexa-specific settings found under the "nexa" key.
Nexa NexaAddrConfig `json:"nexa"`
}
// GbtNexaContext is the state shared by the Nexa worker goroutines started by
// Start (gbt_running, gbt_notify_running and submit_block_running).
type GbtNexaContext struct {
// Config is filled from gbt.conf by nexaInit during Init.
Config NexaConfig
// GbtCtx is the coin-independent context; the code in this file uses its
// RPC client, ZMQ sockets, configuration, logger and Started flag.
GbtCtx *coin.GbtContext
// last_time is set in Init and again each time get_gbt_msg accepts a new
// mining candidate.
last_time time.Time
// last_gbt is the most recently accepted mining candidate; get_gbt_msg
// skips a candidate whose Id equals last_gbt.Id.
last_gbt GbtNexaMsg
// NOTE(review): last_blockhash is not referenced in the visible part of this file.
last_blockhash string
// last_height is the greatest block height handled by submit_block_running;
// messages whose height is not greater are ignored.
last_height uint32
// Submits counts the solutions passed to submitminingsolution.
Submits float64
// NOTE(review): addressIndex is not referenced in the visible part of this file.
addressIndex int
// Target is allocated as a 32-byte buffer in Init; it is not otherwise
// used in the visible part of this file.
Target []byte
// Header is allocated as a 49-byte buffer in Init; it is not otherwise
// used in the visible part of this file.
Header []byte
// NOTE(review): last_body is not referenced in the visible part of this file.
last_body string
// new_block_chan carries the running new-block index from
// gbt_notify_running to gbt_running; Init creates it with capacity 256.
new_block_chan chan int
// new_block_index is incremented for every "hashblock" notification.
new_block_index int
}
// logg is the package logger; Init assigns it from GbtCtx.Log, so it is nil
// until Init has run.
var logg *zap.Logger
// GbtNexaCtx is the package-level Nexa context that Init populates and Start
// passes to the worker goroutines.
var GbtNexaCtx GbtNexaContext
// GbtNexaMsg is the subset of the getminingcandidate RPC result that
// get_gbt_msg decodes.
type GbtNexaMsg struct {
// Id identifies the candidate; it is copied into the published job and
// compared with the previous candidate to detect duplicates.
Id uint64 `json:"id"`
// HeaderCommitment is reversed with utility.Reverse_string to form the
// job header.
HeaderCommitment string `json:"headerCommitment"`
// NBits is hex-decoded, read as a big-endian uint32 and expanded with
// utility.CompactToBig to obtain the job target.
NBits string `json:"nBits"`
}
// GetBlockHeaderMsg describes the height, nonce and confirmations fields of a
// block header reply.
// NOTE(review): not referenced in the visible part of this file;
// update_block_confirm decodes getblockheader into msg.GetBlockHeaderMsg
// instead — confirm whether this local type is still needed.
type GetBlockHeaderMsg struct {
Height int `json:"height"`
Nonce string `json:"nonce"`
Confirmations int `json:"confirmations"`
}
// GetBlockStatsMsg describes the height, subsidy and totalfee fields of a
// block statistics reply.
// NOTE(review): not referenced in the visible part of this file;
// update_block_confirm decodes getblockstats into msg.GetBlockStatsMsg
// instead — confirm whether this local type is still needed.
type GetBlockStatsMsg struct {
Height int `json:"height"`
Subsidy float64 `json:"subsidy"`
Totalfee float64 `json:"totalfee"`
}
// BlockCheckData is one unchecked row of the local "blocks" SQLite table, as
// loaded by update_block_confirm.
type BlockCheckData struct {
// Height is the block height recorded when the solution was submitted.
Height int
// Nonce is compared with the nonce of the on-chain header at Height.
Nonce string
// User, Miner and MinerId are the values of the user, miner and minerid columns.
User string
Miner string
MinerId string
// Hash is the value of the hash column; it is forwarded to dbif.NotifyBlkNewDb.
Hash string
// SubIdx is the value of the subidx column.
SubIdx int
}
/*type NewBlockMsg struct {
Height int `json:"height"`
Nonce int `json:"nonce"`
}*/
// PushBlkNewMsg is the JSON payload of a "blk_new" push message.
// NOTE(review): in the visible part of this file it is only referenced from
// commented-out code in update_block_confirm.
type PushBlkNewMsg struct {
Coin string `json:"coin"`
Height int `json:"height"`
Nonce string `json:"nonce"`
}
// update_block_confirm re-checks the most recent unchecked blocks stored in
// ./blocks.db against the node.
//
// It loads at most two rows with checked=0 created within the last 30 minutes.
// For each row it fetches the on-chain header at the recorded height; when
// that header has more than 3 confirmations and carries the same nonce, the
// block statistics are fetched, the pool is notified through dbif and the row
// is marked checked=1.
//
// Every failure is logged and skips the affected block (or aborts the whole
// pass for database errors); nothing is returned to the caller.
// Per-miner/per-user notifications and the ZMQ "blk_new" push are not sent.
func update_block_confirm(gbt *GbtNexaContext) {
	db, err := sql.Open("sqlite3", "./blocks.db")
	if err != nil {
		logg.Error("[gbt]", zap.String("Error opening database", err.Error()))
		return
	}
	defer db.Close()

	query := "SELECT user,miner,minerid,height,nonce,hash,subidx FROM blocks WHERE checked=0 AND created_at >= datetime('now', '-30 minutes') order by id desc limit 2"
	rows, err := db.Query(query)
	if err != nil {
		logg.Error("[gbt]", zap.String("Error executing query from blocks:", err.Error()))
		return
	}
	defer rows.Close()

	var blocks []BlockCheckData
	for rows.Next() {
		var blockdata BlockCheckData
		if err := rows.Scan(&blockdata.User, &blockdata.Miner, &blockdata.MinerId, &blockdata.Height, &blockdata.Nonce, &blockdata.Hash, &blockdata.SubIdx); err != nil {
			logg.Error("[gbt]", zap.String("Error scanning row in blocks:", err.Error()))
			return
		}
		blocks = append(blocks, blockdata)
	}
	// rows.Next returns false both at end of data and on failure; only
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		logg.Error("[gbt]", zap.String("Error iterating rows in blocks:", err.Error()))
		return
	}

	for _, block := range blocks {
		block_hash, err := gbt.GbtCtx.Client.GetBlockHash(int64(block.Height))
		if err != nil {
			logg.Info("[gbt]", zap.String("GetBlockHash ", err.Error()))
			continue
		}
		// Both RPCs take the block hash as their single JSON string argument.
		params := []json.RawMessage{json.RawMessage(`"` + block_hash.String() + `"`)}

		result, err := gbt.GbtCtx.Client.RawRequest("getblockheader", params)
		if err != nil {
			logg.Error("[gbt]", zap.String("getblockheader", err.Error()))
			continue
		}
		var blockHeader msg.GetBlockHeaderMsg
		if err := json.Unmarshal(result, &blockHeader); err != nil {
			logg.Error("[gbt]", zap.String("getblockheader Unmarshal ", fmt.Sprint(block.Height)+" "+err.Error()))
			continue
		}

		// Only a sufficiently confirmed block mined with our nonce is
		// reported; skip the statistics RPC for everything else.
		if blockHeader.Confirmations <= 3 || blockHeader.Nonce != block.Nonce {
			continue
		}

		result_stats, err := gbt.GbtCtx.Client.RawRequest("getblockstats", params)
		if err != nil {
			logg.Error("[gbt]", zap.String("getblockstats", err.Error()))
			continue
		}
		var blockStats msg.GetBlockStatsMsg
		if err := json.Unmarshal(result_stats, &blockStats); err != nil {
			logg.Error("[gbt]", zap.String("getblockstats Unmarshal ", fmt.Sprint(block.Height)+" "+err.Error()))
			continue
		}

		block_height := int64(block.Height)
		dbif.NotifyPoolBlkStatsSuccess(gbt.GbtCtx, block_height, "", block.Nonce, int64(block.SubIdx), blockStats.Subsidy, blockStats.Totalfee)
		dbif.NotifyBlkDetailSuccess(gbt.GbtCtx, block_height, "", block.Nonce, int64(block.SubIdx))
		dbif.NotifyBlkNewDb(gbt.GbtCtx, block_height, block.Hash, true, block.Nonce, int64(block.SubIdx))

		updateSQL := `UPDATE blocks SET checked = 1 WHERE height = ? AND nonce = ? AND checked = 0`
		if _, err := db.Exec(updateSQL, block.Height, block.Nonce); err != nil {
			logg.Error("[gbt]", zap.String("Error updating blk_new:", err.Error()))
			continue
		}
		logg.Warn("[gbt]", zap.String("update block success:", fmt.Sprint(block.Height)+" "+block.Nonce))
	}
}
// get_gbt_msg asks the node for a mining candidate and turns it into a
// JSON-encoded msg.NexaStratumJob.
//
// It returns nil when an RPC fails, when the reply cannot be decoded, when
// nBits is not valid hex of at least 4 bytes, or when the candidate has the
// same Id as the previous one (nothing new to publish). On success the
// candidate is remembered in gbt.last_gbt and gbt.last_time is refreshed.
//
// When gbt.GbtCtx.MinerAddrs is non-empty the payout address is taken from it
// round-robin; otherwise the address from the Nexa configuration is used.
func get_gbt_msg(gbt *GbtNexaContext) []byte {
	height, err := gbt.GbtCtx.Client.GetBlockCount()
	if err != nil {
		logg.Info("[gbt]", zap.String("GetBlockCount ", err.Error()))
		return nil
	}
	// The job is for the block after the current tip.
	height = height + 1

	rawmsgs := make([]json.RawMessage, 3)
	param_str1 := `"` + "1000" + `"`
	param_str2 := `"` + gbt.Config.Nexa.Addr + `"`
	if len(gbt.GbtCtx.MinerAddrs) > 0 {
		param_str2 = `"` + gbt.GbtCtx.MinerAddrs[gbt.GbtCtx.MinerAddrIndex] + `"`
		gbt.GbtCtx.MinerAddrIndex = gbt.GbtCtx.MinerAddrIndex + 1
		if gbt.GbtCtx.MinerAddrIndex >= len(gbt.GbtCtx.MinerAddrs) {
			gbt.GbtCtx.MinerAddrIndex = 0
		}
	}
	param_str3 := `"` + " / m2pool.com / " + `"`
	rawmsgs[0] = json.RawMessage(param_str1)
	rawmsgs[1] = json.RawMessage(param_str2)
	rawmsgs[2] = json.RawMessage(param_str3)

	// Fetch the mining data.
	result, err := gbt.GbtCtx.Client.RawRequest("getminingcandidate", rawmsgs)
	if err != nil {
		logg.Error("[gbt]", zap.String("getminingcandidate", err.Error()))
		return nil
	}
	// Structure of the fetched job.
	var rxmsg GbtNexaMsg
	err = json.Unmarshal(result, &rxmsg)
	if err != nil {
		logg.Error("[gbt]", zap.String("getminingcandidate", err.Error()))
		return nil
	}
	gbtstr := fmt.Sprintf("[gbt] height %d, id %d, header %s, nBits %s", height, rxmsg.Id, rxmsg.HeaderCommitment, rxmsg.NBits)
	if rxmsg.Id == gbt.last_gbt.Id {
		return nil
	}

	// Validate nBits before the candidate is recorded: Uint32 panics on a
	// slice shorter than 4 bytes, and a rejected candidate must stay
	// eligible for the next poll.
	nbits_b, err := hex.DecodeString(rxmsg.NBits)
	if err != nil {
		logg.Error("[gbt]", zap.String("getminingcandidate", "invalid nBits "+rxmsg.NBits+": "+err.Error()))
		return nil
	}
	if len(nbits_b) < 4 {
		logg.Error("[gbt]", zap.String("getminingcandidate", "invalid nBits "+rxmsg.NBits+": shorter than 4 bytes"))
		return nil
	}

	gbt.last_gbt = rxmsg
	gbt.last_time = time.Now()

	var nbits_i uint32 = binary.BigEndian.Uint32(nbits_b)
	bigdiff := utility.CompactToBig(nbits_i)
	targetdiff := fmt.Sprintf("%064x", bigdiff.Bytes())
	logg.Debug("[gbt]", zap.String(" ", gbtstr), zap.String("target", targetdiff))

	var job msg.NexaStratumJob
	job.Id = rxmsg.Id
	job.Header = utility.Reverse_string(rxmsg.HeaderCommitment)
	job.NBits = rxmsg.NBits
	job.CurTime = uint64(time.Now().Unix())
	job.Height = uint32(height)
	job.Nonce = ""
	job.Target = targetdiff
	job.Extranonce1 = ""
	job.Extranonce2_size = 8
	job.Extranonce2 = ""

	body, err := json.Marshal(job)
	if err != nil {
		logg.Error("[gbt]", zap.String("failed to Marshal jobmsg", err.Error()))
		return nil
	}
	return body
}
// gbt_notify_running listens on the node's ZMQ publisher for "hashblock"
// notifications and forwards each one to gbt_running.
//
// For every notification it increments gbt.new_block_index and sends the new
// value on gbt.new_block_chan (the send blocks while the channel is full).
// The subscriber socket is created lazily; on a receive error the
// subscription and connection are re-issued. The loop ends once
// gbt.GbtCtx.Started is false.
func gbt_notify_running(gbt *GbtNexaContext) {
	for gbt.GbtCtx.Started {
		if gbt.GbtCtx.NodeSubCh == nil {
			gbt.GbtCtx.NodeSubCh = utility.InitZmqSub(gbt.GbtCtx.Config.Rpc.ZmqSub, utility.BITCOIND_ZMQ_HASHBLOCK)
		}
		if gbt.GbtCtx.NodeSubCh == nil {
			logg.Error("[gbt]", zap.String("notify", "NodeSubCh fail!"))
			time.Sleep(time.Duration(1) * time.Second)
			continue
		}

		cmsg_sub, err := gbt.GbtCtx.NodeSubCh.RecvMessage()
		if err != nil {
			if !gbt.GbtCtx.Started {
				break
			}
			gbt.GbtCtx.NodeSubCh.SetSubscribe(utility.BITCOIND_ZMQ_HASHBLOCK)
			gbt.GbtCtx.NodeSubCh.Connect(gbt.GbtCtx.Config.Rpc.ZmqSub)
			continue
		}

		if len(cmsg_sub) >= 2 && string(cmsg_sub[0]) == "hashblock" {
			// Update the context that was passed in rather than the
			// package-level one, so the counter and the channel always
			// belong to the same context.
			gbt.new_block_index = gbt.new_block_index + 1
			gbt.new_block_chan <- gbt.new_block_index
		}
	}
}
// publish_nexa_job publishes one job body on the "jobnexa" ZMQ topic.
//
// A nil job means "no new work" and only refreshes the liveness flag. The
// publisher socket is created lazily; sending is attempted up to three times.
// It returns false only when a send failed while the service is stopping,
// which tells the caller to stop as well.
func publish_nexa_job(gbt *GbtNexaContext, job []byte) bool {
	if job == nil {
		atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
		return true
	}
	if gbt.GbtCtx.PubCh == nil {
		gbt.GbtCtx.PubCh = utility.InitZmqPub(gbt.GbtCtx.Config.Zmq.Pub)
	}
	if gbt.GbtCtx.PubCh == nil {
		logg.Warn("[gbt]", zap.String("job ", "sent failed! PubCh nil"))
		return true
	}
	for trycnt := 0; trycnt < 3; trycnt++ {
		err := gbt.GbtCtx.PubCh.SendMessage([][]byte{[]byte("jobnexa"), job})
		if err == nil {
			logg.Warn("[gbt]", zap.String("job ", "sent"))
			break
		}
		if !gbt.GbtCtx.Started {
			return false
		}
		logg.Warn("[gbt]", zap.String("job ", err.Error()))
	}
	atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
	return true
}

// gbt_running is the job-publishing loop.
//
// It publishes a job immediately and then again whenever a new-block
// notification arrives on gbt.new_block_chan or the poll interval
// (Config.Rpc.Timeout milliseconds) elapses without one. After a new-block
// notification it also runs update_block_confirm. The loop ends when
// gbt.GbtCtx.Started is false, when ExitGbtChan delivers a value, or when
// new_block_chan has been closed.
func gbt_running(gbt *GbtNexaContext) {
	if !publish_nexa_job(gbt, get_gbt_msg(gbt)) {
		return
	}

	timer := time.NewTimer(time.Duration(gbt.GbtCtx.Config.Rpc.Timeout) * time.Millisecond)
	defer timer.Stop()

	for gbt.GbtCtx.Started {
		// Re-arm the poll timer so the interval is measured from the start
		// of this wait. The non-blocking drain discards a stale tick
		// without risking a block when none is pending.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(time.Duration(gbt.GbtCtx.Config.Rpc.Timeout) * time.Millisecond)

		new_block_notify := false
		select {
		case blk_idx, ok := <-gbt.new_block_chan:
			if !ok {
				// A closed channel is always ready to receive; leaving
				// here avoids spinning on it.
				logg.Warn("[gbt]", zap.String("gbt", "new block channel closed"))
				return
			}
			log.Println("new block chan", blk_idx)
			new_block_notify = true
		case <-gbt.GbtCtx.ExitGbtChan:
			logg.Error("[gbt]", zap.String("gbt", "exit"))
			return
		case <-timer.C:
			log.Println("poll gbt timeout")
		}

		if !publish_nexa_job(gbt, get_gbt_msg(gbt)) {
			return
		}
		if new_block_notify {
			update_block_confirm(gbt)
		}
	}
}
// nexaInit reads gbt.conf from the working directory and decodes its JSON
// contents into config. It panics with the error text when the file cannot
// be read or parsed.
func nexaInit(config *NexaConfig) {
	raw, readErr := ioutil.ReadFile("gbt.conf")
	if readErr != nil {
		panic(readErr.Error())
	}

	parseErr := json.Unmarshal(raw, &config)
	if parseErr != nil {
		panic(parseErr.Error())
	}
}
// Init prepares the package-level Nexa context: it stores GbtCtx, loads the
// configuration from gbt.conf, allocates the working buffers and the
// new-block channel, installs the logger and registers the pool tag with the
// node through the setminercomment RPC (a failure there is only logged).
// DbCtx is accepted but not used here.
func Init(GbtCtx *coin.GbtContext, DbCtx *db.DbContext) {
	ctx := &GbtNexaCtx

	ctx.GbtCtx = GbtCtx
	ctx.last_height = 0
	nexaInit(&ctx.Config)

	ctx.Target = make([]byte, 32)
	ctx.Header = make([]byte, 49)
	ctx.last_time = time.Now()
	logg = GbtCtx.Log
	ctx.new_block_chan = make(chan int, 256)
	ctx.new_block_index = 0
	logg.Info("[gbt]", zap.String("gbt_nexa_version", GBT_NEXA_VERSION))

	// The RPC takes the comment as a single JSON string argument.
	const pool_tag = " / m2pool.com / "
	comment := []json.RawMessage{json.RawMessage(`"` + pool_tag + `"`)}
	if _, err := ctx.GbtCtx.Client.RawRequest("setminercomment", comment); err != nil {
		logg.Error("[gbt]", zap.String("setminercomment", err.Error()))
	}
}
func Start() {
go gbt_running(&GbtNexaCtx)
go gbt_notify_running(&GbtNexaCtx)
go submit_block_running(&GbtNexaCtx)
}
// Stop closes the new-block notification channel of the package-level
// context. It does nothing when the channel was never created (Stop called
// before Init), because closing a nil channel panics.
func Stop() {
	if ch := GbtNexaCtx.new_block_chan; ch != nil {
		close(ch)
	}
}
// new_block_into_db records a submitted block in the local ./blocks.db SQLite
// database with checked=0, creating the blocks table first when it does not
// exist yet. It reports whether the row was inserted; failures are written to
// the standard logger. The block argument is not used.
func new_block_into_db(block *GbtNexaContext, user string, miner string, minerid string, height int64, nonce string, hash string, subidx int64) bool {
	const schemaSQL = `
CREATE TABLE IF NOT EXISTS blocks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user TEXT NOT NULL,
miner TEXT NOT NULL,
minerid TEXT NOT NULL,
height INTEGER,
nonce TEXT NOT NULL,
hash TEXT NOT NULL,
subidx INTEGER,
checked INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`
	const rowSQL = `INSERT INTO blocks (user, miner, minerid, height, nonce, checked, hash, subidx) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`

	conn, openErr := sql.Open("sqlite3", "./blocks.db")
	if openErr != nil {
		log.Printf("Error opening database: %v", openErr)
		return false
	}
	defer conn.Close()

	if _, schemaErr := conn.Exec(schemaSQL); schemaErr != nil {
		log.Printf("Error creating table: %v", schemaErr)
		return false
	}

	_, insertErr := conn.Exec(rowSQL, user, miner, minerid, height, nonce, 0, hash, subidx)
	if insertErr != nil {
		log.Printf("Error inserting data from blocks %s: %v", fmt.Sprint(height), insertErr)
	}
	return insertErr == nil
}
// submit_nexa_block handles the payload of one "blknexa" message.
//
// The payload is a JSON-encoded msg.BlockNexaMsg followed by a 16-character
// trailer: 8 hex characters of block height and 8 hex characters of index.
// Payloads that are too short, fail to decode, or whose height is not
// greater than block.last_height are dropped. Otherwise the solution is
// sent to the node with submitminingsolution, the outcome is reported
// through dbif, and the block is stored in the local database.
func submit_nexa_block(block *GbtNexaContext, cmsg []byte) {
	const trailer_len = 16
	if len(cmsg) < trailer_len {
		// Without this check the slicing below would panic and take the
		// whole process down on a single malformed message.
		logg.Error("[block]", zap.String("invalid block message", fmt.Sprintf("length %d is shorter than the %d-character trailer", len(cmsg), trailer_len)))
		return
	}
	body_len := len(cmsg) - trailer_len
	msgb := cmsg[:body_len]

	// height
	heightb, err := hex.DecodeString(string(cmsg[body_len : body_len+8]))
	if err != nil {
		logg.Error("[block]", zap.String("failed to decode height", err.Error()))
		return
	}
	var height uint32 = utility.ByteToUint32(heightb)
	logg.Warn("[block]", zap.Uint32("height", height))
	if height <= block.last_height {
		return
	}
	block.last_height = height

	// index
	indexb, err := hex.DecodeString(string(cmsg[body_len+8:]))
	if err != nil {
		logg.Error("[block]", zap.String("failed to decode index", err.Error()))
		return
	}
	var index uint32 = utility.ByteToUint32(indexb)
	logg.Warn("[block]", zap.Uint32("index", index))
	logg.Debug("[block]", zap.String("msg", string(cmsg)), zap.String("blk", string(msgb)))

	var nexablock msg.BlockNexaMsg
	if err := json.Unmarshal(msgb, &nexablock); err != nil {
		logg.Error("[block]", zap.String("failed to Unmarshal job", err.Error()))
		return
	}

	blk := fmt.Sprintf(`{"id":%d,"nonce":"%s"}`, nexablock.Id, nexablock.Nonce)
	logg.Info("[block]", zap.String("blk", blk))
	rawmsgs := []json.RawMessage{json.RawMessage(blk)}

	// A failed submission is logged but still reported and stored below,
	// with an empty result string.
	result, err := block.GbtCtx.Client.RawRequest("submitminingsolution", rawmsgs)
	if err != nil {
		logg.Error("[block]", zap.String("submitminingsolution", err.Error()))
	}
	logg.Info("[block]", zap.String("result", string(result)))

	dbif.NotifyPoolBlkStatsSubmitResult(block.GbtCtx, int64(height), nexablock.Pow, string(result), nexablock.Nonce, nexablock.SubIdx)
	block.Submits += 1
	logg.Warn("[block]", zap.Float64("total submits", block.Submits), zap.Int64("SubIdx", nexablock.SubIdx))
	new_block_into_db(block, nexablock.User, nexablock.Miner, nexablock.Index, int64(height), nexablock.Nonce, nexablock.Pow, nexablock.SubIdx)
}

// submit_block_running receives found-block messages from the pool's ZMQ
// subscriber and passes every "blknexa" message to submit_nexa_block.
//
// The subscriber socket is created lazily on the "blk"+Coin topic; after a
// receive error the loop waits one second and re-issues the subscription and
// connection. The loop ends once block.GbtCtx.Started is false.
func submit_block_running(block *GbtNexaContext) {
	logg.Info("[block]", zap.String("submit_block_running", "Start."))
	for block.GbtCtx.Started {
		if block.GbtCtx.SubCh == nil {
			block.GbtCtx.SubCh = utility.InitZmqSub(block.GbtCtx.Config.Zmq.Sub, "blk"+block.GbtCtx.Coin)
		}
		if block.GbtCtx.SubCh == nil {
			logg.Error("[block]", zap.String("block", "SubCh failed! retry"))
			time.Sleep(time.Duration(1) * time.Second)
			continue
		}

		cmsg_sub, err := block.GbtCtx.SubCh.RecvMessage()
		if err != nil {
			if !block.GbtCtx.Started {
				break
			}
			time.Sleep(time.Duration(1) * time.Second)
			block.GbtCtx.SubCh.SetSubscribe("blk" + block.GbtCtx.Coin)
			block.GbtCtx.SubCh.Connect(block.GbtCtx.Config.Zmq.Sub)
			continue
		}

		if len(cmsg_sub) >= 2 && string(cmsg_sub[0]) == "blknexa" {
			submit_nexa_block(block, []byte(cmsg_sub[1]))
		}
	}
}