update v-advance

This commit is contained in:
lzx
2025-12-01 15:45:05 +08:00
parent 16173b7ccd
commit de010e39ee
22 changed files with 1963 additions and 175 deletions

View File

@@ -17,6 +17,8 @@ import (
message "client/internal/msg"
"client/internal/src"
"client/internal/src/linux"
"client/internal/src/windows"
"client/internal/sustain"
"client/internal/utils"
"encoding/json"
"fmt"
@@ -24,15 +26,31 @@ import (
"net"
"runtime"
"strings"
"sync"
"time"
"os"
"gopkg.in/ini.v1"
)
type Client struct {
Auth string
MachineCode string
ServerConn net.Conn // 服务连接
GPUs map[int]message.GPU // {"gpu编号": message.GPU{}, ...}
os *src.SystemServer
osName string
Auth string
MachineCode string
ServerConn net.Conn // 服务连接
GPUs map[int]message.GPU // {"gpu编号": message.GPU{}, ...}
os *src.SystemServer
osName string
serverURL string // 服务器地址
mu sync.Mutex // 保护连接操作的互斥锁
lastPong time.Time // 最后一次收到pong的时间
stopHeartbeat chan struct{} // 停止心跳的通道
sustainMiner *sustain.SustainMiner // 持续挖矿管理器
}
// osExit 用于在权限检查失败时退出程序,便于测试时覆盖
var osExit = func(code int) {
os.Exit(code)
}
func newClient(url string) *Client {
@@ -46,45 +64,111 @@ func newClient(url string) *Client {
client.Auth = auth
os := src.NewSystemServer()
systemServer := src.NewSystemServer()
sys := runtime.GOOS
if sys == "linux" {
linux_ := linux.NewLinuxClient(auth)
os.ResiterSystem("linux", linux_)
} else {
log.Printf("主机操作系统:%s", sys)
// 启动前进行权限自检(不同系统分别检查)
switch sys {
case "linux":
if err := linux.CheckPermission(); err != nil {
log.Println(err.Error())
log.Println("权限不足,客户端已退出。")
osExit(1)
}
linux_ := linux.NewLinuxClient(auth)
systemServer.ResiterSystem("linux", linux_)
case "windows":
if err := windows.CheckPermission(); err != nil {
log.Println(err.Error())
log.Println("权限不足,客户端已退出。")
osExit(1)
}
windows_ := windows.NewWindowsClient(auth)
systemServer.ResiterSystem("windows", windows_)
default:
log.Printf("不支持的操作系统:%s客户端已退出。", sys)
osExit(1)
}
client.os = os
client.os = systemServer
client.osName = sys
// 初始化持续挖矿管理器(从配置文件读取挖矿软件路径)
var miningConfig message.MiningConfig
var confFile string
if sys == "windows" {
confFile = "mining.windows.conf"
} else {
confFile = "mining.linux.conf"
}
// 读取挖矿配置
cfg, err := ini.Load(confFile)
if err == nil {
sectionBzMiner := cfg.Section("bzminer")
miningConfig.BzMinerPath = sectionBzMiner.Key("path").String()
sectionLolMiner := cfg.Section("lolminer")
miningConfig.LolMinerPath = sectionLolMiner.Key("path").String()
sectionRigel := cfg.Section("rigel")
miningConfig.RigelPath = sectionRigel.Key("path").String()
sectionProxy := cfg.Section("proxy")
miningConfig.ProxyEnabled, _ = sectionProxy.Key("proxy").Bool()
}
client.sustainMiner = sustain.NewSustainMiner(systemServer, sys, miningConfig)
// 读取主机MAC地址信息
var machine_code string
machine_code, err = os.GetMACAddress(sys)
machine_code, err = systemServer.GetMACAddress(sys)
if err != nil {
log.Fatalln(err)
panic("获取当前主机信息失败,程序已退出,请检查网络后重新启动本客户端。")
}
utils.WirteFile("./machinecode", machine_code)
// utils.WirteFile("./machinecode", machine_code)
client.MachineCode = machine_code
gpus, err := os.GetGPUInfo(sys)
gpus, err := systemServer.GetGPUInfo(sys)
if err != nil {
log.Fatalln(err)
panic("获取当前主机GPU数据失败程序已退出请检查GPU驱动等程序后重新启动本客户端。")
}
client.GPUs = gpus
client.serverURL = url
client.stopHeartbeat = make(chan struct{})
client.lastPong = time.Now()
// 连接服务端
server_conn, err := net.Dial("tcp", url)
if err != nil {
log.Fatalf("客户端连接到服务器失败:%v", err)
return nil
}
defer server_conn.Close()
client.ServerConn = server_conn
return client
}
// Stop 停止客户端(包括持续挖矿)
func (c *Client) Stop() {
// 停止持续挖矿
if c.sustainMiner != nil {
c.sustainMiner.Stop()
}
// 关闭连接
c.mu.Lock()
if c.ServerConn != nil {
c.ServerConn.Close()
c.ServerConn = nil
}
close(c.stopHeartbeat)
c.mu.Unlock()
}
func (c *Client) sendMachineCode() {
var msg message.ServerMsg
msg.ID = c.Auth + "." + c.MachineCode
@@ -99,17 +183,44 @@ func (c *Client) sendMachineCode() {
}
func (c *Client) receiveMsg() {
defer func() {
c.mu.Lock()
if c.ServerConn != nil {
c.ServerConn.Close()
c.ServerConn = nil
}
c.mu.Unlock()
}()
buffer := make([]byte, 1024)
for {
n, err := c.ServerConn.Read(buffer)
c.mu.Lock()
conn := c.ServerConn
c.mu.Unlock()
if conn == nil {
log.Println("连接已断开,退出接收循环")
return
}
// 设置读取超时,用于检测连接是否存活
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
n, err := conn.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Println("读取超时,连接可能已断开")
c.reconnect()
return
}
if err.Error() == "EOF" {
// 服务端关闭连接时,退出接收循环
log.Println("服务端关闭了连接")
c.reconnect()
return
}
log.Println("接收数据失败:", err)
log.Printf("接收数据失败: %v", err)
c.reconnect()
return
}
@@ -118,12 +229,20 @@ func (c *Client) receiveMsg() {
}
}
func (c *Client) send(msg []byte) {
func (c *Client) send(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.ServerConn == nil {
return fmt.Errorf("连接已断开")
}
_, err := c.ServerConn.Write(msg)
if err != nil {
log.Fatalf("发送消息失败消息内容:%s", string(msg))
return
log.Printf("发送消息失败%v\n消息内容:%s", err, string(msg))
return err
}
return nil
}
func (c *Client) handleReceiveMsg(msg []byte) {
@@ -144,64 +263,239 @@ func (c *Client) handleReceiveMsg(msg []byte) {
return
}
switch data.Method {
case "pong":
// 收到心跳响应更新最后pong时间
c.mu.Lock()
c.lastPong = time.Now()
c.mu.Unlock()
log.Println("收到心跳响应")
return
case "mining.req":
mining_msg, ok := data.Params.(message.ConfigurationMiningMsg)
if ok {
// 这里开始挖矿
err := c.os.Mining(c.osName, mining_msg)
if err != nil {
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: err,
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
break
}
c.send(sendMsg_byte) // 返回失败消息
}
// 挖矿开始
data := message.ConfigurationMiningResp{
Coin: mining_msg.Coin,
Algo: mining_msg.Algo,
Pool: mining_msg.Pool,
PoolUrl: mining_msg.PoolUrl,
WorkerID: mining_msg.WorkerID,
WalletAddress: mining_msg.WalletAddress,
WatchUrl: "", // 这里需要根据矿池自动生成
}
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: true,
Data: data,
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
break
}
c.send(sendMsg_byte) // 返回成功消息
} else {
// 将 data.Params 重新序列化为 JSON然后反序列化为 ConfigurationMiningMsg
// 因为 data.Params 是 any 类型JSON 反序列化后是 map[string]interface{}
paramsJSON, err := json.Marshal(data.Params)
if err != nil {
log.Printf("序列化 Params 失败:%v", err)
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: fmt.Errorf("错误的params数据结构%v", mining_msg),
Data: fmt.Errorf("序列化 Params 失败%v", err),
}
sendMsg_byte, _ := json.Marshal(sendMsg_str)
c.send(sendMsg_byte)
return
}
var mining_msg message.ConfigurationMiningMsg
err = json.Unmarshal(paramsJSON, &mining_msg)
if err != nil {
log.Printf("解析挖矿配置消息失败:%v, Params: %s", err, string(paramsJSON))
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: fmt.Errorf("解析挖矿配置消息失败:%v", err),
}
sendMsg_byte, _ := json.Marshal(sendMsg_str)
c.send(sendMsg_byte)
return
}
// 暂停持续挖矿(如果有新任务)
if c.sustainMiner != nil && c.sustainMiner.IsRunning() {
c.sustainMiner.Pause()
}
// 这里开始挖矿
err = c.os.Mining(c.osName, mining_msg)
if err != nil {
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: err.Error(),
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
break
return
}
c.send(sendMsg_byte) // 返回失败消息
return
}
// 挖矿开始
respData := message.ConfigurationMiningResp{
Coin: mining_msg.Coin,
Algo: mining_msg.Algo,
Pool: mining_msg.Pool,
PoolUrl: mining_msg.PoolUrl,
WorkerID: mining_msg.WorkerID,
WalletAddress: mining_msg.WalletAddress,
WatchUrl: "", // 这里需要根据矿池自动生成
}
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: true,
Data: respData,
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
return
}
c.send(sendMsg_byte) // 返回成功消息
// 启动任务结束监控,任务结束后恢复持续挖矿
go c.monitorMiningTask(mining_msg)
case "mining.end":
c.os.StopMining(c.osName)
default:
log.Printf("未知的方法:%s", data.Method)
}
}
// monitorMiningTask 监控挖矿任务,任务结束后恢复持续挖矿
func (c *Client) monitorMiningTask(cfg message.ConfigurationMiningMsg) {
endTimestamp := int64(cfg.EndTimestamp)
currentTimestamp := time.Now().Unix()
// 如果任务已经结束,直接恢复持续挖矿
if endTimestamp <= currentTimestamp {
if c.sustainMiner != nil {
c.sustainMiner.Resume()
}
return
}
// 计算等待时间
waitDuration := time.Second * time.Duration(endTimestamp-currentTimestamp)
// 等待任务结束
time.Sleep(waitDuration)
log.Println("挖矿任务已结束,恢复持续挖矿")
// 恢复持续挖矿
if c.sustainMiner != nil {
c.sustainMiner.Resume()
}
}
// startHeartbeat 启动心跳检查
func (c *Client) startHeartbeat() {
ticker := time.NewTicker(30 * time.Second) // 每30秒发送一次心跳
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否超过60秒未收到pong响应
c.mu.Lock()
lastPong := c.lastPong
conn := c.ServerConn
c.mu.Unlock()
if time.Since(lastPong) > 60*time.Second {
log.Println("超过60秒未收到心跳响应连接可能已断开")
c.reconnect()
return
}
// 发送心跳
if conn != nil {
pingMsg := message.ServerMsg{
ID: c.Auth + "." + c.MachineCode,
Method: "ping",
Params: nil,
}
msgByte, err := json.Marshal(pingMsg)
if err != nil {
log.Printf("序列化心跳消息失败:%v", err)
continue
}
if err := c.send(msgByte); err != nil {
log.Printf("发送心跳失败:%v", err)
c.reconnect()
return
}
log.Println("发送心跳")
}
case <-c.stopHeartbeat:
return
}
}
}
func Star() {
url := "xxxx"
client := newClient(url)
client.sendMachineCode()
go client.receiveMsg() // 开始接收服务端消息
// reconnect 重连服务器
func (c *Client) reconnect() {
c.mu.Lock()
if c.ServerConn != nil {
c.ServerConn.Close()
c.ServerConn = nil
}
close(c.stopHeartbeat)
c.stopHeartbeat = make(chan struct{})
serverURL := c.serverURL
c.mu.Unlock()
log.Println("尝试重新连接服务器...")
// 重连逻辑
for {
time.Sleep(5 * time.Second) // 等待5秒后重连
conn, err := net.Dial("tcp", serverURL)
if err != nil {
log.Printf("重连失败:%v5秒后重试...", err)
continue
}
log.Println("重连成功")
c.mu.Lock()
c.ServerConn = conn
c.lastPong = time.Now()
c.mu.Unlock()
// 重新发送机器码
c.sendMachineCode()
// 重新启动心跳和接收消息
go c.startHeartbeat()
go c.receiveMsg()
return
}
}
func Star(url string) {
// url := "10.168.2.249:8080"
// url := "47.108.221.51:23456"
client := newClient(url)
globalClient = client
client.sendMachineCode()
// 初始化并启动持续挖矿(如果配置启用)
if client.sustainMiner != nil {
err := client.sustainMiner.LoadConfig()
if err != nil {
log.Printf("加载持续挖矿配置失败:%v", err)
} else {
err = client.sustainMiner.Start()
if err != nil {
log.Printf("启动持续挖矿失败:%v", err)
}
}
}
// 启动心跳检查
go client.startHeartbeat()
// 开始接收服务端消息
client.receiveMsg()
}
var globalClient *Client
// StopClient 停止客户端(供外部调用)
func StopClient() {
if globalClient != nil {
globalClient.Stop()
}
}