This commit is contained in:
lzx
2026-01-04 13:57:08 +08:00
parent 68b00962d3
commit f08ad364d9
18 changed files with 408 additions and 136 deletions

View File

@@ -14,6 +14,7 @@
package client
import (
"bufio"
message "client/internal/msg"
"client/internal/src"
"client/internal/src/linux"
@@ -39,6 +40,7 @@ type Client struct {
MachineCode string
ServerConn net.Conn // 服务连接
GPUs map[int]message.GPU // {"gpu编号": message.GPU{}, ...}
MiningSofts []string
os *src.SystemServer
osName string
serverURL string // 服务器地址
@@ -58,7 +60,7 @@ func newClient(url string) *Client {
// 读取身份文件
auth, err := utils.ReadFile("./auth")
if err != nil {
log.Fatalf("获取客户端身份失败:%v", err)
log.Printf("获取客户端身份失败:%v", err)
return nil
}
@@ -101,31 +103,32 @@ func newClient(url string) *Client {
} else {
confFile = "mining.linux.conf"
}
var softs = make([]string, 0)
// 读取挖矿配置
cfg, err := ini.Load(confFile)
if err == nil {
sectionBzMiner := cfg.Section("bzminer")
miningConfig.BzMinerPath = sectionBzMiner.Key("path").String()
sectionLolMiner := cfg.Section("lolminer")
miningConfig.LolMinerPath = sectionLolMiner.Key("path").String()
if miningConfig.LolMinerPath != "" {
softs = append(softs, "lolminer")
}
sectionRigel := cfg.Section("rigel")
miningConfig.RigelPath = sectionRigel.Key("path").String()
if miningConfig.RigelPath != "" {
softs = append(softs, "rigel")
}
sectionProxy := cfg.Section("proxy")
miningConfig.ProxyEnabled, _ = sectionProxy.Key("proxy").Bool()
}
client.MiningSofts = softs
client.sustainMiner = sustain.NewSustainMiner(systemServer, sys, miningConfig)
// 读取主机MAC地址信息
var machine_code string
machine_code, err = systemServer.GetMACAddress(sys)
if err != nil {
log.Fatalln(err)
panic("获取当前主机信息失败,程序已退出,请检查网络后重新启动本客户端。")
log.Println(err)
log.Fatalln("获取当前主机信息失败,程序已退出,请检查网络后重新启动本客户端。")
}
// utils.WirteFile("./machinecode", machine_code)
@@ -133,8 +136,8 @@ func newClient(url string) *Client {
gpus, err := systemServer.GetGPUInfo(sys)
if err != nil {
log.Fatalln(err)
panic("获取当前主机GPU数据失败程序已退出请检查GPU驱动等程序后重新启动本客户端。")
log.Println(err)
log.Fatalln("获取当前主机GPU数据失败程序已退出请检查GPU驱动等程序后重新启动本客户端。")
}
client.GPUs = gpus
client.serverURL = url
@@ -144,7 +147,7 @@ func newClient(url string) *Client {
// 连接服务端
server_conn, err := net.Dial("tcp", url)
if err != nil {
log.Fatalf("客户端连接到服务器失败:%v", err)
log.Printf("客户端连接到服务器失败:%v", err)
return nil
}
@@ -171,14 +174,18 @@ func (c *Client) Stop() {
func (c *Client) sendMachineCode() {
var msg message.ServerMsg
msg.ID = c.Auth + "." + c.MachineCode
msg.ID = c.Auth + "::" + c.MachineCode
msg.Method = "auth.machineCode"
msg.Params = c.GPUs
var params = make(map[string]any)
params["gpus"] = c.GPUs
params["miningsofts"] = c.MiningSofts
msg.Params = params
msgByte, err := json.Marshal(msg)
if err != nil {
log.Fatalf("消息(%v)序列化失败:%v", msg, err)
log.Printf("消息(%v)序列化失败:%v", msg, err)
return
}
log.Println(string(msgByte))
c.send(msgByte)
}
@@ -192,7 +199,8 @@ func (c *Client) receiveMsg() {
c.mu.Unlock()
}()
buffer := make([]byte, 1024)
// 使用 bufio.Reader 读取数据
reader := bufio.NewReader(c.ServerConn)
for {
c.mu.Lock()
@@ -206,26 +214,29 @@ func (c *Client) receiveMsg() {
// 设置读取超时,用于检测连接是否存活
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
n, err := conn.Read(buffer)
// 读取一行数据,直到遇到换行符
msgByte, err := reader.ReadString('\n')
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Println("读取超时,连接可能已断开")
c.reconnect()
return
}
if err.Error() == "EOF" {
// 服务端关闭连接时,退出接收循环
log.Println("服务端关闭了连接")
c.reconnect()
return
}
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Println("读取超时,连接可能已断开")
c.reconnect()
return
}
log.Printf("接收数据失败: %v", err)
c.reconnect()
return
}
msgByte := buffer[:n]
go c.handleReceiveMsg(msgByte)
// 输出接收到的消息
log.Printf("收到新消息:%s", msgByte)
go c.handleReceiveMsg([]byte(msgByte)) // 处理消息
}
}
@@ -237,7 +248,7 @@ func (c *Client) send(msg []byte) error {
return fmt.Errorf("连接已断开")
}
_, err := c.ServerConn.Write(msg)
_, err := c.ServerConn.Write(append(msg, '\n'))
if err != nil {
log.Printf("发送消息失败:%v\n消息内容%s", err, string(msg))
return err
@@ -249,26 +260,43 @@ func (c *Client) handleReceiveMsg(msg []byte) {
var data message.ServerMsg
err := json.Unmarshal(msg, &data)
if err != nil {
log.Fatalf("解析接收到的消息失败:%v", err)
log.Printf("解析接收到的消息失败:%v", err)
return
}
parts := strings.Split(data.ID, ".")
parts := strings.Split(data.ID, "::")
if len(parts) != 2 {
log.Fatalf("解析通信协议(server->client)失败")
log.Printf("解析通信协议(server->client)失败")
return
}
auth, machine_code := parts[0], parts[1]
if c.Auth != auth || c.MachineCode != machine_code {
log.Fatalf("客户端接收到错误的服务端消息")
log.Printf("客户端接收到错误的服务端消息")
return
}
switch data.Method {
case "pong":
// 收到心跳响应更新最后pong时间
case "ping":
// 收到服务端心跳更新最后ping时间并回复pong
c.mu.Lock()
c.lastPong = time.Now()
c.mu.Unlock()
log.Println("收到心跳响应")
log.Println("收到服务端心跳")
// 回复pong
pongMsg := message.ServerMsg{
ID: c.Auth + "::" + c.MachineCode,
Method: "pong",
Params: nil,
}
msgByte, err := json.Marshal(pongMsg)
if err != nil {
log.Printf("序列化pong消息失败%v", err)
return
}
if err := c.send(msgByte); err != nil {
log.Printf("发送pong响应失败%v", err)
} else {
log.Println("已回复pong")
}
return
case "mining.req":
// 将 data.Params 重新序列化为 JSON然后反序列化为 ConfigurationMiningMsg
@@ -277,7 +305,7 @@ func (c *Client) handleReceiveMsg(msg []byte) {
if err != nil {
log.Printf("序列化 Params 失败:%v", err)
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
ID: c.Auth + "::" + c.MachineCode,
Result: false,
Data: fmt.Errorf("序列化 Params 失败:%v", err),
}
@@ -291,7 +319,7 @@ func (c *Client) handleReceiveMsg(msg []byte) {
if err != nil {
log.Printf("解析挖矿配置消息失败:%v, Params: %s", err, string(paramsJSON))
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
ID: c.Auth + "::" + c.MachineCode,
Result: false,
Data: fmt.Errorf("解析挖矿配置消息失败:%v", err),
}
@@ -309,15 +337,16 @@ func (c *Client) handleReceiveMsg(msg []byte) {
err = c.os.Mining(c.osName, mining_msg)
if err != nil {
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
ID: c.Auth + "::" + c.MachineCode,
Result: false,
Data: err.Error(),
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
log.Printf("序列化%v失败%v", sendMsg_str, err)
return
}
log.Println(string(sendMsg_byte))
c.send(sendMsg_byte) // 返回失败消息
return
}
@@ -332,13 +361,14 @@ func (c *Client) handleReceiveMsg(msg []byte) {
WatchUrl: "", // 这里需要根据矿池自动生成
}
sendMsg_str := message.ServerMsgResp{
ID: c.Auth + "." + c.MachineCode,
ID: c.Auth + "::" + c.MachineCode,
Result: true,
Data: respData,
Method: "mining.resp",
}
sendMsg_byte, err := json.Marshal(sendMsg_str)
if err != nil {
log.Fatalf("序列化%v失败%v", sendMsg_str, err)
log.Printf("序列化%v失败%v", sendMsg_str, err)
return
}
c.send(sendMsg_byte) // 返回成功消息
@@ -379,45 +409,24 @@ func (c *Client) monitorMiningTask(cfg message.ConfigurationMiningMsg) {
}
}
// startHeartbeat 启动心跳检查
// startHeartbeat 启动心跳检查监控是否收到服务端的ping
func (c *Client) startHeartbeat() {
ticker := time.NewTicker(30 * time.Second) // 每30秒发送一次心跳
ticker := time.NewTicker(30 * time.Second) // 每30秒检查一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 检查是否超过60秒未收到pong响应
// 检查是否超过60秒未收到服务端的ping
c.mu.Lock()
lastPong := c.lastPong
conn := c.ServerConn
c.mu.Unlock()
if time.Since(lastPong) > 60*time.Second {
log.Println("超过60未收到心跳响应,连接可能已断开")
if time.Since(lastPong) > 60*time.Minute {
log.Println("超过60分钟未收到服务端心跳,连接可能已断开")
c.reconnect()
return
}
// 发送心跳
if conn != nil {
pingMsg := message.ServerMsg{
ID: c.Auth + "." + c.MachineCode,
Method: "ping",
Params: nil,
}
msgByte, err := json.Marshal(pingMsg)
if err != nil {
log.Printf("序列化心跳消息失败:%v", err)
continue
}
if err := c.send(msgByte); err != nil {
log.Printf("发送心跳失败:%v", err)
c.reconnect()
return
}
log.Println("发送心跳")
}
case <-c.stopHeartbeat:
return
}