502 lines
13 KiB
Go
502 lines
13 KiB
Go
/*
|
||
本程序为云算力平台卖方客户端,主要提供以下功能:
|
||
1,将云算力平台上的卖方身份和GPU主机绑定
|
||
2,通过本客户端,可以使云算力平台获取到卖方具体每台机器的详细参数,包括GPU型号、显存容量等
|
||
3,卖方可通过本客户端自动匹配买方的挖矿需求,即卖方无需在买方下单后手动操作挖矿
|
||
|
||
卖家在启动客户端之前需要注意以下事项:
|
||
1,确定客户端执行主机已经配置好挖矿环境,包括显卡驱动、挖矿软件(指定挖矿软件)、执行权限等,即执行本客户端的用户可以手动通过挖矿软件进行挖矿
|
||
2,如果要对本机GPU进行移除(拔出GPU)操作,云算力平台会同步移除对应的GPU
|
||
3,如果在相关GPU有租约且没有在平台申请故障处理的情况下直接移除(拔出)GPU,会导致产生罚没,因此在有租约的情况下要移除故障GPU,请第一时间前往平台申请故障处理,在平台确认后再进行移除GPU的操作
|
||
4,如果要对本机GPU进行更换(拔出后又新插入GPU)操作,云算力平台会重新读取GPU数据,如果更换型号相同,则会按原有配置上架,如果更换的型号不同,则需在更换后前往卖家中心手动调整上架配置
|
||
5,如果在相关GPU有租约且没有在平台申请故障处理的情况下直接更换GPU,可能会导致产生罚没,因此在有租约的情况下要更换故障GPU,请第一时间前往平台申请故障处理,在平台确认后再进行更换GPU的操作
|
||
*/
|
||
package client
|
||
|
||
import (
|
||
message "client/internal/msg"
|
||
"client/internal/src"
|
||
"client/internal/src/linux"
|
||
"client/internal/src/windows"
|
||
"client/internal/sustain"
|
||
"client/internal/utils"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"net"
|
||
"runtime"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"os"
|
||
|
||
"gopkg.in/ini.v1"
|
||
)
|
||
|
||
type Client struct {
|
||
Auth string
|
||
MachineCode string
|
||
ServerConn net.Conn // 服务连接
|
||
GPUs map[int]message.GPU // {"gpu编号": message.GPU{}, ...}
|
||
os *src.SystemServer
|
||
osName string
|
||
serverURL string // 服务器地址
|
||
mu sync.Mutex // 保护连接操作的互斥锁
|
||
lastPong time.Time // 最后一次收到pong的时间
|
||
stopHeartbeat chan struct{} // 停止心跳的通道
|
||
sustainMiner *sustain.SustainMiner // 持续挖矿管理器
|
||
}
|
||
|
||
// osExit terminates the process when a start-up check fails. It is a
// package-level variable so that tests can replace it with a stub.
var osExit = os.Exit
|
||
|
||
func newClient(url string) *Client {
|
||
var client = &Client{}
|
||
// 读取身份文件
|
||
auth, err := utils.ReadFile("./auth")
|
||
if err != nil {
|
||
log.Fatalf("获取客户端身份失败:%v", err)
|
||
return nil
|
||
}
|
||
|
||
client.Auth = auth
|
||
|
||
systemServer := src.NewSystemServer()
|
||
sys := runtime.GOOS
|
||
log.Printf("主机操作系统:%s", sys)
|
||
|
||
// 启动前进行权限自检(不同系统分别检查)
|
||
switch sys {
|
||
case "linux":
|
||
if err := linux.CheckPermission(); err != nil {
|
||
log.Println(err.Error())
|
||
log.Println("权限不足,客户端已退出。")
|
||
osExit(1)
|
||
}
|
||
linux_ := linux.NewLinuxClient(auth)
|
||
systemServer.ResiterSystem("linux", linux_)
|
||
case "windows":
|
||
if err := windows.CheckPermission(); err != nil {
|
||
log.Println(err.Error())
|
||
log.Println("权限不足,客户端已退出。")
|
||
osExit(1)
|
||
}
|
||
windows_ := windows.NewWindowsClient(auth)
|
||
systemServer.ResiterSystem("windows", windows_)
|
||
default:
|
||
log.Printf("不支持的操作系统:%s,客户端已退出。", sys)
|
||
osExit(1)
|
||
}
|
||
client.os = systemServer
|
||
client.osName = sys
|
||
|
||
// 初始化持续挖矿管理器(从配置文件读取挖矿软件路径)
|
||
var miningConfig message.MiningConfig
|
||
var confFile string
|
||
if sys == "windows" {
|
||
confFile = "mining.windows.conf"
|
||
} else {
|
||
confFile = "mining.linux.conf"
|
||
}
|
||
|
||
// 读取挖矿配置
|
||
cfg, err := ini.Load(confFile)
|
||
if err == nil {
|
||
sectionBzMiner := cfg.Section("bzminer")
|
||
miningConfig.BzMinerPath = sectionBzMiner.Key("path").String()
|
||
|
||
sectionLolMiner := cfg.Section("lolminer")
|
||
miningConfig.LolMinerPath = sectionLolMiner.Key("path").String()
|
||
|
||
sectionRigel := cfg.Section("rigel")
|
||
miningConfig.RigelPath = sectionRigel.Key("path").String()
|
||
|
||
sectionProxy := cfg.Section("proxy")
|
||
miningConfig.ProxyEnabled, _ = sectionProxy.Key("proxy").Bool()
|
||
}
|
||
|
||
client.sustainMiner = sustain.NewSustainMiner(systemServer, sys, miningConfig)
|
||
|
||
// 读取主机MAC地址信息
|
||
var machine_code string
|
||
machine_code, err = systemServer.GetMACAddress(sys)
|
||
if err != nil {
|
||
log.Fatalln(err)
|
||
panic("获取当前主机信息失败,程序已退出,请检查网络后重新启动本客户端。")
|
||
}
|
||
// utils.WirteFile("./machinecode", machine_code)
|
||
|
||
client.MachineCode = machine_code
|
||
|
||
gpus, err := systemServer.GetGPUInfo(sys)
|
||
if err != nil {
|
||
log.Fatalln(err)
|
||
panic("获取当前主机GPU数据失败,程序已退出,请检查GPU驱动等程序后重新启动本客户端。")
|
||
}
|
||
client.GPUs = gpus
|
||
client.serverURL = url
|
||
client.stopHeartbeat = make(chan struct{})
|
||
client.lastPong = time.Now()
|
||
|
||
// 连接服务端
|
||
server_conn, err := net.Dial("tcp", url)
|
||
if err != nil {
|
||
log.Fatalf("客户端连接到服务器失败:%v", err)
|
||
return nil
|
||
}
|
||
|
||
client.ServerConn = server_conn
|
||
return client
|
||
}
|
||
|
||
// Stop 停止客户端(包括持续挖矿)
|
||
func (c *Client) Stop() {
|
||
// 停止持续挖矿
|
||
if c.sustainMiner != nil {
|
||
c.sustainMiner.Stop()
|
||
}
|
||
|
||
// 关闭连接
|
||
c.mu.Lock()
|
||
if c.ServerConn != nil {
|
||
c.ServerConn.Close()
|
||
c.ServerConn = nil
|
||
}
|
||
close(c.stopHeartbeat)
|
||
c.mu.Unlock()
|
||
}
|
||
|
||
func (c *Client) sendMachineCode() {
|
||
var msg message.ServerMsg
|
||
msg.ID = c.Auth + "." + c.MachineCode
|
||
msg.Method = "auth.machineCode"
|
||
msg.Params = c.GPUs
|
||
msgByte, err := json.Marshal(msg)
|
||
if err != nil {
|
||
log.Fatalf("消息(%v)序列化失败:%v", msg, err)
|
||
return
|
||
}
|
||
c.send(msgByte)
|
||
}
|
||
|
||
func (c *Client) receiveMsg() {
|
||
defer func() {
|
||
c.mu.Lock()
|
||
if c.ServerConn != nil {
|
||
c.ServerConn.Close()
|
||
c.ServerConn = nil
|
||
}
|
||
c.mu.Unlock()
|
||
}()
|
||
|
||
buffer := make([]byte, 1024)
|
||
|
||
for {
|
||
c.mu.Lock()
|
||
conn := c.ServerConn
|
||
c.mu.Unlock()
|
||
|
||
if conn == nil {
|
||
log.Println("连接已断开,退出接收循环")
|
||
return
|
||
}
|
||
|
||
// 设置读取超时,用于检测连接是否存活
|
||
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||
n, err := conn.Read(buffer)
|
||
if err != nil {
|
||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||
log.Println("读取超时,连接可能已断开")
|
||
c.reconnect()
|
||
return
|
||
}
|
||
if err.Error() == "EOF" {
|
||
// 服务端关闭连接时,退出接收循环
|
||
log.Println("服务端关闭了连接")
|
||
c.reconnect()
|
||
return
|
||
}
|
||
log.Printf("接收数据失败: %v", err)
|
||
c.reconnect()
|
||
return
|
||
}
|
||
|
||
msgByte := buffer[:n]
|
||
go c.handleReceiveMsg(msgByte)
|
||
}
|
||
}
|
||
|
||
func (c *Client) send(msg []byte) error {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
|
||
if c.ServerConn == nil {
|
||
return fmt.Errorf("连接已断开")
|
||
}
|
||
|
||
_, err := c.ServerConn.Write(msg)
|
||
if err != nil {
|
||
log.Printf("发送消息失败:%v\n消息内容:%s", err, string(msg))
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (c *Client) handleReceiveMsg(msg []byte) {
|
||
var data message.ServerMsg
|
||
err := json.Unmarshal(msg, &data)
|
||
if err != nil {
|
||
log.Fatalf("解析接收到的消息失败:%v", err)
|
||
return
|
||
}
|
||
parts := strings.Split(data.ID, ".")
|
||
if len(parts) != 2 {
|
||
log.Fatalf("解析通信协议(server->client)失败")
|
||
return
|
||
}
|
||
auth, machine_code := parts[0], parts[1]
|
||
if c.Auth != auth || c.MachineCode != machine_code {
|
||
log.Fatalf("客户端接收到错误的服务端消息")
|
||
return
|
||
}
|
||
switch data.Method {
|
||
case "pong":
|
||
// 收到心跳响应,更新最后pong时间
|
||
c.mu.Lock()
|
||
c.lastPong = time.Now()
|
||
c.mu.Unlock()
|
||
log.Println("收到心跳响应")
|
||
return
|
||
case "mining.req":
|
||
// 将 data.Params 重新序列化为 JSON,然后反序列化为 ConfigurationMiningMsg
|
||
// 因为 data.Params 是 any 类型,JSON 反序列化后是 map[string]interface{}
|
||
paramsJSON, err := json.Marshal(data.Params)
|
||
if err != nil {
|
||
log.Printf("序列化 Params 失败:%v", err)
|
||
sendMsg_str := message.ServerMsgResp{
|
||
ID: c.Auth + "." + c.MachineCode,
|
||
Result: false,
|
||
Data: fmt.Errorf("序列化 Params 失败:%v", err),
|
||
}
|
||
sendMsg_byte, _ := json.Marshal(sendMsg_str)
|
||
c.send(sendMsg_byte)
|
||
return
|
||
}
|
||
|
||
var mining_msg message.ConfigurationMiningMsg
|
||
err = json.Unmarshal(paramsJSON, &mining_msg)
|
||
if err != nil {
|
||
log.Printf("解析挖矿配置消息失败:%v, Params: %s", err, string(paramsJSON))
|
||
sendMsg_str := message.ServerMsgResp{
|
||
ID: c.Auth + "." + c.MachineCode,
|
||
Result: false,
|
||
Data: fmt.Errorf("解析挖矿配置消息失败:%v", err),
|
||
}
|
||
sendMsg_byte, _ := json.Marshal(sendMsg_str)
|
||
c.send(sendMsg_byte)
|
||
return
|
||
}
|
||
|
||
// 暂停持续挖矿(如果有新任务)
|
||
if c.sustainMiner != nil && c.sustainMiner.IsRunning() {
|
||
c.sustainMiner.Pause()
|
||
}
|
||
|
||
// 这里开始挖矿
|
||
err = c.os.Mining(c.osName, mining_msg)
|
||
if err != nil {
|
||
sendMsg_str := message.ServerMsgResp{
|
||
ID: c.Auth + "." + c.MachineCode,
|
||
Result: false,
|
||
Data: err.Error(),
|
||
}
|
||
sendMsg_byte, err := json.Marshal(sendMsg_str)
|
||
if err != nil {
|
||
log.Fatalf("序列化%v失败:%v", sendMsg_str, err)
|
||
return
|
||
}
|
||
c.send(sendMsg_byte) // 返回失败消息
|
||
return
|
||
}
|
||
// 挖矿开始
|
||
respData := message.ConfigurationMiningResp{
|
||
Coin: mining_msg.Coin,
|
||
Algo: mining_msg.Algo,
|
||
Pool: mining_msg.Pool,
|
||
PoolUrl: mining_msg.PoolUrl,
|
||
WorkerID: mining_msg.WorkerID,
|
||
WalletAddress: mining_msg.WalletAddress,
|
||
WatchUrl: "", // 这里需要根据矿池自动生成
|
||
}
|
||
sendMsg_str := message.ServerMsgResp{
|
||
ID: c.Auth + "." + c.MachineCode,
|
||
Result: true,
|
||
Data: respData,
|
||
}
|
||
sendMsg_byte, err := json.Marshal(sendMsg_str)
|
||
if err != nil {
|
||
log.Fatalf("序列化%v失败:%v", sendMsg_str, err)
|
||
return
|
||
}
|
||
c.send(sendMsg_byte) // 返回成功消息
|
||
|
||
// 启动任务结束监控,任务结束后恢复持续挖矿
|
||
go c.monitorMiningTask(mining_msg)
|
||
case "mining.end":
|
||
c.os.StopMining(c.osName)
|
||
default:
|
||
log.Printf("未知的方法:%s", data.Method)
|
||
}
|
||
}
|
||
|
||
// monitorMiningTask 监控挖矿任务,任务结束后恢复持续挖矿
|
||
func (c *Client) monitorMiningTask(cfg message.ConfigurationMiningMsg) {
|
||
endTimestamp := int64(cfg.EndTimestamp)
|
||
currentTimestamp := time.Now().Unix()
|
||
|
||
// 如果任务已经结束,直接恢复持续挖矿
|
||
if endTimestamp <= currentTimestamp {
|
||
if c.sustainMiner != nil {
|
||
c.sustainMiner.Resume()
|
||
}
|
||
return
|
||
}
|
||
|
||
// 计算等待时间
|
||
waitDuration := time.Second * time.Duration(endTimestamp-currentTimestamp)
|
||
|
||
// 等待任务结束
|
||
time.Sleep(waitDuration)
|
||
|
||
log.Println("挖矿任务已结束,恢复持续挖矿")
|
||
|
||
// 恢复持续挖矿
|
||
if c.sustainMiner != nil {
|
||
c.sustainMiner.Resume()
|
||
}
|
||
}
|
||
|
||
// startHeartbeat 启动心跳检查
|
||
func (c *Client) startHeartbeat() {
|
||
ticker := time.NewTicker(30 * time.Second) // 每30秒发送一次心跳
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
// 检查是否超过60秒未收到pong响应
|
||
c.mu.Lock()
|
||
lastPong := c.lastPong
|
||
conn := c.ServerConn
|
||
c.mu.Unlock()
|
||
|
||
if time.Since(lastPong) > 60*time.Second {
|
||
log.Println("超过60秒未收到心跳响应,连接可能已断开")
|
||
c.reconnect()
|
||
return
|
||
}
|
||
|
||
// 发送心跳
|
||
if conn != nil {
|
||
pingMsg := message.ServerMsg{
|
||
ID: c.Auth + "." + c.MachineCode,
|
||
Method: "ping",
|
||
Params: nil,
|
||
}
|
||
msgByte, err := json.Marshal(pingMsg)
|
||
if err != nil {
|
||
log.Printf("序列化心跳消息失败:%v", err)
|
||
continue
|
||
}
|
||
if err := c.send(msgByte); err != nil {
|
||
log.Printf("发送心跳失败:%v", err)
|
||
c.reconnect()
|
||
return
|
||
}
|
||
log.Println("发送心跳")
|
||
}
|
||
case <-c.stopHeartbeat:
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// reconnect 重连服务器
|
||
func (c *Client) reconnect() {
|
||
c.mu.Lock()
|
||
if c.ServerConn != nil {
|
||
c.ServerConn.Close()
|
||
c.ServerConn = nil
|
||
}
|
||
close(c.stopHeartbeat)
|
||
c.stopHeartbeat = make(chan struct{})
|
||
serverURL := c.serverURL
|
||
c.mu.Unlock()
|
||
|
||
log.Println("尝试重新连接服务器...")
|
||
|
||
// 重连逻辑
|
||
for {
|
||
time.Sleep(5 * time.Second) // 等待5秒后重连
|
||
|
||
conn, err := net.Dial("tcp", serverURL)
|
||
if err != nil {
|
||
log.Printf("重连失败:%v,5秒后重试...", err)
|
||
continue
|
||
}
|
||
|
||
log.Println("重连成功")
|
||
c.mu.Lock()
|
||
c.ServerConn = conn
|
||
c.lastPong = time.Now()
|
||
c.mu.Unlock()
|
||
|
||
// 重新发送机器码
|
||
c.sendMachineCode()
|
||
|
||
// 重新启动心跳和接收消息
|
||
go c.startHeartbeat()
|
||
go c.receiveMsg()
|
||
return
|
||
}
|
||
}
|
||
|
||
func Star(url string) {
|
||
// url := "10.168.2.249:8080"
|
||
// url := "47.108.221.51:23456"
|
||
client := newClient(url)
|
||
globalClient = client
|
||
client.sendMachineCode()
|
||
|
||
// 初始化并启动持续挖矿(如果配置启用)
|
||
if client.sustainMiner != nil {
|
||
err := client.sustainMiner.LoadConfig()
|
||
if err != nil {
|
||
log.Printf("加载持续挖矿配置失败:%v", err)
|
||
} else {
|
||
err = client.sustainMiner.Start()
|
||
if err != nil {
|
||
log.Printf("启动持续挖矿失败:%v", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 启动心跳检查
|
||
go client.startHeartbeat()
|
||
|
||
// 开始接收服务端消息
|
||
client.receiveMsg()
|
||
}
|
||
|
||
var globalClient *Client
|
||
|
||
// StopClient 停止客户端(供外部调用)
|
||
func StopClient() {
|
||
if globalClient != nil {
|
||
globalClient.Stop()
|
||
}
|
||
}
|