package alph

import (
	"bytes"
	"database/sql"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	//"pool/internal/cache"
	"pool/internal/db"
	"pool/internal/gbt/alph/constants"
	"pool/internal/gbt/coin"
	"pool/internal/gbt/dbif"
	"pool/internal/msg"
	"pool/internal/utility"

	//"github.com/btcsuite/btcd/rpcclient"
	_ "github.com/mattn/go-sqlite3"
	"go.uber.org/zap"
)
const GBT_ALPH_VERSION = "alph v1.0"
const headerSize = 4

type AlphAddrConfig struct {
	Addr string `json:"addr"`
}
type AlphConfig struct {
	Alph AlphAddrConfig `json:"alph"`
}
type GbtAlphContext struct {
	Config          AlphConfig
	GbtCtx          *coin.GbtContext
	last_time       time.Time
	last_gbt        BlockAlphMsg
	last_blockhash  string
	last_height     uint32
	Submits         float64
	addressIndex    int
	Target          []byte
	Header          []byte
	last_body       string
	ChainIndex      int // chain shard index (fromGroup/toGroup pair)
	new_block_chan  chan int
	new_block_index int
}
type BlockAlphMsg struct {
	Jobs []AlphBlockMsg
}
type AlphBlockMsg struct {
	JobId      string
	FromGroup  uint32
	ToGroup    uint32
	Height     uint32
	HeaderBlob string
	TargetBlob string
	TxsBlob    string
}
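// ReceiveJob mirrors the wire layout of a single job as decoded by processJob
// below (all integers big-endian):
//
//	FromGroup        uint32
//	ToGroup          uint32
//	HeaderBlobLength uint32
//	HeaderBlob       [HeaderBlobLength]byte
//	TxsBlobLength    uint32
//	TxsBlob          [TxsBlobLength]byte
//	TargetLength     uint32
//	TargetBlob       [TargetLength]byte
//	Height           uint32
//
// DataLength is not on the wire; it records how many bytes the job consumed.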
type ReceiveJob struct {
	FromGroup        uint32
	ToGroup          uint32
	HeaderBlobLength uint32
	HeaderBlob       []byte
	TxsBlobLength    uint32
	TxsBlob          []byte
	TargetLength     uint32
	TargetBlob       []byte
	Height           uint32
	DataLength       uint32
}
type JobCounter struct {
	counter uint64
	mu      sync.Mutex
}
var logg *zap.Logger
var GbtAlphCtx GbtAlphContext

type GbtAlphMsg struct {
	Id               uint64 `json:"id"`
	HeaderCommitment string `json:"headerCommitment"` // block header of the fetched job
	NBits            string `json:"nBits"`
}
type GetBlockHeaderMsg struct {
	Height        int    `json:"height"`
	Nonce         string `json:"nonce"`
	Confirmations int    `json:"confirmations"`
}
type GetBlockStatsMsg struct {
	Height   int     `json:"height"`
	Subsidy  float64 `json:"subsidy"`
	Totalfee float64 `json:"totalfee"`
}
type BlockCheckData struct {
	FromGroup uint32
	ToGroup   uint32
	Height    int
	Nonce     string
	User      string
	Miner     string
	MinerId   string
	Hash      string
	SubIdx    int
}
type PushBlkNewMsg struct {
	Coin   string `json:"coin"`
	Height int    `json:"height"`
	Nonce  string `json:"nonce"`
}

var addr = []string{
	"1CYKPymfTVex9KZ2i48S3v5cAE7xT6hERG1P6GJiHgWJu",
	"14eEDF5SvnYcz12Cmkn9UJHLiiTKcqvaP2JgbsSevc38H",
	"1HLvickKHvsFziGqmZt7hcithAtRkgueUBHXRHBvMSp98",
	"1JBHWv4XPbagWxnC9HXimz67MY85KoKz3h16uvk3cJcAS",
}
func stringToFloat64(str string) float64 {
	floatValue, err := strconv.ParseFloat(str, 64)
	if err != nil {
		fmt.Println("conversion failed:", err)
		return 0
	}
	return floatValue
}
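// savePoint4 rounds to 4 decimal places via a string round-trip, e.g.
// savePoint4(1.23456789) == 1.2346.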
func savePoint4(num float64) float64 {
	result := fmt.Sprintf("%.4f", num)
	return stringToFloat64(result)
}
func update_block_confirm(gbt *GbtAlphContext) {
	// 1. Called on every block submission.
	// 2. Look up the most recent still-unchecked block recorded at submission
	//    time, then verify that it is on the main chain (its hash is passed to
	//    CheckBlk).
	// 3. From the block info, determine the block's nonce, height and reward;
	//    from the chain info, determine the current chain height.
	db, err := sql.Open("sqlite3", "./blocks.db")
	if err != nil {
		//log.Printf("Error opening database: %v", err)
		logg.Error("[gbt]", zap.String("Error opening database", err.Error()))
		return
	}
	defer db.Close()
	query := "SELECT fromgroup, togroup, user, miner, minerid, height, nonce, hash, subidx FROM blocks WHERE checked=0 AND created_at >= datetime('now', '-30 minutes') ORDER BY id DESC LIMIT 1"
	rows, err := db.Query(query)
	if err != nil {
		//log.Printf("Error executing query from blocks: %v", err)
		logg.Error("[gbt]", zap.String("Error executing query from blocks:", err.Error()))
		return
	}
	defer rows.Close()
	var blocks []BlockCheckData
	for rows.Next() {
		var fromGroup uint32
		var toGroup uint32
		var height int
		var user string
		var miner string
		var minerid string
		var hash string
		var subidx int
		var nonce string
		if err := rows.Scan(&fromGroup, &toGroup, &user, &miner, &minerid, &height, &nonce, &hash, &subidx); err != nil {
			//log.Printf("Error scanning row in blocks: %v", err)
			logg.Error("[gbt]", zap.String("Error scanning row in blocks:", err.Error()))
			return
		}
		var blockdata BlockCheckData
		blockdata.FromGroup = fromGroup
		blockdata.ToGroup = toGroup
		blockdata.Height = height
		blockdata.User = user
		blockdata.Miner = miner
		blockdata.MinerId = minerid
		blockdata.Hash = hash
		blockdata.SubIdx = subidx
		blockdata.Nonce = nonce
		blocks = append(blocks, blockdata)
	}
	fmt.Println("blocks:", blocks)
	for _, block := range blocks {
		blockHash, err := gbt.GbtCtx.ClientAlphApi.GetBlockHash(block.FromGroup, block.ToGroup, uint32(block.Height))
		if err != nil {
			logg.Info("[gbt]", zap.String("GetBlockHash ", err.Error()))
			continue
		}
		if blockHash == block.Hash {
			checkResult := gbt.GbtCtx.ClientAlphApi.CheckBlk(blockHash)
			if checkResult {
				blockInfo := gbt.GbtCtx.ClientAlphApi.GetBlcokInfo(blockHash)
				var total_amount float64 = 0
				fromGroup, toGroup, nonce, height := blockInfo.ChainFrom, blockInfo.ChainTo, blockInfo.Nonce, blockInfo.Height
				for _, tx := range blockInfo.Transactions {
					for _, out := range tx.Unsigned.FixedOutputs {
						// Note: nonce was just read from blockInfo, so the second
						// condition is always true; the address check is the one
						// that matters here.
						if out.Address == addr[fromGroup] && nonce == blockInfo.Nonce {
							amount := out.AttoAlphAmount
							total_amount += savePoint4(stringToFloat64(amount) / math.Pow(10, 18))
							fmt.Println(fromGroup, "->", toGroup, "(", height, "): ", total_amount, ",", nonce, " ", blockHash, " ", out.Address, " block accepted")
						} else {
							fmt.Println(fromGroup, "->", toGroup, "(", height, "): ", total_amount, ",", nonce, "not our block; actual payout address:", out.Address)
						}
					}
				}
				// if len(input) == 0 {
				// 	for v := range output {
				// 		address, amount := output[v].Address, output[v].AttoAlphAmount
				// 		if address == addr[fromGroup] && nonce == block.Nonce {
				// 			total_amount += savePoint4(stringToFloat64(amount) / math.Pow(10, 18))
				// 			fmt.Println(fromGroup, "->", toGroup, "(", height, "): ", total_amount, ",", nonce, " ", blockHash, " ", address, " block accepted")
				// 		} else {
				// 			fmt.Println(fromGroup, "->", toGroup, "(", height, "): ", total_amount, ",", nonce, "not our block; actual payout address:", address)
				// 		}
				// 	}
				// }
				block_height := int64(block.Height)
				dbif.AlphNotifyPoolBlkStatsSuccess(gbt.GbtCtx, uint32(fromGroup), uint32(toGroup), block_height, "", block.Nonce, int64(block.SubIdx), total_amount, 0)
				dbif.NotifyAlphBlkDetailSuccess(gbt.GbtCtx, uint32(fromGroup), uint32(toGroup), block_height, "", block.Nonce, int64(block.SubIdx))
				dbif.NotifyAlphBlkNewDb(gbt.GbtCtx, uint32(fromGroup), uint32(toGroup), block_height, block.Hash, true, block.Nonce, int64(block.SubIdx))
				updateSQL := `UPDATE blocks SET checked = 1 WHERE height = ? AND nonce = ? AND checked = 0`
				_, err = db.Exec(updateSQL, block.Height, block.Nonce)
				if err != nil {
					logg.Error("[gbt]", zap.String("Error updating blk_new:", err.Error()))
					continue
				}
				logg.Warn("[gbt]", zap.String("update block success:", fmt.Sprint(block.Height)+" "+block.Nonce))
			}
		} else {
			logg.Info("[gbt]", zap.String("GetBlockHash ", "block hash is not on the main chain"))
		}
	}
}
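// NewAlphPool dials the node's mining TCP endpoint. Illustrative usage sketch
// (host and port below are assumptions, not taken from this code):
//
//	conn, err := NewAlphPool("127.0.0.1", "10973")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer (*conn).Close()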
func NewAlphPool(host string, port string) (*net.Conn, error) {
	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", host, port))
	if err != nil {
		return nil, fmt.Errorf("failed to connect to mining pool: %w", err)
	}
	return &conn, nil
}
func processJob(data []byte) (*ReceiveJob, error) {
	reader := bytes.NewReader(data)
	var job ReceiveJob
	// Read fixed fields
	fields := []interface{}{
		&job.FromGroup, &job.ToGroup, &job.HeaderBlobLength,
	}
	for _, field := range fields {
		if err := binary.Read(reader, binary.BigEndian, field); err != nil {
			return nil, fmt.Errorf("failed to read fixed field: %w", err)
		}
	}
	// Read variable-length fields, each prefixed by a uint32 length
	job.HeaderBlob = make([]byte, job.HeaderBlobLength)
	if _, err := io.ReadFull(reader, job.HeaderBlob); err != nil {
		return nil, fmt.Errorf("failed to read HeaderBlob: %w", err)
	}
	if err := binary.Read(reader, binary.BigEndian, &job.TxsBlobLength); err != nil {
		return nil, fmt.Errorf("failed to read TxsBlobLength: %w", err)
	}
	job.TxsBlob = make([]byte, job.TxsBlobLength)
	if _, err := io.ReadFull(reader, job.TxsBlob); err != nil {
		return nil, fmt.Errorf("failed to read TxsBlob: %w", err)
	}
	if err := binary.Read(reader, binary.BigEndian, &job.TargetLength); err != nil {
		return nil, fmt.Errorf("failed to read TargetLength: %w", err)
	}
	job.TargetBlob = make([]byte, job.TargetLength)
	if _, err := io.ReadFull(reader, job.TargetBlob); err != nil {
		return nil, fmt.Errorf("failed to read TargetBlob: %w", err)
	}
	if err := binary.Read(reader, binary.BigEndian, &job.Height); err != nil {
		return nil, fmt.Errorf("failed to read Height: %w", err)
	}
	// Record how many bytes of data this job consumed
	job.DataLength = uint32(len(data) - reader.Len())
	return &job, nil
}
func _parseJobs(message []byte) ([]ReceiveJob, error) {
	jobCount := binary.BigEndian.Uint32(message[:4])
	var jobs []ReceiveJob
	offset := 4
	for i := uint32(0); i < jobCount; i++ {
		if offset >= len(message) {
			return nil, fmt.Errorf("insufficient data for job parsing")
		}
		job, err := processJob(message[offset:])
		if err != nil {
			return nil, fmt.Errorf("error processing job: %w", err)
		}
		jobs = append(jobs, *job)
		offset += int(job.DataLength)
	}
	return jobs, nil
}
type SubmitResult struct {
	FromGroup uint32
	ToGroup   uint32
	BlockHash []byte
	Successed bool
}
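// _parseSubmitResult decodes a 41-byte submit-result payload (big-endian):
//
//	fromGroup uint32   bytes 0..3
//	toGroup   uint32   bytes 4..7
//	blockHash [32]byte bytes 8..39
//	success   byte     byte 40 (1 = accepted)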
func _parseSubmitResult(buffer []byte) (SubmitResult, error) {
	if len(buffer) < 41 {
		return SubmitResult{}, fmt.Errorf("buffer length is too short: %d", len(buffer))
	}
	fromGroup := binary.BigEndian.Uint32(buffer[0:4])
	toGroup := binary.BigEndian.Uint32(buffer[4:8])
	blockHash := buffer[8:40]
	successed := buffer[40] == 1
	new_block_notify := successed
	if new_block_notify {
		update_block_confirm(&GbtAlphCtx)
	}
	// fmt.Println("hash returned by node: ", blockHash)
	// fmt.Println("result returned by node:", successed)
	return SubmitResult{
		FromGroup: fromGroup,
		ToGroup:   toGroup,
		BlockHash: blockHash,
		Successed: successed,
	}, nil
}
func ParseJobs(data []byte) (interface{}, error) {
	if len(data) < headerSize+2 {
		return nil, fmt.Errorf("data too short to contain a valid job")
	}
	reader := bytes.NewReader(data)
	var bodyLength uint32
	if err := binary.Read(reader, binary.BigEndian, &bodyLength); err != nil {
		return nil, fmt.Errorf("failed to read body length: %w", err)
	}
	version := data[headerSize]
	if version != constants.MiningProtocolVersion {
		return nil, fmt.Errorf("unexpected mining protocol version: %d", version)
	}
	messageType := data[headerSize+1]
	// The body spans [headerSize+2, headerSize+bodyLength): version and message
	// type are counted in bodyLength.
	message := data[headerSize+2 : headerSize+bodyLength]
	if len(message) < 4 {
		return nil, fmt.Errorf("message too short to parse job size")
	}
	var result interface{}
	var err error
	if messageType == constants.JobsMessageType {
		result, err = _parseJobs(message)
		if err != nil {
			return nil, fmt.Errorf("failed to parse jobs: %w", err)
		}
		return result.([]ReceiveJob), nil
	} else if messageType == constants.SubmitResultMessageType {
		result, err = _parseSubmitResult(message)
		if err != nil {
			return nil, fmt.Errorf("failed to parse submit result: %w", err)
		}
		return result.(SubmitResult), nil
	} else {
		return nil, fmt.Errorf("unknown message type: %d", messageType)
	}
}
func NewJobCounter() *JobCounter {
	return &JobCounter{}
}

// Next increments the counter and returns the hex representation of the new
// value.
func (jc *JobCounter) Next() string {
	jc.mu.Lock()
	defer jc.mu.Unlock()
	jc.counter++
	// Reset to 1 when the counter reaches 0xFFFF
	if jc.counter >= 0xFFFF {
		jc.counter = 1
	}
	return jc.Current()
}

// Current returns the hex representation of the current counter value.
func (jc *JobCounter) Current() string {
	return fmt.Sprintf("%x", jc.counter)
}
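// Illustrative usage (a sketch; this is how job IDs for AlphBlockMsg.JobId
// are produced):
//
//	jc := NewJobCounter()
//	id := jc.Next() // "1"
//	id = jc.Next()  // "2", ..., wrapping back to "1" at 0xFFFF
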
var jobCounter = NewJobCounter() // a single, package-global JobCounter instance
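// gbt_running reads the node's TCP stream and reassembles length-prefixed
// frames: each frame starts with a 4-byte big-endian body length followed by
// that many payload bytes. Partial reads are carried over in lastBuffer until
// a complete frame is available.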
func gbt_running(gbt *GbtAlphContext) {
	fmt.Println("gbt_running started")
	var lastBuffer []byte
	var buffer = make([]byte, 5800)
	for {
		if !gbt.GbtCtx.Started {
			break
		}
		n, err := (*gbt.GbtCtx.ClientAlph).Read(buffer)
		if err != nil {
			logg.Error("Error reading message", zap.Error(err))
			break
		}
		message := append(lastBuffer, buffer[:n]...)
		for len(message) >= headerSize {
			bodyLength := binary.BigEndian.Uint32(message[:headerSize])
			totalLength := headerSize + int(bodyLength)
			if len(message) < totalLength {
				break
			}
			completeMessage := message[:totalLength]
			processMessage(completeMessage, gbt)
			message = message[totalLength:]
		}
		lastBuffer = message
	}
}
func get_gbt_msg(gbt *GbtAlphContext, gbtmsg []byte) {
	if !gbt.GbtCtx.Started {
		return
	}
	// new_block_notify := false
	if gbtmsg != nil {
		//check_preblock(gbt, DbCtx)
		if gbt.GbtCtx.PubCh == nil {
			gbt.GbtCtx.PubCh = utility.InitZmqPub(gbt.GbtCtx.Config.Zmq.Pub)
		}
		if gbt.GbtCtx.PubCh != nil {
			for trycnt := 0; trycnt < 3; trycnt++ {
				err := gbt.GbtCtx.PubCh.SendMessage([][]byte{[]byte("jobalph"), gbtmsg})
				if err != nil {
					logg.Warn("[gbt]", zap.String("job ", err.Error()))
					continue
				} else {
					//gbt.GbtCtx.PubCh.SendChan <- [][]byte{[]byte("jobnexa"), gbtmsg}
					logg.Warn("[gbt]", zap.String("job ", "sent"))
					break
				}
			}
			//gbt.GbtCtx.AlivingChan <- true
			atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
		} else {
			logg.Warn("[gbt]", zap.String("job ", "sent failed! PubCh nil"))
		}
	} else {
		atomic.StoreInt32(&(gbt.GbtCtx.FlagAliving), 1)
	}
	// if new_block_notify {
	// 	update_block_confirm(gbt)
	// }
}
// Reset the timer
// func resetTimer(timer *time.Timer, timeout int) {
// 	if !timer.Stop() {
// 		<-timer.C
// 	}
// 	timer.Reset(time.Duration(timeout) * time.Millisecond)
// }
// Handle one complete, length-delimited message received from the node.
func processMessage(message []byte, gbt *GbtAlphContext) {
	jobs, err := ParseJobs(message)
	if err != nil {
		logg.Error("Error parsing jobs", zap.Error(err))
		return
	}
	switch job := jobs.(type) {
	case []ReceiveJob:
		handleReceiveJobs(job, gbt)
	case SubmitResult:
		fmt.Println("job type: ", job.Successed)
		handleSubmitResult(job)
	default:
		logg.Warn("Unexpected message type")
	}
}
// var jobIndex int = 0

// Handle a batch of received jobs: pick one, wrap it in an AlphBlockMsg and
// publish it to the stratum side.
func handleReceiveJobs(jobs []ReceiveJob, gbt *GbtAlphContext) {
	jobId := jobCounter.Next()
	// for _, job := range jobs {
	// 	block := AlphBlockMsg{
	// 		JobId:     jobId,
	// 		FromGroup: job.FromGroup,
	// 		ToGroup:   job.ToGroup,
	// 		Height:    job.Height,
	// 		Header:    fmt.Sprintf("%x", job.HeaderBlob),
	// 		Target:    fmt.Sprintf("%x", job.TargetBlob),
	// 		Txs:       fmt.Sprintf("%x", job.TxsBlob),
	// 	}
	// 	alphJobs.Jobs = append(alphJobs.Jobs, block)
	// }
	// index := rand.Intn(16)
	// Chain index layout (fromGroup-toGroup -> flat index):
	// 0-0 0-1 0-2 0-3 |  0  1  2  3
	// 1-0 1-1 1-2 1-3 |  4  5  6  7
	// 2-0 2-1 2-2 2-3 |  8  9 10 11
	// 3-0 3-1 3-2 3-3 | 12 13 14 15
	index := 0
	job := AlphBlockMsg{
		JobId:      jobId,
		FromGroup:  jobs[index].FromGroup,
		ToGroup:    jobs[index].ToGroup,
		Height:     jobs[index].Height,
		HeaderBlob: fmt.Sprintf("%x", jobs[index].HeaderBlob),
		TargetBlob: fmt.Sprintf("%x", jobs[index].TargetBlob),
		TxsBlob:    fmt.Sprintf("%x", jobs[index].TxsBlob),
	}
	// fmt.Println("received new job:", job.JobId, job.TargetBlob)
	data, err := json.Marshal(job)
	if err != nil {
		logg.Error("Error marshaling job", zap.Error(err))
		return
	}
	// jobIndex += 1
	// fmt.Println("job fetch time: ", time.Now(), "\t", "job fetch index: ", jobIndex)
	get_gbt_msg(gbt, data)
}
// Handle the node's response to a submitted block.
func handleSubmitResult(result SubmitResult) {
	fromGroup, toGroup := result.FromGroup, result.ToGroup
	hash := utility.BytesToHexStr(result.BlockHash)
	chainIndex := utility.ChainIndexStr(fromGroup, toGroup)
	// fmt.Println("block submitted to chain:", chainIndex, ": ", hash)
	if result.Successed {
		logg.Info("Block submitted successfully", zap.String("hash", hash), zap.String("chainIndex", chainIndex))
	} else {
		logg.Error("Block submission failed", zap.String("hash", hash), zap.String("chainIndex", chainIndex))
	}
}
func gbt_notify_running(gbt *GbtAlphContext) {
	for {
		if !gbt.GbtCtx.Started {
			break
		}
		if gbt.GbtCtx.NodeSubCh == nil {
			gbt.GbtCtx.NodeSubCh = utility.InitZmqSub(gbt.GbtCtx.Config.Rpc.ZmqSub, utility.BITCOIND_ZMQ_HASHBLOCK)
		}
		if gbt.GbtCtx.NodeSubCh != nil {
			cmsg_sub, err := gbt.GbtCtx.NodeSubCh.RecvMessage()
			if err != nil {
				if !gbt.GbtCtx.Started {
					break
				}
				gbt.GbtCtx.NodeSubCh.SetSubscribe(utility.BITCOIND_ZMQ_HASHBLOCK)
				gbt.GbtCtx.NodeSubCh.Connect(gbt.GbtCtx.Config.Rpc.ZmqSub)
				continue
			}
			if len(cmsg_sub) >= 2 {
				if string(cmsg_sub[0]) == "hashblock" {
					GbtAlphCtx.new_block_index = GbtAlphCtx.new_block_index + 1
					gbt.new_block_chan <- GbtAlphCtx.new_block_index
				}
			}
		} else {
			logg.Error("[gbt]", zap.String("notify", "NodeSubCh fail!"))
			time.Sleep(time.Duration(1) * time.Second)
		}
	}
}
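// alphInit loads the JSON config from gbt.conf. Given AlphConfig's struct
// tags, the relevant fragment is assumed to look like:
//
//	{ "alph": { "addr": "..." } }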
func alphInit(config *AlphConfig) {
	data, err := ioutil.ReadFile("gbt.conf")
	if err != nil {
		panic(err.Error())
	}
	if err = json.Unmarshal(data, &config); err != nil {
		panic(err.Error())
	}
}
func Init(GbtCtx *coin.GbtContext, DbCtx *db.DbContext) {
	GbtAlphCtx.GbtCtx = GbtCtx
	GbtAlphCtx.last_height = 0
	alphInit(&GbtAlphCtx.Config)
	GbtAlphCtx.Target = make([]byte, 32)
	GbtAlphCtx.Header = make([]byte, 49)
	GbtAlphCtx.last_time = time.Now()
	logg = GbtCtx.Log
	GbtAlphCtx.new_block_chan = make(chan int, 256)
	GbtAlphCtx.new_block_index = 0
}
func Start() {
	go gbt_running(&GbtAlphCtx)
	// go gbt_notify_running(&GbtAlphCtx)
	go submit_block_running(&GbtAlphCtx)
}
func Stop() {
	defer close(GbtAlphCtx.new_block_chan)
}
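// new_block_into_db records a submitted block in the local SQLite ledger
// (./blocks.db). Rows are inserted with checked=0 and flipped to checked=1 by
// update_block_confirm once the block is confirmed on the main chain.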
func new_block_into_db(block *GbtAlphContext, user string, miner string, minerid string, fromGroup int, toGroup int, height int64, nonce string, hash string, subidx int64) bool {
	db, err := sql.Open("sqlite3", "./blocks.db")
	if err != nil {
		log.Printf("Error opening database: %v", err)
		return false
	}
	defer db.Close()
	createTableSQL := `
	CREATE TABLE IF NOT EXISTS blocks (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		user TEXT NOT NULL,
		miner TEXT NOT NULL,
		minerid TEXT NOT NULL,
		fromGroup INTEGER NOT NULL,
		toGroup INTEGER NOT NULL,
		height INTEGER,
		nonce TEXT NOT NULL,
		hash TEXT NOT NULL,
		subidx INTEGER,
		checked INTEGER,
		created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
	);`
	_, err = db.Exec(createTableSQL)
	if err != nil {
		log.Printf("Error creating table: %v", err)
		return false
	}
	insertSQL := `INSERT INTO blocks (user, miner, minerid, fromGroup, toGroup, height, nonce, hash, checked, subidx) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
	_, err = db.Exec(insertSQL, user, miner, minerid, fromGroup, toGroup, height, nonce, hash, 0, subidx)
	if err != nil {
		log.Printf("Error inserting data from blocks %s: %v", fmt.Sprint(height), err)
		return false
	}
	return true
}
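// submitToChain frames a raw block for the node (all integers big-endian):
//
//	msgSize   uint32 // 1 + 1 + 4 + len(block)
//	version   byte   // constants.MiningProtocolVersion
//	msgType   byte   // constants.SubmitBlockMessageType
//	blockSize uint32 // len(block)
//	block     []byte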
func submitToChain(gbt *GbtAlphContext, block []byte) error {
	blockSize := len(block)
	msgPrefixSize := 1 + 1 + 4
	msgSize := msgPrefixSize + blockSize
	msgHeader := make([]byte, 10)
	binary.BigEndian.PutUint32(msgHeader[0:4], uint32(msgSize))
	msgHeader[4] = constants.MiningProtocolVersion
	msgHeader[5] = constants.SubmitBlockMessageType
	binary.BigEndian.PutUint32(msgHeader[6:10], uint32(blockSize))
	data := bytes.Join([][]byte{msgHeader, block}, nil)
	_, err := (*gbt.GbtCtx.ClientAlph).Write(data)
	return err
}
func submit_block_running(block *GbtAlphContext) {
	logg.Info("[block]", zap.String("submit_block_running", "Start."))
	for {
		if !block.GbtCtx.Started {
			break
		}
		if block.GbtCtx.SubCh == nil {
			block.GbtCtx.SubCh = utility.InitZmqSub(block.GbtCtx.Config.Zmq.Sub, "blk"+block.GbtCtx.Coin)
		}
		if block.GbtCtx.SubCh != nil {
			cmsg_sub, err := block.GbtCtx.SubCh.RecvMessage() // [ []byte("topic"), []byte("message") ]
			if err != nil {
				if !block.GbtCtx.Started {
					break
				}
				time.Sleep(time.Duration(1) * time.Second)
				block.GbtCtx.SubCh.SetSubscribe("blk" + block.GbtCtx.Coin)
				block.GbtCtx.SubCh.Connect(block.GbtCtx.Config.Zmq.Sub)
				continue
			}
			if len(cmsg_sub) >= 2 {
				if string(cmsg_sub[0]) == "blkalph" {
					// Payload layout: [block JSON (len-16 bytes)][height, 8 hex chars][index, 8 hex chars]
					cmsg := cmsg_sub[1]
					// block data
					msgb := make([]byte, len(cmsg)-16)
					copy(msgb, cmsg)
					// height (8 hex chars -> 4 bytes)
					heightb, err := hex.DecodeString(string(cmsg[len(msgb) : len(msgb)+8]))
					if err != nil {
						logg.Error("[block]", zap.String("failed to decode height", err.Error()))
						continue
					}
					var height uint32 = utility.ByteToUint32(heightb)
					logg.Warn("[block]", zap.Uint32("height", height))
					if height <= block.last_height {
						continue
					}
					block.last_height = height
					// index (8 hex chars -> 4 bytes)
					indexb, err1 := hex.DecodeString(string(cmsg[len(msgb)+8:]))
					if err1 != nil {
						//block.Consumer.MarkOffset(cmsg, "")
						logg.Error("[block]", zap.String("failed to decode index", err1.Error()))
						continue
					}
					//copy(indexb, cmsg.Value[len(msgb)+4:])
					var index uint32 = utility.ByteToUint32(indexb)
					logg.Warn("[block]", zap.Uint32("index", index))
					logg.Debug("[block]", zap.String("msg", string(cmsg)), zap.String("blk", string(msgb)))
					var alphblock msg.BlockAlphMsg
					if err := json.Unmarshal(msgb, &alphblock); err != nil {
						//block.Consumer.MarkOffset(cmsg, "")
						logg.Error("[block]", zap.String("failed to Unmarshal job", err.Error()))
						continue
					}
					var nonceBytes, headerBytes, txsBytes []byte
					if alphblock.Txs != "" {
						nonceBytes, headerBytes, txsBytes = utility.HexStrToBytes(alphblock.Nonce), utility.HexStrToBytes(alphblock.Header), utility.HexStrToBytes(alphblock.Txs)
					} else {
						nonceBytes, headerBytes, txsBytes = utility.HexStrToBytes(alphblock.Nonce), utility.HexStrToBytes(alphblock.Header), []byte{0x00}
					}
					blk := bytes.Join([][]byte{nonceBytes, headerBytes, txsBytes}, nil) // blk = nonceBytes + headerBytes + txsBytes
					// rawmsgs := make([]json.RawMessage, 1)
					logg.Info("[block]", zap.String("blk", fmt.Sprintf("%x", blk)))
					err2 := submitToChain(block, blk) // submit the assembled block to the chain
					if err2 != nil {
						logg.Error("[block]", zap.String("block submission failed:", err2.Error()))
					}
					// result, err := block.GbtCtx.Client.RawRequest("submitminingsolution", rawmsgs)
					// if err != nil {
					// 	logg.Error("[block]", zap.String("submitminingsolution", err.Error()))
					// } else {
					// 	//last_result = result
					// }
					dbif.NotifyAlphPoolBlkStatsSubmitResult(block.GbtCtx, alphblock.FromGroup, alphblock.ToGroup, int64(height), alphblock.Hash, alphblock.Nonce, alphblock.SubIdx)
					block.Submits += 1
					logg.Warn("[block]", zap.Float64("total submits", block.Submits), zap.Int64("SubIdx", alphblock.SubIdx))
					new_block_into_db(block, alphblock.User, alphblock.Miner, alphblock.Index, int(alphblock.FromGroup), int(alphblock.ToGroup), int64(height), alphblock.Nonce, alphblock.Hash, alphblock.SubIdx)
				}
			}
		} else {
			logg.Error("[block]", zap.String("block", "SubCh failed! retry"))
			time.Sleep(time.Duration(1) * time.Second)
		}
	}
}