Files
mining-client/internal/client.go
2025-12-01 15:45:05 +08:00

502 lines
13 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
本程序为云算力平台卖方客户端,主要提供以下功能:
1将云算力平台上的卖方身份和GPU主机绑定
2通过本客户端可以使云算力平台获取到卖方具体每台机器的详细参数包括GPU型号、显存容量等
3卖方可通过本客户端自动匹配买方的挖矿需求即卖方无需再买方下单后手动操作挖矿
卖家在启动客户端之前需要注意以下事项:
1确定客户端执行主机已经配置好挖矿环境包括显卡驱动、挖矿软件(指定挖矿软件)、执行权限等,即执行本客户端的用户可以手动通过挖矿软件进行挖矿
2如果要对本机GPU进行移除(拔出GPU)操作云算力平台会同步移除对应的GPU
3如果在相关GPU有租约且没有在平台申请故障处理的情况下直接移除(拔出)GPU会导致产生罚没因此在有租约的情况下要移除故障GPU请第一时间前往平台申请故障处理在平台确认后再进行移除GPU的操作
4如果要对本机GPU进行更换(拔出后又新插入GPU)操作云算力平台会重新读取GPU数据如果更换型号相同则会按原有配置上架如果更换的型号不同则需在更换后前往卖家中心手动调整上架配置
5如果在相关GPU有租约且没有在平台申请故障处理的情况下直接更换GPU可能会导致产生罚没因此在有租约的情况下要更换故障GPU请第一时间前往平台申请故障处理在平台确认后再进行更换GPU的操作
*/
package client
import (
message "client/internal/msg"
"client/internal/src"
"client/internal/src/linux"
"client/internal/src/windows"
"client/internal/sustain"
"client/internal/utils"
"encoding/json"
"fmt"
"log"
"net"
"runtime"
"strings"
"sync"
"time"
"os"
"gopkg.in/ini.v1"
)
type Client struct {
Auth string
MachineCode string
ServerConn net.Conn // 服务连接
GPUs map[int]message.GPU // {"gpu编号": message.GPU{}, ...}
os *src.SystemServer
osName string
serverURL string // 服务器地址
mu sync.Mutex // 保护连接操作的互斥锁
lastPong time.Time // 最后一次收到pong的时间
stopHeartbeat chan struct{} // 停止心跳的通道
sustainMiner *sustain.SustainMiner // 持续挖矿管理器
}
// osExit 用于在权限检查失败时退出程序,便于测试时覆盖
var osExit = func(code int) {
os.Exit(code)
}
func newClient(url string) *Client {
var client = &Client{}
// 读取身份文件
auth, err := utils.ReadFile("./auth")
if err != nil {
log.Fatalf("获取客户端身份失败:%v", err)
return nil
}
client.Auth = auth
systemServer := src.NewSystemServer()
sys := runtime.GOOS
log.Printf("主机操作系统:%s", sys)
// 启动前进行权限自检(不同系统分别检查)
switch sys {
case "linux":
if err := linux.CheckPermission(); err != nil {
log.Println(err.Error())
log.Println("权限不足,客户端已退出。")
osExit(1)
}
linux_ := linux.NewLinuxClient(auth)
systemServer.ResiterSystem("linux", linux_)
case "windows":
if err := windows.CheckPermission(); err != nil {
log.Println(err.Error())
log.Println("权限不足,客户端已退出。")
osExit(1)
}
windows_ := windows.NewWindowsClient(auth)
systemServer.ResiterSystem("windows", windows_)
default:
log.Printf("不支持的操作系统:%s客户端已退出。", sys)
osExit(1)
}
client.os = systemServer
client.osName = sys
// 初始化持续挖矿管理器(从配置文件读取挖矿软件路径)
var miningConfig message.MiningConfig
var confFile string
if sys == "windows" {
confFile = "mining.windows.conf"
} else {
confFile = "mining.linux.conf"
}
// 读取挖矿配置
cfg, err := ini.Load(confFile)
if err == nil {
sectionBzMiner := cfg.Section("bzminer")
miningConfig.BzMinerPath = sectionBzMiner.Key("path").String()
sectionLolMiner := cfg.Section("lolminer")
miningConfig.LolMinerPath = sectionLolMiner.Key("path").String()
sectionRigel := cfg.Section("rigel")
miningConfig.RigelPath = sectionRigel.Key("path").String()
sectionProxy := cfg.Section("proxy")
miningConfig.ProxyEnabled, _ = sectionProxy.Key("proxy").Bool()
}
client.sustainMiner = sustain.NewSustainMiner(systemServer, sys, miningConfig)
// 读取主机MAC地址信息
var machine_code string
machine_code, err = systemServer.GetMACAddress(sys)
if err != nil {
log.Fatalln(err)
panic("获取当前主机信息失败,程序已退出,请检查网络后重新启动本客户端。")
}
// utils.WirteFile("./machinecode", machine_code)
client.MachineCode = machine_code
gpus, err := systemServer.GetGPUInfo(sys)
if err != nil {
log.Fatalln(err)
panic("获取当前主机GPU数据失败程序已退出请检查GPU驱动等程序后重新启动本客户端。")
}
client.GPUs = gpus
client.serverURL = url
client.stopHeartbeat = make(chan struct{})
client.lastPong = time.Now()
// 连接服务端
server_conn, err := net.Dial("tcp", url)
if err != nil {
log.Fatalf("客户端连接到服务器失败:%v", err)
return nil
}
client.ServerConn = server_conn
return client
}
// Stop 停止客户端(包括持续挖矿)
func (c *Client) Stop() {
// 停止持续挖矿
if c.sustainMiner != nil {
c.sustainMiner.Stop()
}
// 关闭连接
c.mu.Lock()
if c.ServerConn != nil {
c.ServerConn.Close()
c.ServerConn = nil
}
close(c.stopHeartbeat)
c.mu.Unlock()
}
func (c *Client) sendMachineCode() {
var msg message.ServerMsg
msg.ID = c.Auth + "." + c.MachineCode
msg.Method = "auth.machineCode"
msg.Params = c.GPUs
msgByte, err := json.Marshal(msg)
if err != nil {
log.Fatalf("消息(%v)序列化失败:%v", msg, err)
return
}
c.send(msgByte)
}
func (c *Client) receiveMsg() {
defer func() {
c.mu.Lock()
if c.ServerConn != nil {
c.ServerConn.Close()
c.ServerConn = nil
}
c.mu.Unlock()
}()
buffer := make([]byte, 1024)
for {
c.mu.Lock()
conn := c.ServerConn
c.mu.Unlock()
if conn == nil {
log.Println("连接已断开,退出接收循环")
return
}
// 设置读取超时,用于检测连接是否存活
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
n, err := conn.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Println("读取超时,连接可能已断开")
c.reconnect()
return
}
if err.Error() == "EOF" {
// 服务端关闭连接时,退出接收循环
log.Println("服务端关闭了连接")
c.reconnect()
return
}
log.Printf("接收数据失败: %v", err)
c.reconnect()
return
}
msgByte := buffer[:n]
go c.handleReceiveMsg(msgByte)
}
}
func (c *Client) send(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.ServerConn == nil {
return fmt.Errorf("连接已断开")
}
_, err := c.ServerConn.Write(msg)
if err != nil {
log.Printf("发送消息失败:%v\n消息内容%s", err, string(msg))
return err
}
return nil
}
func (c *Client) handleReceiveMsg(msg []byte) {
var data message.ServerMsg
err := json.Unmarshal(msg, &data)
if err != nil {
log.Fatalf("解析接收到的消息失败:%v", err)
return
}
parts := strings.Split(data.ID, ".")
if len(parts) != 2 {
log.Fatalf("解析通信协议(server->client)失败")
return
}
auth, machine_code := parts[0], parts[1]
if c.Auth != auth || c.MachineCode != machine_code {
log.Fatalf("客户端接收到错误的服务端消息")
return
}
switch data.Method {
case "pong":
// 收到心跳响应更新最后pong时间
c.mu.Lock()
c.lastPong = time.Now()
c.mu.Unlock()
log.Println("收到心跳响应")
return
case "mining.req":
// 将 data.Params 重新序列化为 JSON然后反序列化为 ConfigurationMiningMsg
// 因为 data.Params 是 any 类型JSON 反序列化后是 map[string]interface{}
paramsJSON, err := json.Marshal(data.Params)
if err != nil {
log.Printf("序列化 Params 失败:%v", err)
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: fmt.Errorf("序列化 Params 失败:%v", err),
}
sendMsg_byte, _ := json.Marshal(sendMsg_str)
c.send(sendMsg_byte)
return
}
var mining_msg message.ConfigurationMiningMsg
err = json.Unmarshal(paramsJSON, &mining_msg)
if err != nil {
log.Printf("解析挖矿配置消息失败:%v, Params: %s", err, string(paramsJSON))
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: fmt.Errorf("解析挖矿配置消息失败:%v", err),
}
sendMsg_byte, _ := json.Marshal(sendMsg_str)
c.send(sendMsg_byte)
return
}
// 暂停持续挖矿(如果有新任务)
if c.sustainMiner != nil && c.sustainMiner.IsRunning() {
c.sustainMiner.Pause()
}
// 这里开始挖矿
err = c.os.Mining(c.osName, mining_msg)
if err != nil {
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: false,
Data: err.Error(),
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
return
}
c.send(sendMsg_byte) // 返回失败消息
return
}
// 挖矿开始
respData := message.ConfigurationMiningResp{
Coin: mining_msg.Coin,
Algo: mining_msg.Algo,
Pool: mining_msg.Pool,
PoolUrl: mining_msg.PoolUrl,
WorkerID: mining_msg.WorkerID,
WalletAddress: mining_msg.WalletAddress,
WatchUrl: "", // 这里需要根据矿池自动生成
}
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
Result: true,
Data: respData,
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
return
}
c.send(sendMsg_byte) // 返回成功消息
// 启动任务结束监控,任务结束后恢复持续挖矿
go c.monitorMiningTask(mining_msg)
case "mining.end":
c.os.StopMining(c.osName)
default:
log.Printf("未知的方法:%s", data.Method)
}
}
// monitorMiningTask 监控挖矿任务,任务结束后恢复持续挖矿
func (c *Client) monitorMiningTask(cfg message.ConfigurationMiningMsg) {
endTimestamp := int64(cfg.EndTimestamp)
currentTimestamp := time.Now().Unix()
// 如果任务已经结束,直接恢复持续挖矿
if endTimestamp <= currentTimestamp {
if c.sustainMiner != nil {
c.sustainMiner.Resume()
}
return
}
// 计算等待时间
waitDuration := time.Second * time.Duration(endTimestamp-currentTimestamp)
// 等待任务结束
time.Sleep(waitDuration)
log.Println("挖矿任务已结束,恢复持续挖矿")
// 恢复持续挖矿
if c.sustainMiner != nil {
c.sustainMiner.Resume()
}
}
// startHeartbeat 启动心跳检查
func (c *Client) startHeartbeat() {
ticker := time.NewTicker(30 * time.Second) // 每30秒发送一次心跳
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否超过60秒未收到pong响应
c.mu.Lock()
lastPong := c.lastPong
conn := c.ServerConn
c.mu.Unlock()
if time.Since(lastPong) > 60*time.Second {
log.Println("超过60秒未收到心跳响应连接可能已断开")
c.reconnect()
return
}
// 发送心跳
if conn != nil {
pingMsg := message.ServerMsg{
ID: c.Auth + "." + c.MachineCode,
Method: "ping",
Params: nil,
}
msgByte, err := json.Marshal(pingMsg)
if err != nil {
log.Printf("序列化心跳消息失败:%v", err)
continue
}
if err := c.send(msgByte); err != nil {
log.Printf("发送心跳失败:%v", err)
c.reconnect()
return
}
log.Println("发送心跳")
}
case <-c.stopHeartbeat:
return
}
}
}
// reconnect 重连服务器
func (c *Client) reconnect() {
c.mu.Lock()
if c.ServerConn != nil {
c.ServerConn.Close()
c.ServerConn = nil
}
close(c.stopHeartbeat)
c.stopHeartbeat = make(chan struct{})
serverURL := c.serverURL
c.mu.Unlock()
log.Println("尝试重新连接服务器...")
// 重连逻辑
for {
time.Sleep(5 * time.Second) // 等待5秒后重连
conn, err := net.Dial("tcp", serverURL)
if err != nil {
log.Printf("重连失败:%v5秒后重试...", err)
continue
}
log.Println("重连成功")
c.mu.Lock()
c.ServerConn = conn
c.lastPong = time.Now()
c.mu.Unlock()
// 重新发送机器码
c.sendMachineCode()
// 重新启动心跳和接收消息
go c.startHeartbeat()
go c.receiveMsg()
return
}
}
func Star(url string) {
// url := "10.168.2.249:8080"
// url := "47.108.221.51:23456"
client := newClient(url)
globalClient = client
client.sendMachineCode()
// 初始化并启动持续挖矿(如果配置启用)
if client.sustainMiner != nil {
err := client.sustainMiner.LoadConfig()
if err != nil {
log.Printf("加载持续挖矿配置失败:%v", err)
} else {
err = client.sustainMiner.Start()
if err != nil {
log.Printf("启动持续挖矿失败:%v", err)
}
}
}
// 启动心跳检查
go client.startHeartbeat()
// 开始接收服务端消息
client.receiveMsg()
}
var globalClient *Client
// StopClient 停止客户端(供外部调用)
func StopClient() {
if globalClient != nil {
globalClient.Stop()
}
}