m2pool_core/internal/gbt/alph/alph.go

809 lines
23 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package alph
import (
"bytes"
"database/sql"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"net"
"strconv"
"sync"
"sync/atomic"
//"pool/internal/cache"
"pool/internal/db"
"pool/internal/gbt/alph/constants"
"pool/internal/gbt/coin"
"pool/internal/gbt/dbif"
"pool/internal/msg"
"pool/internal/utility"
"time"
//"github.com/btcsuite/btcd/rpcclient"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
)
const GBT_ALPH_VERSION = "alph v1.0"
const headerSize = 4
type AlphAddrConfig struct {
Addr string `json:"addr"`
}
type AlphConfig struct {
Alph AlphAddrConfig `json:"alph"`
}
type GbtAlphContext struct {
Config AlphConfig
GbtCtx *coin.GbtContext
last_time time.Time
last_gbt BlockAlphMsg
last_blockhash string
last_height uint32
Submits float64
addressIndex int
Target []byte
Header []byte
last_body string
ChainIndex int // 分片
new_block_chan chan int
new_block_index int
}
type BlockAlphMsg struct {
Jobs []AlphBlockMsg
}
type AlphBlockMsg struct {
JobId string
FromGroup uint32
ToGroup uint32
Height uint32
HeaderBlob string
TargetBlob string
TxsBlob string
}
type ReceiveJob struct {
FromGroup uint32
ToGroup uint32
HeaderBlobLength uint32
HeaderBlob []byte
TxsBlobLength uint32
TxsBlob []byte
TargetLength uint32
TargetBlob []byte
Height uint32
DataLength uint32
}
type JobCounter struct {
counter uint64
mu sync.Mutex
}
var logg *zap.Logger
var GbtAlphCtx GbtAlphContext
type GbtAlphMsg struct {
Id uint64 `json:"id"`
HeaderCommitment string `json:"headerCommitment"` // 获取到的任务的区块头
NBits string `json:"nBits"`
}
type GetBlockHeaderMsg struct {
Height int `json:"height"`
Nonce string `json:"nonce"`
Confirmations int `json:"confirmations"`
}
type GetBlockStatsMsg struct {
Height int `json:"height"`
Subsidy float64 `json:"subsidy"`
Totalfee float64 `json:"totalfee"`
}
type BlockCheckData struct {
FromGroup uint32
ToGroup uint32
Height int
Nonce string
User string
Miner string
MinerId string
Hash string
SubIdx int
}
type PushBlkNewMsg struct {
Coin string `json:"coin"`
Height int `json:"height"`
Nonce string `json:"nonce"`
}
var addr = []string{
"1CYKPymfTVex9KZ2i48S3v5cAE7xT6hERG1P6GJiHgWJu",
"14eEDF5SvnYcz12Cmkn9UJHLiiTKcqvaP2JgbsSevc38H",
"1HLvickKHvsFziGqmZt7hcithAtRkgueUBHXRHBvMSp98",
"1JBHWv4XPbagWxnC9HXimz67MY85KoKz3h16uvk3cJcAS",
}
func stringToFloat64(str string) float64 {
floatValue, err := strconv.ParseFloat(str, 64)
if err != nil {
fmt.Println("转换失败:", err)
return 0
}
return floatValue
}
func savePoint4(num float64) float64 {
result := fmt.Sprintf("%.4f", num)
return stringToFloat64(result)
}
func update_block_confirm(gbt *GbtAlphContext) {
// 1,每次报块都调用本函数
// 2,先查出报块时的上两个块,然后查询这些快是否在主链上(通过区块的hash传入CheckBlk函数)
// 3,通过查询区块信息确定区块的nonce、height、奖励通过链信息查询出当前链高度
db, err := sql.Open("sqlite3", "./blocks.db")
if err != nil {
//log.Printf("Error opening database: %v", err)
logg.Error("[gbt]", zap.String("Error opening database", err.Error()))
return
}
defer db.Close()
query := "SELECT fromgroup, togroup, user, miner, minerid, height, nonce, hash, subidx FROM blocks WHERE checked=0 AND created_at >= datetime('now', '-30 minutes') order by id desc limit 1"
rows, err := db.Query(query)
if err != nil {
//log.Printf("Error executing query from blocks: %v", err)
logg.Error("[gbt]", zap.String("Error executing query from blocks:", err.Error()))
return
}
defer rows.Close()
var blocks []BlockCheckData
for rows.Next() {
var fromGroup uint32
var toGroup uint32
var height int
var user string
var miner string
var minerid string
var hash string
var subidx int
var nonce string
if err := rows.Scan(&fromGroup, &toGroup, &user, &miner, &minerid, &height, &nonce, &hash, &subidx); err != nil {
//log.Printf("Error scanning row in blocks: %v", err)
logg.Error("[gbt]", zap.String("Error scanning row in blocks:", err.Error()))
return
}
var blockdata BlockCheckData
blockdata.FromGroup = fromGroup
blockdata.ToGroup = toGroup
blockdata.Height = height
blockdata.User = user
blockdata.Miner = miner
blockdata.MinerId = minerid
blockdata.Hash = hash
blockdata.SubIdx = subidx
blockdata.Nonce = nonce
blocks = append(blocks, blockdata)
}
fmt.Println("blocks:", blocks)
for _, block := range blocks {
blockHash, err := gbt.GbtCtx.ClientAlphApi.GetBlockHash(block.FromGroup, block.ToGroup, uint32(block.Height))
if err != nil {
logg.Info("[gbt]", zap.String("GetBlockHash ", err.Error()))
continue
}
if blockHash == block.Hash {
checkResult := gbt.GbtCtx.ClientAlphApi.CheckBlk(blockHash)
if checkResult {
blockInfo := gbt.GbtCtx.ClientAlphApi.GetBlcokInfo(blockHash)
var total_amount float64 = 0
fromGroup, toGroup, nonce, height := blockInfo.ChainFrom, blockInfo.ChainTo, blockInfo.Nonce, blockInfo.Height
// for _, tx := range blockInfo.Transactions {
for _, out := range blockInfo.Transactions[len(blockInfo.Transactions)-1].Unsigned.FixedOutputs {
if out.Address == addr[fromGroup] && nonce == blockInfo.Nonce {
amount := out.AttoAlphAmount
total_amount += savePoint4(stringToFloat64(amount) / math.Pow(10, 18))
fmt.Println(fromGroup, "->", toGroup, "(", height, ")", total_amount, ",", nonce, " ", blockHash, " ", out.Address, " 报块成功")
} else {
fmt.Println(fromGroup, "->", toGroup, "(", height, ")", total_amount, ",", nonce, "不是报块,实际报块地址为:", out.Address)
}
}
// }
block_height := height // 用区块链返回的height防止某个块成为下一个高度的叔块导致高度变化
dbif.AlphNotifyPoolBlkStatsSuccess(gbt.GbtCtx, uint32(fromGroup), uint32(toGroup), block_height, "", block.Nonce, int64(block.SubIdx), total_amount, 0)
dbif.NotifyAlphBlkDetailSuccess(gbt.GbtCtx, uint32(fromGroup), uint32(toGroup), block_height, "", block.Nonce, int64(block.SubIdx))
dbif.NotifyAlphBlkNewDb(gbt.GbtCtx, uint32(fromGroup), uint32(toGroup), block_height, block.Hash, true, block.Nonce, int64(block.SubIdx))
updateSQL := `UPDATE blocks SET checked = 1 WHERE height = ? AND nonce = ? AND checked = 0`
_, err = db.Exec(updateSQL, block.Height, block.Nonce)
if err != nil {
logg.Error("[gbt]", zap.String("Error updating blk_new:", err.Error()))
continue
}
logg.Warn("[gbt]", zap.String("update block success:", fmt.Sprint(block.Height)+" "+block.Nonce))
}
} else {
logg.Info("[gbt]", zap.String("GetBlockHash ", "区块hash不在主链上"))
}
}
}
func NewAlphPool(host string, port string) (*net.Conn, error) {
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", host, port))
if err != nil {
return nil, fmt.Errorf("failed to connect to mining pool: %w", err)
}
return &conn, nil
}
func processJob(data []byte) (*ReceiveJob, error) {
reader := bytes.NewReader(data)
var job ReceiveJob
// Read fixed fields
fields := []interface{}{
&job.FromGroup, &job.ToGroup, &job.HeaderBlobLength,
}
for _, field := range fields {
if err := binary.Read(reader, binary.BigEndian, field); err != nil {
return nil, fmt.Errorf("failed to read fixed field: %w", err)
}
}
// Read variable-length fields
job.HeaderBlob = make([]byte, job.HeaderBlobLength)
if _, err := io.ReadFull(reader, job.HeaderBlob); err != nil {
return nil, fmt.Errorf("failed to read HeaderBlob: %w", err)
}
fields = []interface{}{
&job.TxsBlobLength,
}
for _, field := range fields {
if err := binary.Read(reader, binary.BigEndian, field); err != nil {
return nil, fmt.Errorf("failed to read fixed field: %w", err)
}
}
job.TxsBlob = make([]byte, job.TxsBlobLength)
if _, err := io.ReadFull(reader, job.TxsBlob); err != nil {
return nil, fmt.Errorf("failed to read TxsBlob: %w", err)
}
fields = []interface{}{
&job.TargetLength,
}
for _, field := range fields {
if err := binary.Read(reader, binary.BigEndian, field); err != nil {
return nil, fmt.Errorf("failed to read fixed field: %w", err)
}
}
job.TargetBlob = make([]byte, job.TargetLength)
if _, err := io.ReadFull(reader, job.TargetBlob); err != nil {
return nil, fmt.Errorf("failed to read TargetBlob: %w", err)
}
if err := binary.Read(reader, binary.BigEndian, &job.Height); err != nil {
return nil, fmt.Errorf("failed to read Height: %w", err)
}
job.DataLength = uint32(len(data) - reader.Len())
return &job, nil
}
func _parseJobs(message []byte) ([]ReceiveJob, error) {
jobCount := binary.BigEndian.Uint32(message[:4])
var jobs []ReceiveJob
offset := 4
for i := uint32(0); i < jobCount; i++ {
if offset >= len(message) {
return nil, fmt.Errorf("insufficient data for job parsing")
}
job, err := processJob(message[offset:])
if err != nil {
return nil, fmt.Errorf("error processing job: %w", err)
}
jobs = append(jobs, *job)
offset += int(job.DataLength)
}
return jobs, nil
}
type SubmitResult struct {
FromGroup uint32
ToGroup uint32
BlockHash []byte
Successed bool
}
func _parseSubmitResult(buffer []byte) (SubmitResult, error) {
if len(buffer) < 41 {
return SubmitResult{}, fmt.Errorf("buffer length is too short: %d", len(buffer))
}
fromGroup := binary.BigEndian.Uint32(buffer[0:4])
toGroup := binary.BigEndian.Uint32(buffer[4:8])
blockHash := buffer[8:40]
successed := buffer[40] == 1
new_block_notify := successed
if new_block_notify {
update_block_confirm(&GbtAlphCtx)
}
// fmt.Println("节点返回的hash", blockHash)
// fmt.Println("节点返回的结果:", successed)
return SubmitResult{
FromGroup: fromGroup,
ToGroup: toGroup,
BlockHash: blockHash,
Successed: successed,
}, nil
}
func ParseJobs(data []byte) (interface{}, error) {
if len(data) < headerSize+1 {
return nil, fmt.Errorf("data too short to contain a valid job")
}
reader := bytes.NewReader(data)
var bodyLength uint32
if err := binary.Read(reader, binary.BigEndian, &bodyLength); err != nil {
return nil, fmt.Errorf("failed to read body length: %w", err)
}
version := uint8(data[headerSize])
if version != constants.MiningProtocolVersion {
return nil, fmt.Errorf("通信协议版本号错误!")
}
messageType := uint8(data[headerSize+1])
// statOffset := headerSize + 2
// endOffset := headerSize + bodyLength
message := data[headerSize+2 : headerSize+bodyLength] // startOffset : endOffset
if len(message) < 4 {
return nil, fmt.Errorf("message too short to parse job size")
}
var result interface{}
var err error
if messageType == constants.JobsMessageType {
result, err = _parseJobs(message)
if err != nil {
return nil, fmt.Errorf("解析任务失败: %w", err)
}
return result.([]ReceiveJob), nil
} else if messageType == constants.SubmitResultMessageType {
result, err = _parseSubmitResult(message)
if err != nil {
return nil, fmt.Errorf("解析提交结果失败: %w", err)
}
return result.(SubmitResult), nil
} else {
return nil, fmt.Errorf("未知消息类型: %d", messageType)
}
}
func NewJobCounter() *JobCounter {
return &JobCounter{}
}
// JobCounter 增加计数器并返回当前计数器值的十六进制表示
func (jc *JobCounter) Next() string {
jc.mu.Lock()
defer jc.mu.Unlock()
// 增加计数器
jc.counter++
// 当计数器达到 65535 时重置为 1
if jc.counter >= 0xFFFF {
jc.counter = 1
}
return jc.Current()
}
// 返回当前计数器值的十六进制表示
func (jc *JobCounter) Current() string {
return fmt.Sprintf("%x", jc.counter)
}
var jobCounter = NewJobCounter() // 创建一个全局唯一的JobCounter实例
func gbt_running(gbt *GbtAlphContext) {
fmt.Println("gbt_running started")
var lastBuffer []byte
var buffer = make([]byte, 5800)
for {
if !gbt.GbtCtx.Started {
break
}
n, err := (*gbt.GbtCtx.ClientAlph).Read(buffer)
if err != nil {
logg.Error("Error reading message", zap.Error(err))
break
}
message := append(lastBuffer, buffer[:n]...)
for len(message) >= headerSize {
if len(message) < headerSize {
break
}
bodyLength := binary.BigEndian.Uint32(message[:headerSize])
totalLength := headerSize + int(bodyLength)
if len(message) < totalLength {
break
}
completeMessage := message[:totalLength]
processMessage(completeMessage, gbt)
message = message[totalLength:]
}
lastBuffer = message
}
}
func get_gbt_msg(gbt *GbtAlphContext, gbtmsg []byte) {
if !gbt.GbtCtx.Started {
return
}
// new_block_notify := false
if gbtmsg != nil {
//check_preblock(gbt, DbCtx)
if gbt.GbtCtx.PubCh == nil {
gbt.GbtCtx.PubCh = utility.InitZmqPub(gbt.GbtCtx.Config.Zmq.Pub)
}
if gbt.GbtCtx.PubCh != nil {
for trycnt := 0; trycnt < 3; trycnt++ {
err := gbt.GbtCtx.PubCh.SendMessage([][]byte{[]byte("jobalph"), gbtmsg})
if err != nil {
logg.Warn("[gbt]", zap.String("job ", err.Error()))
continue
} else {
//gbt.GbtCtx.PubCh.SendChan <- [][]byte{[]byte("jobnexa"), gbtmsg}
logg.Warn("[gbt]", zap.String("job ", "sent"))
break
}
}
//gbt.GbtCtx.AlivingChan <- true
atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
} else {
logg.Warn("[gbt]", zap.String("job ", "sent failed! PubCh nil"))
}
} else {
atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
}
// if new_block_notify {
// update_block_confirm(gbt)
// }
// }
}
// 重置定时器
// func resetTimer(timer *time.Timer, timeout int) {
// if !timer.Stop() {
// <-timer.C
// }
// timer.Reset(time.Duration(timeout) * time.Millisecond)
// }
// 处理接收到的完整消息
func processMessage(message []byte, gbt *GbtAlphContext) {
jobs, err := ParseJobs(message)
if err != nil {
logg.Error("Error parsing jobs", zap.Error(err))
return
}
switch job := jobs.(type) {
case []ReceiveJob:
handleReceiveJobs(job, gbt)
case SubmitResult:
fmt.Println("job type", job.Successed)
handleSubmitResult(job)
default:
logg.Warn("Unexpected message type")
}
}
// var jobIndex int = 0
// 处理接收到的任务
func handleReceiveJobs(jobs []ReceiveJob, gbt *GbtAlphContext) {
// for _, job := range jobs {
jobId := jobCounter.Next()
// block := AlphBlockMsg{
// JobId: jobId,
// FromGroup: job.FromGroup,
// ToGroup: job.ToGroup,
// Height: job.Height,
// Header: fmt.Sprintf("%x", job.HeaderBlob),
// Target: fmt.Sprintf("%x", job.TargetBlob),
// Txs: fmt.Sprintf("%x", job.TxsBlob),
// }
// alphJobs.Jobs = append(alphJobs.Jobs, block)
// }
// index := rand.Intn(16)
// 0-0 0-1 0-2 0-3 | 0 1 2 3
// 1-0 1-1 1-2 1-3 | 4 5 6 7
// 2-0 2-1 2-2 2-3 | 8 9 10 11
// 3-0 3-1 3-2 3-3 | 12 13 14 15
index := 0
jobId, FromGroup, ToGroup, Height, Header, Target, Txs := jobId, jobs[index].FromGroup, jobs[index].ToGroup, jobs[index].Height, jobs[index].HeaderBlob, jobs[index].TargetBlob, jobs[index].TxsBlob
job := AlphBlockMsg{
JobId: jobId,
FromGroup: FromGroup,
ToGroup: ToGroup,
Height: Height,
HeaderBlob: fmt.Sprintf("%x", Header),
TargetBlob: fmt.Sprintf("%x", Target),
TxsBlob: fmt.Sprintf("%x", Txs),
}
// fmt.Println("接到新任务:", job.JobId, job.TargetBlob)
data, err := json.Marshal(job)
if err != nil {
logg.Error("Error marshaling job", zap.Error(err))
return
}
// jobIndex += 1
// fmt.Println("gbt获取任务时间", time.Now(), "\t", "gbt获取任务序号", jobIndex)
get_gbt_msg(gbt, data)
// }
}
// 处理提交区块的结果
func handleSubmitResult(result SubmitResult) {
fromGroup, toGroup := result.FromGroup, result.ToGroup
hash := utility.BytesToHexStr(result.BlockHash)
chainIndex := utility.ChainIndexStr(fromGroup, toGroup)
// fmt.Println("报块上链:", chainIndex, "", hash)
if result.Successed {
logg.Info("Block submitted successfully", zap.String("hash", hash), zap.String("chainIndex", chainIndex))
} else {
logg.Error("Block submission failed", zap.String("hash", hash), zap.String("chainIndex", chainIndex))
}
}
func gbt_notify_running(gbt *GbtAlphContext) {
for {
if !gbt.GbtCtx.Started {
break
}
if gbt.GbtCtx.NodeSubCh == nil {
gbt.GbtCtx.NodeSubCh = utility.InitZmqSub(gbt.GbtCtx.Config.Rpc.ZmqSub, utility.BITCOIND_ZMQ_HASHBLOCK)
}
if gbt.GbtCtx.NodeSubCh != nil {
cmsg_sub, err := gbt.GbtCtx.NodeSubCh.RecvMessage()
if err != nil {
if !gbt.GbtCtx.Started {
break
}
gbt.GbtCtx.NodeSubCh.SetSubscribe(utility.BITCOIND_ZMQ_HASHBLOCK)
gbt.GbtCtx.NodeSubCh.Connect(gbt.GbtCtx.Config.Rpc.ZmqSub)
continue
}
if len(cmsg_sub) >= 2 {
if string(cmsg_sub[0]) == "hashblock" {
GbtAlphCtx.new_block_index = GbtAlphCtx.new_block_index + 1
gbt.new_block_chan <- GbtAlphCtx.new_block_index
}
}
} else {
logg.Error("[gbt]", zap.String("notify", "NodeSubCh fail!"))
time.Sleep(time.Duration(1) * time.Second)
}
}
}
func alphInit(config *AlphConfig) {
data, err := ioutil.ReadFile("gbt.conf")
if err != nil {
panic(err.Error())
}
if err = json.Unmarshal(data, &config); err != nil {
panic(err.Error())
}
}
func Init(GbtCtx *coin.GbtContext, DbCtx *db.DbContext) {
GbtAlphCtx.GbtCtx = GbtCtx
GbtAlphCtx.last_height = 0
alphInit(&GbtAlphCtx.Config)
GbtAlphCtx.Target = make([]byte, 32)
GbtAlphCtx.Header = make([]byte, 49)
GbtAlphCtx.last_time = time.Now()
logg = GbtCtx.Log
GbtAlphCtx.new_block_chan = make(chan int, 256)
GbtAlphCtx.new_block_index = 0
}
func Start() {
go gbt_running(&GbtAlphCtx)
// go gbt_notify_running(&GbtAlphCtx)
go submit_block_running(&GbtAlphCtx)
}
func Stop() {
defer close(GbtAlphCtx.new_block_chan)
}
func new_block_into_db(block *GbtAlphContext, user string, miner string, minerid string, fromGroup int, toGroup int, height int64, nonce string, hash string, subidx int64) bool {
db, err := sql.Open("sqlite3", "./blocks.db")
if err != nil {
log.Printf("Error opening database: %v", err)
return false
}
defer db.Close()
createTableSQL := `
CREATE TABLE IF NOT EXISTS blocks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user TEXT NOT NULL,
miner TEXT NOT NULL,
minerid TEXT NOT NULL,
fromGroup INTEGER NOT NULL,
toGroup INTEGER NOT NULL,
height INTEGER,
nonce TEXT NOT NULL,
hash TEXT NOT NULL,
subidx INTEGER,
checked INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`
_, err = db.Exec(createTableSQL)
if err != nil {
log.Printf("Error creating table: %v", err)
return false
}
insertSQL := `INSERT INTO blocks (user, miner, minerid, fromGroup, toGroup, height, nonce, hash, checked, subidx) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
_, err = db.Exec(insertSQL, user, miner, minerid, fromGroup, toGroup, height, nonce, hash, 0, subidx)
if err != nil {
log.Printf("Error inserting data from blocks %s: %v", fmt.Sprint(height), err)
return false
}
return true
}
func submitToChain(gbt *GbtAlphContext, block []byte) error {
blockSize := len(block)
msgPrefixSize := 1 + 1 + 4
msgSize := msgPrefixSize + blockSize
msgHeader := make([]byte, 10)
binary.BigEndian.PutUint32(msgHeader[0:4], uint32(msgSize))
msgHeader[4] = constants.MiningProtocolVersion
msgHeader[5] = constants.SubmitBlockMessageType
binary.BigEndian.PutUint32(msgHeader[6:10], uint32(blockSize))
data := bytes.Join([][]byte{msgHeader, block}, nil)
_, err := (*gbt.GbtCtx.ClientAlph).Write(data)
return err
}
func submit_block_running(block *GbtAlphContext) {
logg.Info("[block]", zap.String("submit_block_running", "Start."))
for {
if !block.GbtCtx.Started {
break
}
if block.GbtCtx.SubCh == nil {
block.GbtCtx.SubCh = utility.InitZmqSub(block.GbtCtx.Config.Zmq.Sub, "blk"+block.GbtCtx.Coin)
}
if block.GbtCtx.SubCh != nil {
cmsg_sub, err := block.GbtCtx.SubCh.RecvMessage() // [ []byte("topic"), []byte("message") ]
if err != nil {
if !block.GbtCtx.Started {
break
}
time.Sleep(time.Duration(1) * time.Second)
block.GbtCtx.SubCh.SetSubscribe("blk" + block.GbtCtx.Coin)
block.GbtCtx.SubCh.Connect(block.GbtCtx.Config.Zmq.Sub)
continue
}
if len(cmsg_sub) >= 2 {
if string(cmsg_sub[0]) == "blkalph" {
cmsg := cmsg_sub[1] // [(区块结构, xbyte), (heightBlob, 8byte), (indexB, 8byte)]
// [(0 - cmsg长度-16), (最后16 - 8 byte), (最后 7-0 byte)]
//block data
msgb := make([]byte, len(cmsg)-16)
copy(msgb, cmsg)
//height
heightb, err := hex.DecodeString(string(cmsg[len(msgb) : len(msgb)+8])) // heightb 8bytes
if err != nil {
logg.Error("[block]", zap.String("failed to decode height", err.Error()))
continue
}
var height uint32 = utility.ByteToUint32(heightb)
logg.Warn("[block]", zap.Uint32("height", height))
if height <= block.last_height {
continue
}
block.last_height = height
//index
indexb, err1 := hex.DecodeString(string(cmsg[len(msgb)+8:])) // indexb 8bytes
if err1 != nil {
//block.Consumer.MarkOffset(cmsg, "")
logg.Error("[block]", zap.String("failed to decode index", err1.Error()))
continue
}
//copy(indexb, cmsg.Value[len(msgb)+4:])
var index uint32 = utility.ByteToUint32(indexb)
logg.Warn("[block]", zap.Uint32("index", index))
logg.Debug("[block]", zap.String("msg", string(cmsg)), zap.String("blk", string(msgb)))
var alphblock msg.BlockAlphMsg
if err := json.Unmarshal(msgb, &alphblock); err != nil {
//block.Consumer.MarkOffset(cmsg, "")
logg.Error("[block]", zap.String("failed to Unmarshal job", err.Error()))
continue
}
var nonceBytes, headerBytes, txsBytes []byte
if alphblock.Txs != "" {
nonceBytes, headerBytes, txsBytes = utility.HexStrToBytes(alphblock.Nonce), utility.HexStrToBytes(alphblock.Header), utility.HexStrToBytes(alphblock.Txs)
} else {
nonceBytes, headerBytes, txsBytes = utility.HexStrToBytes(alphblock.Nonce), utility.HexStrToBytes(alphblock.Header), []byte{0x00}
}
blk := bytes.Join([][]byte{nonceBytes, headerBytes, txsBytes}, nil) // blk:= nonceBytes + headerBytes + txsBytes
// rawmsgs := make([]json.RawMessage, 1)
logg.Info("[block]", zap.String("blk", fmt.Sprintf("%x", blk)))
err2 := submitToChain(block, blk) // 将报块提交到链上
if err2 != nil {
logg.Error("[block]", zap.String("提交报块失败:", err2.Error()))
}
// result, err := block.GbtCtx.Client.RawRequest("submitminingsolution", rawmsgs)
// if err != nil {
// logg.Error("[block]", zap.String("submitminingsolution", err.Error()))
// } else {
// //last_result = result
// }
dbif.NotifyAlphPoolBlkStatsSubmitResult(block.GbtCtx, alphblock.FromGroup, alphblock.ToGroup, int64(height), alphblock.Hash, alphblock.Nonce, alphblock.SubIdx)
block.Submits += 1
logg.Warn("[block]", zap.Float64("total submits", block.Submits), zap.Int64("SubIdx", alphblock.SubIdx))
new_block_into_db(block, alphblock.User, alphblock.Miner, alphblock.Index, int(alphblock.FromGroup), int(alphblock.ToGroup), int64(height), alphblock.Nonce, alphblock.Hash, alphblock.SubIdx)
}
}
} else {
logg.Error("[block]", zap.String("block", "SubCh failed! retry"))
time.Sleep(time.Duration(1) * time.Second)
}
}
}