/* 本程序为云算力平台卖方客户端,主要提供以下功能: 1,将云算力平台上的卖方身份和GPU主机绑定 2,通过本客户端,可以使云算力平台获取到卖方具体每台机器的详细参数,包括GPU型号、显存容量等 3,卖方可通过本客户端自动匹配买方的挖矿需求,即卖方无需再买方下单后手动操作挖矿 卖家在启动客户端之前需要注意以下事项: 1,确定客户端执行主机已经配置好挖矿环境,包括显卡驱动、挖矿软件(指定挖矿软件)、执行权限等,即执行本客户端的用户可以手动通过挖矿软件进行挖矿 2,如果要对本机GPU进行移除(拔出GPU)操作,云算力平台会同步移除对应的GPU 3,如果在相关GPU有租约且没有在平台申请故障处理的情况下直接移除(拔出)GPU,会导致产生罚没,因此在有租约的情况下要移除故障GPU,请第一时间前往平台申请故障处理,在平台确认后再进行移除GPU的操作 4,如果要对本机GPU进行更换(拔出后又新插入GPU)操作,云算力平台会重新读取GPU数据,如果更换型号相同,则会按原有配置上架,如果更换的型号不同,则需在更换后前往卖家中心手动调整上架配置 5,如果在相关GPU有租约且没有在平台申请故障处理的情况下直接更换GPU,可能会导致产生罚没,因此在有租约的情况下要更换故障GPU,请第一时间前往平台申请故障处理,在平台确认后再进行更换GPU的操作 */ package client import ( message "client/internal/msg" "client/internal/src" "client/internal/src/linux" "client/internal/src/windows" "client/internal/sustain" "client/internal/utils" "encoding/json" "fmt" "log" "net" "runtime" "strings" "sync" "time" "os" "gopkg.in/ini.v1" ) type Client struct { Auth string MachineCode string ServerConn net.Conn // 服务连接 GPUs map[int]message.GPU // {"gpu编号": message.GPU{}, ...} os *src.SystemServer osName string serverURL string // 服务器地址 mu sync.Mutex // 保护连接操作的互斥锁 lastPong time.Time // 最后一次收到pong的时间 stopHeartbeat chan struct{} // 停止心跳的通道 sustainMiner *sustain.SustainMiner // 持续挖矿管理器 } // osExit 用于在权限检查失败时退出程序,便于测试时覆盖 var osExit = func(code int) { os.Exit(code) } func newClient(url string) *Client { var client = &Client{} // 读取身份文件 auth, err := utils.ReadFile("./auth") if err != nil { log.Fatalf("获取客户端身份失败:%v", err) return nil } client.Auth = auth systemServer := src.NewSystemServer() sys := runtime.GOOS log.Printf("主机操作系统:%s", sys) // 启动前进行权限自检(不同系统分别检查) switch sys { case "linux": if err := linux.CheckPermission(); err != nil { log.Println(err.Error()) log.Println("权限不足,客户端已退出。") osExit(1) } linux_ := linux.NewLinuxClient(auth) systemServer.ResiterSystem("linux", linux_) case "windows": if err := windows.CheckPermission(); err != nil { log.Println(err.Error()) log.Println("权限不足,客户端已退出。") osExit(1) } windows_ := windows.NewWindowsClient(auth) systemServer.ResiterSystem("windows", windows_) default: log.Printf("不支持的操作系统:%s,客户端已退出。", sys) osExit(1) } client.os = systemServer client.osName = sys // 初始化持续挖矿管理器(从配置文件读取挖矿软件路径) var miningConfig message.MiningConfig var confFile string if sys == "windows" { confFile = "mining.windows.conf" } else { confFile = "mining.linux.conf" } // 读取挖矿配置 cfg, err := ini.Load(confFile) if err == nil { sectionBzMiner := cfg.Section("bzminer") miningConfig.BzMinerPath = sectionBzMiner.Key("path").String() sectionLolMiner := cfg.Section("lolminer") miningConfig.LolMinerPath = sectionLolMiner.Key("path").String() sectionRigel := cfg.Section("rigel") miningConfig.RigelPath = sectionRigel.Key("path").String() sectionProxy := cfg.Section("proxy") miningConfig.ProxyEnabled, _ = sectionProxy.Key("proxy").Bool() } client.sustainMiner = sustain.NewSustainMiner(systemServer, sys, miningConfig) // 读取主机MAC地址信息 var machine_code string machine_code, err = systemServer.GetMACAddress(sys) if err != nil { log.Fatalln(err) panic("获取当前主机信息失败,程序已退出,请检查网络后重新启动本客户端。") } // utils.WirteFile("./machinecode", machine_code) client.MachineCode = machine_code gpus, err := systemServer.GetGPUInfo(sys) if err != nil { log.Fatalln(err) panic("获取当前主机GPU数据失败,程序已退出,请检查GPU驱动等程序后重新启动本客户端。") } client.GPUs = gpus client.serverURL = url client.stopHeartbeat = make(chan struct{}) client.lastPong = time.Now() // 连接服务端 server_conn, err := net.Dial("tcp", url) if err != nil { log.Fatalf("客户端连接到服务器失败:%v", err) return nil } client.ServerConn = server_conn return client } // Stop 停止客户端(包括持续挖矿) func (c *Client) Stop() { // 停止持续挖矿 if c.sustainMiner != nil { c.sustainMiner.Stop() } // 关闭连接 c.mu.Lock() if c.ServerConn != nil { c.ServerConn.Close() c.ServerConn = nil } close(c.stopHeartbeat) c.mu.Unlock() } func (c *Client) sendMachineCode() { var msg message.ServerMsg msg.ID = c.Auth + "." + c.MachineCode msg.Method = "auth.machineCode" msg.Params = c.GPUs msgByte, err := json.Marshal(msg) if err != nil { log.Fatalf("消息(%v)序列化失败:%v", msg, err) return } c.send(msgByte) } func (c *Client) receiveMsg() { defer func() { c.mu.Lock() if c.ServerConn != nil { c.ServerConn.Close() c.ServerConn = nil } c.mu.Unlock() }() buffer := make([]byte, 1024) for { c.mu.Lock() conn := c.ServerConn c.mu.Unlock() if conn == nil { log.Println("连接已断开,退出接收循环") return } // 设置读取超时,用于检测连接是否存活 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) n, err := conn.Read(buffer) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { log.Println("读取超时,连接可能已断开") c.reconnect() return } if err.Error() == "EOF" { // 服务端关闭连接时,退出接收循环 log.Println("服务端关闭了连接") c.reconnect() return } log.Printf("接收数据失败: %v", err) c.reconnect() return } msgByte := buffer[:n] go c.handleReceiveMsg(msgByte) } } func (c *Client) send(msg []byte) error { c.mu.Lock() defer c.mu.Unlock() if c.ServerConn == nil { return fmt.Errorf("连接已断开") } _, err := c.ServerConn.Write(msg) if err != nil { log.Printf("发送消息失败:%v\n消息内容:%s", err, string(msg)) return err } return nil } func (c *Client) handleReceiveMsg(msg []byte) { var data message.ServerMsg err := json.Unmarshal(msg, &data) if err != nil { log.Fatalf("解析接收到的消息失败:%v", err) return } parts := strings.Split(data.ID, ".") if len(parts) != 2 { log.Fatalf("解析通信协议(server->client)失败") return } auth, machine_code := parts[0], parts[1] if c.Auth != auth || c.MachineCode != machine_code { log.Fatalf("客户端接收到错误的服务端消息") return } switch data.Method { case "pong": // 收到心跳响应,更新最后pong时间 c.mu.Lock() c.lastPong = time.Now() c.mu.Unlock() log.Println("收到心跳响应") return case "mining.req": // 将 data.Params 重新序列化为 JSON,然后反序列化为 ConfigurationMiningMsg // 因为 data.Params 是 any 类型,JSON 反序列化后是 map[string]interface{} paramsJSON, err := json.Marshal(data.Params) if err != nil { log.Printf("序列化 Params 失败:%v", err) sendMsg_str := message.ServerMsgResp{ ID: c.Auth + "." + c.MachineCode, Result: false, Data: fmt.Errorf("序列化 Params 失败:%v", err), } sendMsg_byte, _ := json.Marshal(sendMsg_str) c.send(sendMsg_byte) return } var mining_msg message.ConfigurationMiningMsg err = json.Unmarshal(paramsJSON, &mining_msg) if err != nil { log.Printf("解析挖矿配置消息失败:%v, Params: %s", err, string(paramsJSON)) sendMsg_str := message.ServerMsgResp{ ID: c.Auth + "." + c.MachineCode, Result: false, Data: fmt.Errorf("解析挖矿配置消息失败:%v", err), } sendMsg_byte, _ := json.Marshal(sendMsg_str) c.send(sendMsg_byte) return } // 暂停持续挖矿(如果有新任务) if c.sustainMiner != nil && c.sustainMiner.IsRunning() { c.sustainMiner.Pause() } // 这里开始挖矿 err = c.os.Mining(c.osName, mining_msg) if err != nil { sendMsg_str := message.ServerMsgResp{ ID: c.Auth + "." + c.MachineCode, Result: false, Data: err.Error(), } sendMsg_byte, err := json.Marshal(sendMsg_str) if err != nil { log.Fatalf("序列化%v失败:%v", sendMsg_str, err) return } c.send(sendMsg_byte) // 返回失败消息 return } // 挖矿开始 respData := message.ConfigurationMiningResp{ Coin: mining_msg.Coin, Algo: mining_msg.Algo, Pool: mining_msg.Pool, PoolUrl: mining_msg.PoolUrl, WorkerID: mining_msg.WorkerID, WalletAddress: mining_msg.WalletAddress, WatchUrl: "", // 这里需要根据矿池自动生成 } sendMsg_str := message.ServerMsgResp{ ID: c.Auth + "." + c.MachineCode, Result: true, Data: respData, } sendMsg_byte, err := json.Marshal(sendMsg_str) if err != nil { log.Fatalf("序列化%v失败:%v", sendMsg_str, err) return } c.send(sendMsg_byte) // 返回成功消息 // 启动任务结束监控,任务结束后恢复持续挖矿 go c.monitorMiningTask(mining_msg) case "mining.end": c.os.StopMining(c.osName) default: log.Printf("未知的方法:%s", data.Method) } } // monitorMiningTask 监控挖矿任务,任务结束后恢复持续挖矿 func (c *Client) monitorMiningTask(cfg message.ConfigurationMiningMsg) { endTimestamp := int64(cfg.EndTimestamp) currentTimestamp := time.Now().Unix() // 如果任务已经结束,直接恢复持续挖矿 if endTimestamp <= currentTimestamp { if c.sustainMiner != nil { c.sustainMiner.Resume() } return } // 计算等待时间 waitDuration := time.Second * time.Duration(endTimestamp-currentTimestamp) // 等待任务结束 time.Sleep(waitDuration) log.Println("挖矿任务已结束,恢复持续挖矿") // 恢复持续挖矿 if c.sustainMiner != nil { c.sustainMiner.Resume() } } // startHeartbeat 启动心跳检查 func (c *Client) startHeartbeat() { ticker := time.NewTicker(30 * time.Second) // 每30秒发送一次心跳 defer ticker.Stop() for { select { case <-ticker.C: // 检查是否超过60秒未收到pong响应 c.mu.Lock() lastPong := c.lastPong conn := c.ServerConn c.mu.Unlock() if time.Since(lastPong) > 60*time.Second { log.Println("超过60秒未收到心跳响应,连接可能已断开") c.reconnect() return } // 发送心跳 if conn != nil { pingMsg := message.ServerMsg{ ID: c.Auth + "." + c.MachineCode, Method: "ping", Params: nil, } msgByte, err := json.Marshal(pingMsg) if err != nil { log.Printf("序列化心跳消息失败:%v", err) continue } if err := c.send(msgByte); err != nil { log.Printf("发送心跳失败:%v", err) c.reconnect() return } log.Println("发送心跳") } case <-c.stopHeartbeat: return } } } // reconnect 重连服务器 func (c *Client) reconnect() { c.mu.Lock() if c.ServerConn != nil { c.ServerConn.Close() c.ServerConn = nil } close(c.stopHeartbeat) c.stopHeartbeat = make(chan struct{}) serverURL := c.serverURL c.mu.Unlock() log.Println("尝试重新连接服务器...") // 重连逻辑 for { time.Sleep(5 * time.Second) // 等待5秒后重连 conn, err := net.Dial("tcp", serverURL) if err != nil { log.Printf("重连失败:%v,5秒后重试...", err) continue } log.Println("重连成功") c.mu.Lock() c.ServerConn = conn c.lastPong = time.Now() c.mu.Unlock() // 重新发送机器码 c.sendMachineCode() // 重新启动心跳和接收消息 go c.startHeartbeat() go c.receiveMsg() return } } func Star(url string) { // url := "10.168.2.249:8080" // url := "47.108.221.51:23456" client := newClient(url) globalClient = client client.sendMachineCode() // 初始化并启动持续挖矿(如果配置启用) if client.sustainMiner != nil { err := client.sustainMiner.LoadConfig() if err != nil { log.Printf("加载持续挖矿配置失败:%v", err) } else { err = client.sustainMiner.Start() if err != nil { log.Printf("启动持续挖矿失败:%v", err) } } } // 启动心跳检查 go client.startHeartbeat() // 开始接收服务端消息 client.receiveMsg() } var globalClient *Client // StopClient 停止客户端(供外部调用) func StopClient() { if globalClient != nil { globalClient.Stop() } }