m2pool_core/internal/server/server.go

1613 lines
52 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// server.go
package server
/*
#cgo LDFLAGS: -lzmq
#include <zmq.h>
void set_max_msg_size(void *ctx, int size) {
zmq_ctx_set(ctx, ZMQ_MAXMSGSIZE, size);
}
*/
import "C"
import (
"bufio"
"container/list"
"context"
//"encoding/binary"
//"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
//"math/big"
"net"
"os"
"os/signal"
"pool/internal/db"
"pool/internal/server/coin"
"pool/internal/server/dbif"
"math/rand"
"pool/internal/cache"
"pool/internal/server/alph"
// "pool/internal/server/enx"
"pool/internal/server/nexa"
"pool/internal/stratum"
"pool/internal/utility"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/rs/zerolog"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"gopkg.in/natefinch/lumberjack.v2"
)
var logg *zap.Logger
var logr *lumberjack.Logger
var ServerCtx coin.ServerContext
func InitConfig() *coin.ServerConfig {
var config coin.ServerConfig
data, err := ioutil.ReadFile("server.conf")
if err != nil {
panic(err.Error())
}
if err = json.Unmarshal(data, &config); err != nil {
panic(err.Error())
}
return &config
}
/*func ServerLivingHandler(server *coin.ServerContext) {
timer := time.NewTimer(time.Duration(600) * time.Second)
defer timer.Stop()
for {
select {
case living := <-server.AlivingChan:
if !living {
timer.Stop()
//log.Println("ServerLivingHandler exited, living: false")
server.Logg.Error("[server]", zap.String("ServerLivingHandler exited", "living: false"))
return
} else {
if !timer.Stop() {
<-timer.C
}
timer.Reset(time.Duration(600) * time.Second)
}
server.Logg.Error("[server]", zap.String("ServerLivingHandler exited", "living: true"))
case <-timer.C:
//log.Println("ServerLivingHandler exited, timer expired")
server.Logg.Error("[server]", zap.String("ServerLivingHandler exited", "timer expired"))
server.LiveingExpired = true
server.DbCtx.AppExit <- true
return
}
}
}*/
func ServerLivingHandler(server *coin.ServerContext) {
var to_cnt int = 0
for {
flagAliving := atomic.LoadInt32(&(server.FlagAliving))
flagExit := atomic.LoadInt32(&(server.FlagAlivingExit))
if flagExit == 1 {
server.Logg.Error("[server]", zap.String("ServerLivingHandler exited", "exit"))
break
}
if flagAliving == 0 {
if to_cnt > server.CoinCtx.GetBlockInterval() {
server.Logg.Error("[server]", zap.String("ServerLivingHandler exited", "timer expired"))
server.DbCtx.AppExit <- true
break
}
to_cnt++
} else {
to_cnt = 0
atomic.StoreInt32(&(server.FlagAliving), 0)
}
time.Sleep(time.Second)
}
}
var coinobjs = []coin.CoinObj{
{
Coin: "nexa",
Init: nexa.Init,
Start: nexa.Start,
Stop: nexa.Stop,
InitMiner: nexa.InitMiner,
HandleMinerSubscribe: nexa.HandleMinerSubscribe,
HandleMinerAuth: nexa.HandleMinerAuth,
HandleMinerSubmit: nexa.HandleMinerSubmit,
SetDifficulty: nexa.SetDifficulty,
Notify: nexa.Notify,
HandleJobMsg: nexa.HandleJobMsg,
IsMhsLow: nexa.IsMhsLow,
GetBlockInterval: nexa.GetBlockInterval,
},
{
Coin: "alph",
Init: alph.Init,
Start: alph.Start,
Stop: alph.Stop,
InitMiner: alph.InitMiner,
HandleMinerSubscribe: alph.HandleMinerSubscribe,
HandleMinerAuth: alph.HandleMinerAuth,
HandleMinerSubmit: alph.HandleMinerSubmit,
SetDifficulty: alph.SetDifficulty,
Notify: alph.Notify,
HandleJobMsg: alph.HandleJobMsg,
IsMhsLow: alph.IsMhsLow,
GetBlockInterval: alph.GetBlockInterval,
},
}
func register_signal(dbctx *db.DbContext) {
signal_ch := make(chan os.Signal, 1)
signal.Notify(signal_ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go signal_handle(signal_ch, dbctx)
}
func signal_handle(signal_ch chan os.Signal, DbCtx *db.DbContext) {
for s := range signal_ch {
switch s {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
DbCtx.AppExit <- true
//log.Println("stop signal!")
if logg != nil {
logg.Error("[server]", zap.String("signal", "stop"))
}
default:
//fmt.Println("other signal", s)
}
}
}
func register_user_signal(server *coin.ServerContext) {
signal_ch := make(chan os.Signal, 1)
signal.Notify(signal_ch, syscall.SIGUSR1)
go user_signal_handle(signal_ch, server)
}
func user_signal_handle(signal_ch chan os.Signal, ctx *coin.ServerContext) {
for s := range signal_ch {
switch s {
case syscall.SIGUSR1:
//log.Println("user signal 1!")
if logg != nil {
logg.Error("[server]", zap.String("signal", "user 1"))
}
default:
//fmt.Println("other signal", s)
}
}
}
func do_keepalive_ping(server *coin.ServerContext) {
timer := time.NewTimer(time.Duration(5) * time.Second)
defer timer.Stop()
for {
if server.ExitFlag {
break
}
select {
//case <-time.After(time.Duration(5) * time.Second):
case <-timer.C:
timer.Reset(time.Duration(5) * time.Second)
server.Miners.Range(func(k, v interface{}) bool {
m, ok := v.(*(coin.MinerObj))
if ok {
//if m.User == "" || m.Miner == "" {
//return true
//}
//if (m.Status != coin.MINER_STATUS_AUTHORIZED) && (m.Status != coin.MINER_STATUS_RUNNING) {
//return true
//}
if time.Now().Sub(m.ConnSetupTime).Seconds() < 10 {
return true
}
if m.RecvedLiveAck {
m.RecvedLiveAck = false
m.PongFailCnt = 0
} else {
m.PongFailCnt = m.PongFailCnt + 1
//log.Println("ping", m.RecvedLiveAck, m.PongFailCnt)
if m.PongFailCnt > stratum.STRATUM_PING_FAILED_MAX_CNT {
//server.Logg.Error("[server]", zap.String("ping fail", fmt.Sprint(m.PongFailCnt)+m.User+"."+m.Miner))
if m.ZlogInit {
m.Zlog.Info().Msg("ping failed " + fmt.Sprint(m.PongFailCnt) + " " + m.User + "." + m.Miner)
}
m.Conn.Close()
return true
}
}
m.PingCnt = m.PingCnt + 1
if m.PingCnt > stratum.STRATUM_PING_INTERVAL_CNT {
m.PingCnt = 0
var msg stratum.Ping_msg
msg.ID = m.KeepliveCnt
msg.Method = "mining.ping"
msg.Params = nil
m.KeepliveCnt++
body, err := json.Marshal(msg)
if err != nil {
//server.Logg.Error("[server]", zap.String("failed to do_keeplive_ping", err.Error()), zap.String("user", m.User), zap.String("miner", m.Miner))
if m.ZlogInit {
m.Zlog.Info().Msg("failed to Marshal " + err.Error() + " " + m.User + "." + m.Miner)
}
return true
}
body_string := string(body) + "\n"
err = stratum.Conn_tx(m.Conn, []byte(body_string))
if err != nil {
//server.Logg.Error("[server]", zap.String("failed to do_keeplive_ping", err.Error()), zap.String("user", m.User), zap.String("miner", m.Miner))
if m.ZlogInit {
m.Zlog.Info().Msg("failed to do_keeplive_ping " + err.Error() + " " + m.User + "." + m.Miner)
}
return true
}
}
}
return true
})
case <-server.ExitPingChan:
//log.Println("exit do_keepalive_ping!")
server.Logg.Error("[server]", zap.String("do_keepalive_ping", "exit by chan"))
return
}
}
}
func HandleKeepAlive(server *coin.ServerContext) {
do_keepalive_ping(server)
}
func init_miner(miner *coin.MinerObj, conn net.Conn, server *coin.ServerContext) {
atomic.StoreInt32(&(miner.NeedExit), 0)
miner.ZlogInit = false
//miner.EndCh = make(chan bool, 32768)
miner.FromIP = conn.RemoteAddr().String()
est_time := time.Now()
miner.OnlineTime = est_time
miner.OfflineTime = est_time
miner.Retry = 0
miner.DurationTime = 0
miner.MinerIndex = server.MinerIndex
//server.MinerIndex++
miner.ErrStaleds = 0
miner.ErrLowDiffs = 0
miner.ErrDuplicates = 0
miner.ErrFormats = 0
miner.ErrOthers = 0
miner.IsDisabled = false
miner.Submits = 0
miner.Blocks = 0
miner.Orphans = 0
miner.Accepts5M = 0
miner.Accepts15M = 0
miner.Accepts30M = 0
miner.Accepts1h = 0
miner.Accepts3h = 0
miner.Accepts6h = 0
miner.Accepts12h = 0
miner.Accepts24h = 0
miner.Accepts48h = 0
miner.Rejects5M = 0
miner.Rejects15M = 0
miner.Rejects30M = 0
miner.Rejects1h = 0
miner.Rejects3h = 0
miner.Rejects6h = 0
miner.Rejects12h = 0
miner.Rejects24h = 0
miner.Rejects48h = 0
miner.Mhs5M = 0
miner.Mhs15M = 0
miner.Mhs30M = 0
miner.Mhs1h = 0
miner.Mhs3h = 0
miner.Mhs6h = 0
miner.Mhs12h = 0
miner.Mhs24h = 0
miner.Mhs48h = 0
miner.Reward = 0
miner.Fee = 0
miner.Name = server.MinerType
miner.MinerId = coin.Guid()
miner.Accepts = 0
miner.Rejects = 0
miner.M5Accepts = 0
miner.AverageHashrate = 0
miner.M5Hashrate = 0
miner.Conn = conn
miner.KeepliveCnt = 0
miner.RecvedLiveAck = false
miner.PongFailCnt = 0
miner.PingCnt = 0
miner.Difficulty = server.Config.Diff.StartDifficulty
miner.DifficultyNext = -1
var jobs sync.Map
miner.Jobs = jobs
var jobs_lock sync.Mutex
miner.LockForJobs = jobs_lock
miner.JobList = list.New()
miner.LastJobId = ""
miner.Server = server
miner.StartSubmitTime = est_time
miner.LastSubmitime = est_time
miner.SubmitIndex = 0
miner.M5SubmitTime = est_time
miner.Job = server.SJob
miner.Status = coin.MINER_STATUS_CONNECTED
var txlock sync.Mutex
miner.TxLock = txlock
miner.ConnSetupTime = est_time
miner.Reconnect = true
miner.Authorized = false
miner.VarDiffOpt.VariancePercent = miner.Server.Config.Diff.DiffAdjustPercentage
miner.VarDiffOpt.AdjustTime = miner.Server.Config.Diff.DiffAdjustTime
miner.VarDiffOpt.MinShares = miner.VarDiffOpt.AdjustTime / miner.Server.Config.Diff.DiffAdjustInterval * miner.Difficulty * (1 - miner.VarDiffOpt.VariancePercent)
miner.VarDiffOpt.MaxShares = miner.VarDiffOpt.AdjustTime / miner.Server.Config.Diff.DiffAdjustInterval * miner.Difficulty * (1 + miner.VarDiffOpt.VariancePercent)
miner.VarDiffOpt.TargetShares = miner.VarDiffOpt.AdjustTime / miner.Server.Config.Diff.DiffAdjustInterval * miner.Difficulty
miner.VarDiffOpt.MinDiff = miner.Server.Config.Diff.DiffMin
miner.VarDiffOpt.MaxDiff = miner.Server.Config.Diff.DiffMax
miner.VarDiffOpt.AdjustInterval = miner.Server.Config.Diff.DiffAdjustInterval
miner.VarDiffOpt.SubmitShares = 0
miner.VarDiffOpt.SilenceCount = 0
miner.VarDiffOpt.LastCalcTime = est_time
miner.VarDiffOpt.LastSubmitTime = est_time
miner.VarDiffOpt.Level = coin.Mid
miner.VarDiffOpt.Uptimes = 0
miner.VarDiffOpt.Downtimes = 0
miner.DiffHandler.Init(miner.Difficulty, miner.Server.Config.Diff.DiffMin, miner.Server.Config.Diff.DiffMax, miner.Server.Config.Diff.DiffAdjustInterval)
server.CoinCtx.InitMiner(miner)
}
func NotifyMinerEnd(miner *coin.MinerObj) {
miner.Server.Miners.Range(func(k, v interface{}) bool {
m, ok := v.(*(coin.MinerObj))
if ok {
if m.User == "" || m.Miner == "" || fmt.Sprint(m.MinerIndex) == "" {
return true
}
if (m.Accepts > 0) && (m.Status == coin.MINER_STATUS_DISCONNECTED) {
if (m.User == miner.User) && (m.Miner == miner.Miner) {
miner.OfflineTime = m.OfflineTime
//m.EndCh <- true
atomic.StoreInt32(&(m.NeedExit), 1)
}
}
}
return true
})
}
func RestoreMinerFromCache(miner *coin.MinerObj) {
val := cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "submits")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.Submits += intVal
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "blocks")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.Blocks += intVal
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "rejects")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.Rejects += float64(intVal)
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "retry")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.Retry += int64(intVal)
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "staleds")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.ErrStaleds += intVal
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lowdiffs")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.ErrLowDiffs += intVal
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "duplicates")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.ErrDuplicates += intVal
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "formats")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.ErrFormats += intVal
}
}
val = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "others")
if val != nil {
if intVal, ok := val.(int64); ok {
miner.ErrOthers += intVal
}
}
val_f := cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "accepts")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
miner.Accepts = fVal
}
}
val_f = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "rewards")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
miner.Reward = fVal
}
}
val_f = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "fee")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
miner.Fee = fVal
}
}
/*val_f = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "diff")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
miner.Difficulty = fVal
}
}*/
val_t := cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "startsubmit")
if val_f != nil {
if tVal, ok := val_t.(time.Time); ok {
miner.StartSubmitTime = tVal
}
}
/*val_t = cache.LoadMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lastsubmit")
if val_t != nil {
if tVal, ok := val_t.(time.Time); ok {
miner.LastSubmitTime = tVal
}
}*/
StartSubmitTime_cnt := 0
Accepts_cnt := 0
Rejects_cnt := 0
k := miner.User + "." + miner.Miner + "_" + fmt.Sprint(miner.MinerIndex)
m, ok := miner.Server.MMhs.Load(k)
if ok {
var mhs coin.MhsObj = m.(coin.MhsObj)
val_t = cache.LoadMhsCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "starttime")
if val_t != nil {
if tVal, ok := val_t.(time.Time); ok {
mhs.StartSubmitTime = tVal
StartSubmitTime_cnt++
}
}
val_items := cache.LoadMhsCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "accepts")
if val_items != nil {
if itemsVal, ok := val_items.(*[]cache.CacheMhsItem); ok {
for _, item := range *itemsVal {
var mhsItem coin.MhsItem
mhsItem.Tt, _ = time.Parse(time.RFC3339, item.Tt)
mhsItem.Diff = item.Diff
mhs.Accepts = append(mhs.Accepts, mhsItem)
Accepts_cnt++
}
}
}
val_items = cache.LoadMhsCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "rejects")
if val_items != nil {
if itemsVal, ok := val_items.(*[]cache.CacheMhsItem); ok {
for _, item := range *itemsVal {
var mhsItem coin.MhsItem
mhsItem.Tt, _ = time.Parse(time.RFC3339, item.Tt)
mhsItem.Diff = item.Diff
mhs.Rejects = append(mhs.Rejects, mhsItem)
Rejects_cnt++
}
}
}
miner.Server.MMhs.Store(k, m)
}
log.Println("from redis", miner.User+"."+miner.Miner, "Submits", miner.Submits, "Blocks", miner.Blocks, "Rejects", miner.Rejects, "Retry", miner.Retry, "ErrStaleds", miner.ErrStaleds, "ErrLowDiffs", miner.ErrLowDiffs, "ErrDuplicates", miner.ErrDuplicates, "ErrFormats", miner.ErrFormats, "ErrOthers", miner.ErrOthers, "Accepts", miner.Accepts, "Reward", miner.Reward, "Fee", miner.Fee, "StartSubmitTime", miner.StartSubmitTime, "mhs cnt", StartSubmitTime_cnt, Accepts_cnt, Rejects_cnt)
}
func RandomSleep(min time.Duration, max time.Duration) {
// Seed the random number generator
rand.Seed(time.Now().UnixNano())
// Generate a random duration between min and max
duration := time.Duration(rand.Int63n(int64(max-min)) + int64(min))
// Sleep for the generated duration
//fmt.Printf("Sleeping for %v\n", duration)
time.Sleep(duration)
//fmt.Println("Woke up!")
}
func handle_miner_connection(miner *coin.MinerObj) {
//var count int = 0
reader := bufio.NewReader(miner.Conn)
// fmt.Println("有机器连进来了!")
//var rxmsg string
for {
/*msg, err := reader.ReadString('\n')
if err != nil {
//if err != io.EOF {
logg.Error("[server]", zap.String("ReadString", err.Error()))
break
//} else {
// continue
//}
}*/
msg, err := reader.ReadString('\n')
// fmt.Println("矿工发来消息:", msg)
if err != nil {
if err != io.EOF {
logg.Error("[server]", zap.String("ReadString", err.Error()))
break
} else {
if (err == io.EOF) && (len(msg) <= 0) {
logg.Error("[server]", zap.String("ReadString", err.Error()))
break
}
}
}
msgSize := len(msg)
if msgSize >= 256 {
logg.Error("[server]", zap.Int("ReadString too long", msgSize))
break
}
/*var msg string
buffer := make([]byte, 1024)
rdn, err := reader.Read(buffer)
if err != nil {
if err != io.EOF || ((err == io.EOF) && (rdn <= 0)) {
//logg.Error("[server]", zap.String("Read", err.Error()))
if miner.ZlogInit {
miner.Zlog.Info().Msg(miner.User + "." + miner.Miner + " Read " + err.Error())
}
break
}
}
if rdn > 0 {
//count = 0
//log.Println("rxmsg", string(buffer[:rdn]))
rxmsg += string(buffer[:rdn])
retch_index := strings.Index(rxmsg, "\n")
if retch_index != -1 { //found
msg = rxmsg[:retch_index+1]
tempmsg := rxmsg[retch_index+1:]
rxmsg = tempmsg
} else { //not found, discard
rxmsg = ""
}
}*/
//log.Println("msg", msg)
if len(msg) > 0 {
miner.RecvedLiveAck = true
msg = strings.TrimSpace(msg)
if (miner.Status == coin.MINER_STATUS_AUTHORIZED) || (miner.Status == coin.MINER_STATUS_RUNNING) {
if miner.ZlogInit {
miner.Zlog.Info().Msg(msg)
}
}
var ret map[string]interface{}
err = json.Unmarshal([]byte(msg), &ret)
// fmt.Println("矿工发来消息:", &ret)
if err == nil {
//logg.Debug("[server]", zap.Any("msg", msg))
_, ok := ret["method"].(string)
if ok {
switch ret["method"].(string) {
case "mining.pong":
Miner_difficulty_adjust(miner)
//miner.RecvedLiveAck = true
break
case "mining.subscribe":
miner.Server.CoinCtx.HandleMinerSubscribe(miner, (ret["id"].(float64)), miner.Job.Extranonce1, msg)
break
case "mining.extranonce.subscribe":
stratum.Handle_extranonce(miner, (ret["id"].(float64)))
break
case "mining.authorize":
auth_ok := stratum.Handle_authorize(miner, (ret["id"].(float64)), msg, miner.Server.DbCtx)
if auth_ok {
miner.Server.CoinCtx.SetDifficulty(miner)
miner.Server.CoinCtx.Notify(miner)
}
break
case "mining.submit":
prev_status := miner.Status
var s stratum.AlphSubmitNonce
if err = json.Unmarshal([]byte(msg), &s); err != nil {
//logg.Error("[server]", zap.String("mining.submit Unmarshal", err.Error()))
if miner.ZlogInit {
miner.Zlog.Info().Msg("failed to mining.submit Unmarshal " + err.Error() + " " + miner.User + "." + miner.Miner)
}
} else {
accept_ok := false
submit_ok := false
handle_ok := false
// if len(s.Params) == 3 {
// accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, (ret["id"].(float64)), s.Params[0], s.Params[1], "", "", s.Params[2])
// } else if len(s.Params) == 5 {
// accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, (ret["id"].(float64)), s.Params[0], s.Params[1], s.Params[2], s.Params[3], s.Params[4])
// } else {
// stratum.Handle_exception(miner, (ret["id"].(float64)), stratum.MINER_ERR_ILLEGAL_PARARMS)
// }
accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, (ret["id"].(float64)), s.Params.Worker, s.Params.JobID, "", "", s.Params.Nonce)
// fmt.Println("提交结果:", accept_ok, submit_ok, handle_ok)
if handle_ok {
//miner.RecvedLiveAck = true
if (prev_status == coin.MINER_STATUS_AUTHORIZED) && (miner.Status == coin.MINER_STATUS_RUNNING) {
stratum.InitMinerMhs(miner, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), miner.MinerId, miner.Status, miner.Server.DbCtx)
NotifyMinerEnd(miner)
RestoreMinerFromCache(miner)
}
if accept_ok {
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "accepts", miner.Accepts)
if submit_ok {
miner.Submits += 1
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "submits", miner.Submits)
/*miner.Server.PoolSLock.Lock()
miner.Server.Submits += 1
cache.StorePoolCache(miner.Server.RedisClient, miner.Server.MinerType, "submits", miner.Server.Submits)
miner.Server.PoolSLock.Unlock()*/
}
if miner.Accepts == 1 {
miner.Retry += 1
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "retry", miner.Retry)
}
/*miner.Server.PoolSLock.Lock()
miner.Server.Accepts += miner.Difficulty
cache.StorePoolCache(miner.Server.RedisClient, miner.Server.MinerType, "accepts", miner.Server.Accepts)
miner.Server.PoolSLock.Unlock()*/
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lastsubmit", miner.LastSubmitime)
stratum.UpdateMhs(miner, true, miner.Difficulty, 0, miner.Server.DbCtx)
} else {
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "staleds", miner.ErrStaleds)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lowdiffs", miner.ErrLowDiffs)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "duplicates", miner.ErrDuplicates)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "formats", miner.ErrFormats)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "others", miner.ErrOthers)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "rejects", miner.Rejects)
/*miner.Server.PoolSLock.Lock()
miner.Server.Rejects += 1
cache.StorePoolCache(miner.Server.RedisClient, miner.Server.MinerType, "rejects", miner.Server.Rejects)
miner.Server.PoolSLock.Unlock()*/
stratum.UpdateMhs(miner, false, miner.Difficulty, 0, miner.Server.DbCtx)
}
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "diff", miner.Difficulty)
//UpdateUserSets(miner, false)
}
}
break
case "alph_submitHashrate":
params := ret["params"].([]interface{})
hashrate := params[1].(string)
stratum.Handle_submitHashrate(miner, hashrate)
break
}
} else {
switch ret["result"].(type) {
case string:
result := ret["result"].(string)
if result == "mining.pong" || result == "pong" {
Miner_difficulty_adjust(miner)
//miner.RecvedLiveAck = true
//logg.Info("[server]", zap.String("ack pong result", result), zap.String("user", miner.User), zap.String("miner", miner.Miner), zap.Float64("id", ret["id"].(float64)))
}
break
}
}
} else {
//logg.Error("[server]", zap.String("Unmarshal", err.Error()))
if miner.ZlogInit {
miner.Zlog.Info().Msg("failed to mining.submit Unmarshal " + err.Error() + " " + miner.User + "." + miner.Miner)
}
stratum.Handle_exception(miner, 0, stratum.MINER_ERR_ILLEGAL_PARARMS)
}
} else {
//if rdn > 0 {
//} else {
//time.Sleep(20 * time.Millisecond)
/*if count++; count >= 10 {
break
}*/
//time.Sleep(time.Duration(1) * time.Second / 50)
runtime.Gosched()
//}
//time.Sleep(time.Duration(1) * time.Second / 2)
//runtime.Gosched()
}
}
miner.TxLock.Lock()
prev_status := miner.Status
miner.Status = coin.MINER_STATUS_DISCONNECTED
miner.TxLock.Unlock()
//minDuration := 100 * time.Millisecond
//maxDuration := 5 * time.Second
//RandomSleep(minDuration, maxDuration)
stratum.UpdateMhsStatus(miner, miner.Server.DbCtx)
miner.Conn.Close()
if miner.Accepts > 0 /*|| (miner.Rejects > 0)*/ {
miner.OfflineTime = miner.LastSubmitime
}
if miner.ZlogInit {
miner.Zlog.Level(zerolog.Disabled)
}
//defer miner.LogR.Close()
if miner.LogR != nil {
defer miner.LogR.Close()
}
//logg.Error("[server]", zap.String("miner disconnect", miner.MinerId))
if miner.ZlogInit {
miner.Zlog.Info().Msg("miner disconnect prepare " + " " + miner.User + "." + miner.Miner)
}
stratum.StaleAllJobs(miner)
if prev_status == coin.MINER_STATUS_RUNNING {
/*
select {
case <-time.After(600 * time.Second):
logg.Error("[server]", zap.String("miner end", miner.MinerId+" "+miner.User+"."+miner.Miner+"_"+fmt.Sprint(miner.MinerIndex)))
case <-miner.EndCh:
logg.Error("[server]", zap.String("miner end chan", miner.MinerId+" "+miner.User+"."+miner.Miner+"_"+fmt.Sprint(miner.MinerIndex)))
}*/
var to_cnt int = 0
for {
if atomic.LoadInt32(&(miner.NeedExit)) == 1 {
break
}
if to_cnt > 360 {
break
}
time.Sleep(time.Duration(1) * time.Second)
to_cnt++
}
}
//UpdateUserSets(miner, true)
miner.Server.Miners.Delete(miner.MinerId)
//logg.Error("[server]", zap.String("miner disconnect", miner.MinerId), zap.String("user ", miner.User), zap.String("miner ", miner.Miner), zap.Int("connected num ", num))
logg.Error("[server]", zap.String("miner disconnected", miner.MinerId), zap.String("user ", miner.User), zap.String("miner ", miner.Miner))
}
func handle_miner_connection_old(miner *coin.MinerObj) {
//var count int = 0
reader := bufio.NewReader(miner.Conn)
var rxmsg string
for {
/*msg, err := reader.ReadString('\n')
if err != nil {
//if err != io.EOF {
logg.Error("[server]", zap.String("ReadString", err.Error()))
break
//} else {
// continue
//}
}*/
var msg string
buffer := make([]byte, 1024)
rdn, err := reader.Read(buffer)
if err != nil {
if err != io.EOF || ((err == io.EOF) && (rdn <= 0)) {
//logg.Error("[server]", zap.String("Read", err.Error()))
if miner.ZlogInit {
miner.Zlog.Info().Msg(miner.User + "." + miner.Miner + " Read " + err.Error())
}
break
}
}
if rdn > 0 {
//count = 0
//log.Println("rxmsg", string(buffer[:rdn]))
rxmsg += string(buffer[:rdn])
retch_index := strings.Index(rxmsg, "\n")
if retch_index != -1 { //found
msg = rxmsg[:retch_index+1]
tempmsg := rxmsg[retch_index+1:]
rxmsg = tempmsg
} else { //not found, discard
rxmsg = ""
}
}
//log.Println("msg", msg)
if len(msg) > 0 {
miner.RecvedLiveAck = true
msg = strings.TrimSpace(msg)
if (miner.Status == coin.MINER_STATUS_AUTHORIZED) || (miner.Status == coin.MINER_STATUS_RUNNING) {
if miner.ZlogInit {
miner.Zlog.Info().Msg(msg)
}
}
var ret map[string]interface{}
err = json.Unmarshal([]byte(msg), &ret)
if err == nil {
//logg.Debug("[server]", zap.Any("msg", msg))
_, ok := ret["method"].(string)
if ok {
switch ret["method"].(string) {
case "mining.pong":
Miner_difficulty_adjust(miner)
//miner.RecvedLiveAck = true
break
case "mining.subscribe":
miner.Server.CoinCtx.HandleMinerSubscribe(miner, (ret["id"].(float64)), miner.Job.Extranonce1, msg)
break
case "mining.extranonce.subscribe":
stratum.Handle_extranonce(miner, (ret["id"].(float64)))
break
case "mining.authorize":
auth_ok := stratum.Handle_authorize(miner, (ret["id"].(float64)), msg, miner.Server.DbCtx)
if auth_ok {
miner.Server.CoinCtx.SetDifficulty(miner)
miner.Server.CoinCtx.Notify(miner)
}
break
case "mining.submit":
prev_status := miner.Status
if miner.Server.CoinCtx.Coin != "alph" {
var s stratum.Submit_nonce
if err = json.Unmarshal([]byte(msg), &s); err != nil {
//logg.Error("[server]", zap.String("mining.submit Unmarshal", err.Error()))
if miner.ZlogInit {
miner.Zlog.Info().Msg("failed to mining.submit Unmarshal " + err.Error() + " " + miner.User + "." + miner.Miner)
}
} else {
accept_ok := false
submit_ok := false
handle_ok := false
if len(s.Params) == 3 {
// [0]: wallet.worker_name [1]: header hash [2]: nonce
if miner.Server.CoinCtx.Coin == "enx" || miner.Server.CoinCtx.Coin == "kas" {
accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, (ret["id"].(float64)), s.Params[0], s.ID.(string), s.Params[1], "", s.Params[2])
} else {
accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, (ret["id"].(float64)), s.Params[0], s.Params[1], "", "", s.Params[2])
}
} else if len(s.Params) == 5 {
accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, (ret["id"].(float64)), s.Params[0], s.Params[1], s.Params[2], s.Params[3], s.Params[4])
} else {
stratum.Handle_exception(miner, (ret["id"].(float64)), stratum.MINER_ERR_ILLEGAL_PARARMS)
}
if handle_ok {
//miner.RecvedLiveAck = true
if (prev_status == coin.MINER_STATUS_AUTHORIZED) && (miner.Status == coin.MINER_STATUS_RUNNING) {
stratum.InitMinerMhs(miner, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), miner.MinerId, miner.Status, miner.Server.DbCtx)
NotifyMinerEnd(miner)
RestoreMinerFromCache(miner)
}
if accept_ok {
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "accepts", miner.Accepts)
if submit_ok {
miner.Submits += 1
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "submits", miner.Submits)
/*miner.Server.PoolSLock.Lock()
miner.Server.Submits += 1
cache.StorePoolCache(miner.Server.RedisClient, miner.Server.MinerType, "submits", miner.Server.Submits)
miner.Server.PoolSLock.Unlock()*/
}
if miner.Accepts == 1 {
miner.Retry += 1
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "retry", miner.Retry)
}
/*miner.Server.PoolSLock.Lock()
miner.Server.Accepts += miner.Difficulty
cache.StorePoolCache(miner.Server.RedisClient, miner.Server.MinerType, "accepts", miner.Server.Accepts)
miner.Server.PoolSLock.Unlock()*/
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lastsubmit", miner.LastSubmitime)
stratum.UpdateMhs(miner, true, miner.Difficulty, 0, miner.Server.DbCtx)
} else {
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "staleds", miner.ErrStaleds)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lowdiffs", miner.ErrLowDiffs)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "duplicates", miner.ErrDuplicates)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "formats", miner.ErrFormats)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "others", miner.ErrOthers)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "rejects", miner.Rejects)
/*miner.Server.PoolSLock.Lock()
miner.Server.Rejects += 1
cache.StorePoolCache(miner.Server.RedisClient, miner.Server.MinerType, "rejects", miner.Server.Rejects)
miner.Server.PoolSLock.Unlock()*/
stratum.UpdateMhs(miner, false, miner.Difficulty, 0, miner.Server.DbCtx)
}
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "diff", miner.Difficulty)
//UpdateUserSets(miner, false)
}
}
} else {
var s stratum.AlphSubmitNonce
if err = json.Unmarshal([]byte(msg), &s); err != nil {
if miner.ZlogInit {
miner.Zlog.Info().Msg("failed to mining.submit Unmarshal " + err.Error() + " " + miner.User + "." + miner.Miner)
}
} else {
accept_ok := false
submit_ok := false
handle_ok := false
accept_ok, submit_ok, handle_ok = miner.Server.CoinCtx.HandleMinerSubmit(miner, s.Id.(float64), s.Params.WorkerName, s.Params.JobID, "", "", s.Params.Nonce)
if handle_ok {
//miner.RecvedLiveAck = true
if (prev_status == coin.MINER_STATUS_AUTHORIZED) && (miner.Status == coin.MINER_STATUS_RUNNING) {
stratum.InitMinerMhs(miner, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), miner.MinerId, miner.Status, miner.Server.DbCtx)
NotifyMinerEnd(miner)
RestoreMinerFromCache(miner)
}
if accept_ok {
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "accepts", miner.Accepts)
if submit_ok {
miner.Submits += 1
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "submits", miner.Submits)
}
if miner.Accepts == 1 {
miner.Retry += 1
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "retry", miner.Retry)
}
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lastsubmit", miner.LastSubmitime)
stratum.UpdateMhs(miner, true, miner.Difficulty, 0, miner.Server.DbCtx)
} else {
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "staleds", miner.ErrStaleds)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "lowdiffs", miner.ErrLowDiffs)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "duplicates", miner.ErrDuplicates)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "formats", miner.ErrFormats)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "others", miner.ErrOthers)
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "rejects", miner.Rejects)
stratum.UpdateMhs(miner, false, miner.Difficulty, 0, miner.Server.DbCtx)
}
cache.StoreMinerCache(miner.Server.RedisClient, miner.Server.MinerType, miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), "diff", miner.Difficulty)
}
}
}
break
}
} else {
switch ret["result"].(type) {
case string:
result := ret["result"].(string)
if result == "mining.pong" || result == "pong" {
Miner_difficulty_adjust(miner)
//miner.RecvedLiveAck = true
//logg.Info("[server]", zap.String("ack pong result", result), zap.String("user", miner.User), zap.String("miner", miner.Miner), zap.Float64("id", ret["id"].(float64)))
}
break
}
}
} else {
//logg.Error("[server]", zap.String("Unmarshal", err.Error()))
if miner.ZlogInit {
miner.Zlog.Info().Msg("failed to mining.submit Unmarshal " + err.Error() + " " + miner.User + "." + miner.Miner)
}
stratum.Handle_exception(miner, 0, stratum.MINER_ERR_ILLEGAL_PARARMS)
}
} else {
if rdn > 0 {
} else {
//time.Sleep(20 * time.Millisecond)
/*if count++; count >= 10 {
break
}*/
//time.Sleep(time.Duration(1) * time.Second / 50)
runtime.Gosched()
}
//time.Sleep(time.Duration(1) * time.Second / 2)
//runtime.Gosched()
}
}
miner.TxLock.Lock()
prev_status := miner.Status
miner.Status = coin.MINER_STATUS_DISCONNECTED
miner.TxLock.Unlock()
//minDuration := 100 * time.Millisecond
//maxDuration := 5 * time.Second
//RandomSleep(minDuration, maxDuration)
stratum.UpdateMhsStatus(miner, miner.Server.DbCtx)
miner.Conn.Close()
if miner.Accepts > 0 /*|| (miner.Rejects > 0)*/ {
miner.OfflineTime = miner.LastSubmitime
}
if miner.ZlogInit {
miner.Zlog.Level(zerolog.Disabled)
}
//defer miner.LogR.Close()
if miner.LogR != nil {
defer miner.LogR.Close()
}
//logg.Error("[server]", zap.String("miner disconnect", miner.MinerId))
if miner.ZlogInit {
miner.Zlog.Info().Msg("miner disconnect prepare " + " " + miner.User + "." + miner.Miner)
}
if prev_status == coin.MINER_STATUS_RUNNING {
/*
select {
case <-time.After(600 * time.Second):
logg.Error("[server]", zap.String("miner end", miner.MinerId+" "+miner.User+"."+miner.Miner+"_"+fmt.Sprint(miner.MinerIndex)))
case <-miner.EndCh:
logg.Error("[server]", zap.String("miner end chan", miner.MinerId+" "+miner.User+"."+miner.Miner+"_"+fmt.Sprint(miner.MinerIndex)))
}*/
var to_cnt int = 0
for {
if atomic.LoadInt32(&(miner.NeedExit)) == 1 {
break
}
if to_cnt > 360 {
break
}
time.Sleep(time.Duration(1) * time.Second)
to_cnt++
}
}
//UpdateUserSets(miner, true)
miner.Server.Miners.Delete(miner.MinerId)
//logg.Error("[server]", zap.String("miner disconnect", miner.MinerId), zap.String("user ", miner.User), zap.String("miner ", miner.Miner), zap.Int("connected num ", num))
logg.Error("[server]", zap.String("miner disconnected", miner.MinerId), zap.String("user ", miner.User), zap.String("miner ", miner.Miner))
}
func Miner_difficulty_adjust(m *coin.MinerObj) {
if m.User == "" || m.Miner == "" {
return
}
now := time.Now()
//if now.Sub(m.LastSubmitime).Seconds() > m.VarDiffOpt.AdjustInterval*5 {
if now.Sub(m.VarDiffOpt.LastSubmitTime).Seconds() > m.VarDiffOpt.AdjustInterval*10 {
if m.Server.Config.Diff.Filter == "kalman" {
share_interval := now.Sub(m.VarDiffOpt.LastSubmitTime).Seconds()
mhs := m.Difficulty * share_interval
diff_next, kalman_p := m.DiffHandler.Handler(m.Difficulty, share_interval)
//diff_next, _ := m.DiffHandler.Handler(m.Difficulty, share_interval)
mhs_est := diff_next * m.Server.Config.Diff.DiffAdjustInterval
ratio := diff_next / m.Difficulty
if ratio > 0 {
if now.Sub(m.StartSubmitTime).Seconds() > 180 {
if ratio >= 2 {
//m.DifficultyNext = math.Ceil(diff_next*100.0) / 100.0
m.DifficultyNext = diff_next * 10000000 / 10000000
} else if ratio <= 0.5 {
//m.DifficultyNext = math.Ceil(diff_next*100.0) / 100.0
m.DifficultyNext = diff_next * 10000000 / 10000000
} else {
}
} else {
//m.DifficultyNext = math.Ceil(diff_next*100.0) / 100.0
m.DifficultyNext = diff_next * 10000000 / 10000000
/*if ratio >= 1.1 {
m.DifficultyNext = math.Ceil(diff_next*100.0) / 100.0
} else if ratio <= 0.8 {
m.DifficultyNext = math.Ceil(diff_next*100.0) / 100.0
} else {
}*/
}
}
if m.DifficultyNext > 0.0 {
if m.DifficultyNext < m.VarDiffOpt.MinDiff {
m.DifficultyNext = m.VarDiffOpt.MinDiff
} else if m.DifficultyNext > m.VarDiffOpt.MaxDiff {
m.DifficultyNext = m.VarDiffOpt.MaxDiff
}
}
if m.Server.Config.Diff.Dbg {
coin.New_diff_into_db(m.User, m.Miner, fmt.Sprint(m.MinerIndex), m.Difficulty, diff_next, kalman_p, share_interval, mhs, mhs_est)
}
//m.VarDiffOpt.LastCalcTime = now
m.VarDiffOpt.LastSubmitTime = now
//log.Println("diff adjust timeout", ratio, diff_next, m.Difficulty, m.DifficultyNext)
} else {
m.VarDiffOpt.Level = coin.Hign
coin.VarAdjustDifficulty(m, coin.DOWN_DIFF)
m.VarDiffOpt.LastCalcTime = now
m.VarDiffOpt.LastSubmitTime = now
}
}
}
/*func difficulty_adjust(server *coin.ServerContext) {
server.Miners.Range(func(k, v interface{}) bool {
m, ok := v.(*coin.MinerObj)
if ok {
if m.User == "" || m.Miner == "" {
return true
}
now := time.Now()
if now.Sub(m.LastSubmitime).Seconds() > m.VarDiffOpt.AdjustInterval*5 {
if server.Config.Diff.Filter == "kalman" {
share_interval := now.Sub(m.LastSubmitime).Seconds()
//mhs := m.Difficulty * share_interval
//diff_next, kalman_p := m.DiffHandler.Handler(m.Difficulty, share_interval)
diff_next, _ := m.DiffHandler.Handler(m.Difficulty, share_interval)
//mhs_est := diff_next * share_interval
ratio := diff_next / m.Difficulty
if ratio >= 2.0 {
m.DifficultyNext = math.Ceil(diff_next*100) / 100
} else if ratio <= 0.5 {
m.DifficultyNext = math.Ceil(diff_next*100) / 100
} else {
}
//new_diff_into_db(miner.User, miner.Miner, fmt.Sprint(miner.MinerIndex), miner.Difficulty, miner.DifficultyNext, kalman_p, share_interval, mhs, mhs_est)
m.LastSubmitime = now
} else {
m.VarDiffOpt.Level = coin.Hign
coin.VarAdjustDifficulty(m, coin.DOWN_DIFF)
m.LastSubmitime = now
}
}
}
return true
})
}*/
/*func Handle_difficulty_adjust_timer(server *coin.ServerContext) {
timer := time.NewTimer(time.Second * coin.MINER_DIFFICULTY_ADJUST_DURATION)
for {
select {
case <-timer.C:
difficulty_adjust(server)
timer.Reset(time.Second * coin.MINER_DIFFICULTY_ADJUST_DURATION)
case <-(server.ExitDiffVar):
timer.Stop()
server.ExitDiffVar <- true
return
}
}
}*/
func StartServer(server *coin.ServerContext) {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
select {
case issync := <-server.SyncJobChan:
if !issync {
return
}
server.Synced = true
log.Println("StartServer receive job! start")
case <-time.After(time.Duration(60) * time.Second):
log.Println("StartServer not receive job! exited")
if logg != nil {
logg.Error("[server]", zap.String("StartServer not receive job", "exited"))
}
}
var miners sync.Map
server.Miners = miners
var mmhs sync.Map
server.MMhs = mmhs
/*var update_map sync.Map
server.UpdateMap = update_map*/
listener, err := net.Listen("tcp", server.Config.Host.Listen)
server.Listener = listener
if err != nil {
logg.Fatal("[server]", zap.String("Listen", err.Error()))
return
}
go HandleKeepAlive(server)
//go Handle_difficulty_adjust_timer(server)
go dbif.Handle_miners_timer(server, server.DbCtx)
//go dbif.Handle_miners_stats_timer(server, server.DbCtx)
//go dbif.Handle_users_timer(server, server.DbCtx)
//go dbif.Handle_users_stats_timer(server, server.DbCtx)
for {
conn, err := listener.Accept()
if err != nil {
if server.ExitFlag {
break
}
logg.Error("[server]", zap.String("Accept", err.Error()))
continue
}
var miner coin.MinerObj
init_miner(&miner, conn, server)
server.Miners.LoadOrStore(miner.MinerId, &miner)
go handle_miner_connection(&miner)
}
}
var jobIndex int = 0
func HandleJob(server *coin.ServerContext) {
for {
if server.ExitFlag {
fmt.Println("服务已退出!")
break
}
if server.SubCh == nil {
fmt.Println("任务zmq连接未初始化或已断开")
server.SubCh = utility.InitZmqSub(ServerCtx.Config.Zmq.Sub, "job"+server.MinerType)
}
if server.SubCh != nil {
cmsg_sub, err := server.SubCh.RecvMessage()
if err != nil {
time.Sleep(time.Duration(1) * time.Second)
server.SubCh.SetSubscribe("job" + server.MinerType)
server.SubCh.Connect(ServerCtx.Config.Zmq.Sub)
//server.SubCh.SetMaxmsgsize(1024 * 1024 * 8)
//server.SubCh.Destroy()
//server.SubCh = utility.InitZmqSub(ServerCtx.Config.Zmq.Sub, "job"+server.MinerType)
logg.Error("[server]", zap.String("HandleJob", err.Error()))
continue
}
if cmsg_sub != nil {
// log.Println("HandleJob msg size", len(cmsg_sub), string(cmsg_sub[0]))
if len(cmsg_sub) >= 2 {
cmp_topic := "job" + server.CoinCtx.Coin
// log.Println("job 1", string(cmsg_sub[0]), cmp_topic)
if string(cmsg_sub[0]) == cmp_topic {
//logg.Error("[server]", zap.String("job 1", string(cmsg_sub[0])+","+cmp_topic))
//server.AlivingChan <- true
atomic.StoreInt32(&(server.FlagAliving), 1)
cmsg := cmsg_sub[1]
//logg.Error("[server]", zap.String("job 2", string(cmsg_sub[0])+","+cmp_topic))
// log.Println("job 2", string(cmsg_sub[0]), string(cmsg_sub[0])+","+cmp_topic)
jobIndex += 1
// fmt.Println("gbt获取任务时间", time.Now(), "\t", "gbt获取任务序号", jobIndex)
server.CoinCtx.HandleJobMsg(server, cmsg)
if !server.Synced {
server.SyncJobChan <- true
}
//logg.Error("[server]", zap.String("job 3", string(cmsg_sub[0])+","+cmp_topic))
// log.Println("job 3")
}
} else {
logg.Error("[server]", zap.Int("HandleJob len exceeded:", len(cmsg_sub)))
}
}
} else {
logg.Error("[server]", zap.String("HandleJob", "SubCh fail!"))
time.Sleep(time.Duration(1) * time.Second)
}
}
}
/*func LoadCache(server *coin.ServerContext) {
val := cache.LoadPoolCache(server.RedisClient, server.MinerType, "submits")
if val != nil {
if intVal, ok := val.(int64); ok {
server.Submits = intVal
}
}
val = cache.LoadPoolCache(server.RedisClient, server.MinerType, "blocks")
if val != nil {
if intVal, ok := val.(int64); ok {
server.Blocks = intVal
}
}
val = cache.LoadPoolCache(server.RedisClient, server.MinerType, "rejects")
if val != nil {
if intVal, ok := val.(int64); ok {
server.Rejects = float64(intVal)
}
}
val_f := cache.LoadPoolCache(server.RedisClient, server.MinerType, "accepts")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
server.Accepts = fVal
}
}*/
/*val_f = cache.LoadPoolCache(server.RedisClient, server.MinerType, "reward")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
server.Reward = fVal
}
}
val_f = cache.LoadPoolCache(server.RedisClient, server.MinerType, "fee")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
server.Fee = fVal
}
}*/
/*val_f = cache.LoadPoolCache(server.RedisClient, server.MinerType, "refdiff")
if val_f != nil {
if fVal, ok := val_f.(float64); ok {
if fVal > 0 {
server.RefDifficulty = fVal
}
}
}
}*/
func Start(Coin string, DbCtx *db.DbContext) {
zmqctx := C.zmq_ctx_new()
defer C.zmq_ctx_term(zmqctx)
C.set_max_msg_size(zmqctx, 1024*1024*8)
ServerCtx.MinerType = Coin
atomic.StoreInt32(&(ServerCtx.FlagAliving), 0)
atomic.StoreInt32(&(ServerCtx.FlagAlivingExit), 0)
//atomic.StoreInt32(&(ServerCtx.NotifyBlkDetailIdx), 0)
ServerCtx.MinerIndex = 0
ServerCtx.Accepts5M = 0
ServerCtx.Accepts15M = 0
ServerCtx.Accepts30M = 0
ServerCtx.Accepts1h = 0
ServerCtx.Accepts3h = 0
ServerCtx.Accepts6h = 0
ServerCtx.Accepts12h = 0
ServerCtx.Accepts24h = 0
ServerCtx.Accepts48h = 0
ServerCtx.Rejects5M = 0
ServerCtx.Rejects15M = 0
ServerCtx.Rejects30M = 0
ServerCtx.Rejects1h = 0
ServerCtx.Rejects3h = 0
ServerCtx.Rejects6h = 0
ServerCtx.Rejects12h = 0
ServerCtx.Rejects24h = 0
ServerCtx.Rejects48h = 0
ServerCtx.Mhs5M = 0
ServerCtx.Mhs15M = 0
ServerCtx.Mhs30M = 0
ServerCtx.Mhs1h = 0
ServerCtx.Mhs3h = 0
ServerCtx.Mhs6h = 0
ServerCtx.Mhs12h = 0
ServerCtx.Mhs24h = 0
ServerCtx.Mhs48h = 0
ServerCtx.Normal = 0
ServerCtx.Abnormal = 0
ServerCtx.Offline = 0
ServerCtx.MhsZero = 0
ServerCtx.MhsLow = 0
ServerCtx.HighRejects = 0
ServerCtx.Unstable = 0
ServerCtx.NetTarget = ""
ServerCtx.NetHight = 0
ServerCtx.Submits = 0
ServerCtx.Blocks = 0
ServerCtx.Orphans = 0
ServerCtx.Reward = 0
ServerCtx.Fee = 0
ServerCtx.Synced = false
ServerCtx.ExitFlag = false
ServerCtx.ExitDiffVar = make(chan bool, 256)
ServerCtx.SyncJobChan = make(chan bool, 256)
ServerCtx.ExitDbMiners = make(chan bool, 256)
ServerCtx.ExitDbMinersStats = make(chan bool, 256)
ServerCtx.ExitDbUser = make(chan bool, 256)
ServerCtx.ExitDbUserStats = make(chan bool, 256)
ServerCtx.ExitDbPoolStats = make(chan bool, 256)
/*var users_slock sync.Mutex
ServerCtx.UsersSLock = users_slock*/
/*var pool_slock sync.Mutex
ServerCtx.PoolSLock = pool_slock*/
ServerCtx.DbCtx = DbCtx
ServerCtx.Config = InitConfig()
ServerCtx.RefDifficulty = 1 //ServerCtx.Config.Diff.StartDifficulty
l, r, _ := utility.InitLogg(&(ServerCtx.Config.Zaplog), &(ServerCtx.Config.Logrotae), Coin, "server")
logg = l
logr = r
ServerCtx.Logg = l
ServerCtx.LogR = logr
opts := &redis.Options{
Addr: ServerCtx.Config.Redis.Addr,
Password: ServerCtx.Config.Redis.Password,
DB: ServerCtx.Config.Redis.DB,
}
ServerCtx.RedisClient = redis.NewClient(opts)
err := ServerCtx.RedisClient.Set(context.Background(), "server", Coin, 0).Err()
if err != nil {
fmt.Println(err)
return
}
register_signal(DbCtx)
register_user_signal(&ServerCtx)
dbif.Create_db_tables(ServerCtx.DbCtx, ServerCtx.MinerType)
ServerCtx.PubCh = utility.InitZmqPub(ServerCtx.Config.Zmq.Pub)
ServerCtx.SubCh = utility.InitZmqSub(ServerCtx.Config.Zmq.Sub, "job"+Coin)
//ServerCtx.AlivingChan = make(chan bool, 32768)
//ServerCtx.LiveingExpired = false
ServerCtx.ExitPingChan = make(chan bool, 32768)
ServerCtx.ExitJobChan = make(chan bool, 32768)
ServerCtx.Started = true
for _, coinobj := range coinobjs {
if coinobj.Coin == Coin {
ServerCtx.CoinCtx = coinobj
break
}
}
// fmt.Println(&ServerCtx)
ServerCtx.CoinCtx.Init(&ServerCtx)
//LoadCache(&ServerCtx)
ServerCtx.CoinCtx.Start()
go StartServer(&ServerCtx)
go HandleJob(&ServerCtx)
go ServerLivingHandler(&ServerCtx)
<-DbCtx.AppExit
log.Println("received exit signal")
}
func Stop() {
log.Println("enter Stop")
ServerCtx.ExitFlag = true
if ServerCtx.Started == true {
ServerCtx.Miners.Range(func(k, v interface{}) bool {
m, ok := v.(*coin.MinerObj)
if ok {
m.Conn.Close()
m.Server.Miners.Delete(m.MinerId)
}
return true
})
time.Sleep(time.Second)
}
ServerCtx.Started = false
//if !ServerCtx.LiveingExpired {
//ServerCtx.AlivingChan <- false
//}
atomic.StoreInt32(&(ServerCtx.FlagAlivingExit), 1)
ServerCtx.ExitPingChan <- true
ServerCtx.ExitDbMiners <- true
ServerCtx.ExitDbMinersStats <- true
ServerCtx.ExitDbUser <- true
ServerCtx.ExitDbUserStats <- true
ServerCtx.ExitDiffVar <- true
ServerCtx.CoinCtx.Stop()
time.Sleep(time.Second)
defer close(ServerCtx.SyncJobChan)
//defer close(ServerCtx.AlivingChan)
defer close(ServerCtx.ExitPingChan)
defer close(ServerCtx.ExitJobChan)
if ServerCtx.Listener != nil {
defer ServerCtx.Listener.Close()
}
if ServerCtx.PubCh != nil {
defer ServerCtx.PubCh.Destroy()
}
if ServerCtx.SubCh != nil {
defer ServerCtx.SubCh.Destroy()
}
defer ServerCtx.RedisClient.Close()
defer logg.Sync()
defer logr.Close()
log.Println("Stopped")
}