commit cfda4d8425750789d2dd4c2053c138998026c913 Author: lzx <393768033@qq.com> Date: Thu Jul 31 10:34:17 2025 +0800 proxy test ver diff --git a/README.md b/README.md new file mode 100644 index 0000000..74eadfc --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# 代理 \ No newline at end of file diff --git a/cmd/config.json b/cmd/config.json new file mode 100644 index 0000000..89247f4 --- /dev/null +++ b/cmd/config.json @@ -0,0 +1,6 @@ +{ + "coin": "alph", + "zmqAddr": "tcp://127.0.0.1:39001", + "tcpAddr": "0.0.0.0:39002", + "proxyAddr": "stratum+tcp://alph.m2pool.com:33390" +} \ No newline at end of file diff --git a/cmd/proxy b/cmd/proxy new file mode 100644 index 0000000..fb12c8f Binary files /dev/null and b/cmd/proxy differ diff --git a/cmd/proxy.go b/cmd/proxy.go new file mode 100644 index 0000000..e4c4ae9 --- /dev/null +++ b/cmd/proxy.go @@ -0,0 +1,9 @@ +package main + +import ( + server "proxy/internal" +) + +func main() { + server.StartProxy() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..93160bd --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module proxy + +go 1.23.1 + +require github.com/zeromq/goczmq v4.1.0+incompatible diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..112c49b --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc= +github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg= diff --git a/internal/miner/miner.go b/internal/miner/miner.go new file mode 100644 index 0000000..54eac68 --- /dev/null +++ b/internal/miner/miner.go @@ -0,0 +1,130 @@ +package miner + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "proxy/internal/msg" + "strings" + "sync" +) + +const topic string = "[miner]:" + +type Miner struct { + sync.Mutex + Coin string + ID string // user + miner + PoolConn net.Conn + MinerConn net.Conn + PoolAddress string +} + +func NewMiner(coin string, poolAddress string, minerConn net.Conn) (*Miner, error) { + poolConn, err := net.Dial("tcp", poolAddress) + if err != nil { + return nil, fmt.Errorf("pool连接失败: %v", err) + } + + return &Miner{ + Coin: coin, + PoolConn: poolConn, + MinerConn: minerConn, + PoolAddress: poolAddress, + }, nil +} + +// 动态切换矿池地址并建立新连接 +func (m *Miner) ChangePoolAddress(newAddress string) { + m.Lock() + defer m.Unlock() + + // 尝试建立新连接 + newConn, err := net.Dial("tcp", newAddress) + if err != nil { + fmt.Println("切换pool连接失败:", err) + return + } + + // 关闭旧连接 + if m.PoolConn != nil { + _ = m.PoolConn.Close() + } + + m.PoolConn = newConn + m.PoolAddress = newAddress + + fmt.Println("成功切换矿池地址为:", newAddress) +} + +// 矿工消息处理 +func (m *Miner) HandleMinerMsg(ch chan string) { + defer m.MinerConn.Close() + defer m.PoolConn.Close() + + reader := bufio.NewReader(m.MinerConn) + sent := false // 保证 userSign 只发送一次 + + for { + msgStr, err := reader.ReadString('\n') + if err != nil { + fmt.Println("miner消息读取失败:", err) + return + } + // fmt.Println("从矿工收到消息:", msgStr) + switch m.Coin { + case "nexa": + // + default: + var msg msg.Authorize_msg + if err := json.Unmarshal([]byte(msgStr), &msg); err == nil && + msg.Method == "mining.authorize" && len(msg.Params) >= 1 && !sent { + parts := strings.Split(msg.Params[0], ".") + if len(parts) >= 2 { + userSign := parts[0] + "-" + parts[1] + select { + case ch <- userSign: + sent = true + default: + } + } else { + fmt.Println(topic+"mining.authorize解析user-miner错误\n", err) + } + + } + } + + if m.PoolConn != nil { + _, err = m.PoolConn.Write([]byte(msgStr)) + if err != nil { + fmt.Println("转发到pool失败:", err) + return + } + } + } +} + +// 矿池消息处理 +func (m *Miner) HandlePoolMsg() { + defer m.MinerConn.Close() + defer m.PoolConn.Close() + + reader := bufio.NewReader(m.PoolConn) + for { + poolMsg, err := reader.ReadString('\n') + if err != nil { + fmt.Println("pool消息读取失败:", err) + return + } + // fmt.Println("从矿池收到消息:", poolMsg) + + if m.MinerConn != nil { + _, err = m.MinerConn.Write([]byte(poolMsg)) + if err != nil { + fmt.Println("转发到miner失败:", err) + return + } + } + } +} diff --git a/internal/msg/msg.go b/internal/msg/msg.go new file mode 100644 index 0000000..204f3a6 --- /dev/null +++ b/internal/msg/msg.go @@ -0,0 +1,13 @@ +package msg + +type ZmqMsg struct { + MethodID int `json:"methodId"` //0(add), 1(delete) + ID string `json:"id"` // user-miner + Address string `json:"address"` // 转发目标地址 +} + +type Authorize_msg struct { + ID int `json:"id"` + Method string `json:"method"` + Params []string +} diff --git a/internal/proxy.go b/internal/proxy.go new file mode 100644 index 0000000..0b2f82b --- /dev/null +++ b/internal/proxy.go @@ -0,0 +1,115 @@ +package server + +import ( + "encoding/json" + "fmt" + "net" + "os" + "proxy/internal/miner" + "proxy/internal/zmq" + "sync" + "time" +) + +const TOPIC = "[server]: " + +type Config struct { + Coin string `json:"coin"` + ZmqAddr string `json:"zmqAddr"` + TcpAddr string `json:"tcpAddr"` + DefaultAddr string `json:"proxyAddr"` +} + +type ProxyCtx struct { + Coin string + mu sync.RWMutex + Cfg Config + Listener net.Listener + MinerConn []*miner.Miner + ProxyList map[string]string +} + +func initConfig() Config { + data, err := os.ReadFile("./config.json") + if err != nil { + panic(TOPIC + "配置读取失败: " + err.Error()) + } + var cfg Config + if err := json.Unmarshal(data, &cfg); err != nil { + panic(TOPIC + "配置解析失败: " + err.Error()) + } + return cfg +} + +func newProxy() *ProxyCtx { + cfg := initConfig() + listener, err := net.Listen("tcp", cfg.TcpAddr) + if err != nil { + panic(TOPIC + "监听失败: " + err.Error()) + } + return &ProxyCtx{ + Coin: cfg.Coin, + Cfg: cfg, + Listener: listener, + MinerConn: []*miner.Miner{}, + ProxyList: make(map[string]string), + } +} + +func (p *ProxyCtx) handleMinerConnect() { + fmt.Println(TOPIC, "TCP 服务已启动,监听地址:", p.Cfg.TcpAddr) + for { + conn, err := p.Listener.Accept() + if err != nil { + fmt.Println(TOPIC+"接收连接失败:", err) + continue + } + + go func(conn net.Conn) { + ch := make(chan string, 1) + minerObj, err := miner.NewMiner(p.Coin, p.Cfg.DefaultAddr, conn) + if err != nil { + fmt.Println(TOPIC, "创建 Miner 失败:", err) + conn.Close() + return + } + + go minerObj.HandleMinerMsg(ch) + + select { + case userSign := <-ch: + p.mu.RLock() + addr, ok := p.ProxyList[userSign] + p.mu.RUnlock() + if ok { + minerObj.ChangePoolAddress(addr) + } + + p.mu.Lock() + p.MinerConn = append(p.MinerConn, minerObj) + p.mu.Unlock() + + go minerObj.HandlePoolMsg() + + case <-time.After(10 * time.Second): + fmt.Println(TOPIC, "超时未收到矿工认证,关闭连接") + minerObj.MinerConn.Close() + minerObj.PoolConn.Close() + } + }(conn) + } +} + +func (p *ProxyCtx) startZMQ() { + zmq.StartZMQ(p.Cfg.ZmqAddr, p.ProxyList, &p.mu) +} + +func StartProxy() { + p := newProxy() + + // 启动 ZeroMQ 控制线程 + go p.startZMQ() + + // 启动 TCP 服务 + p.handleMinerConnect() +} diff --git a/internal/zmq/zmq.go b/internal/zmq/zmq.go new file mode 100644 index 0000000..7a5a20b --- /dev/null +++ b/internal/zmq/zmq.go @@ -0,0 +1,49 @@ +package zmq + +import ( + "encoding/json" + "fmt" + "proxy/internal/msg" + "sync" + "time" + + "github.com/zeromq/goczmq" +) + +func initZmqPull(sub_to string) *goczmq.Sock { + pull_ch, err := goczmq.NewPull(sub_to) + if err != nil { + fmt.Println("[zmq]:", err) + } + //pull_ch.SetMaxmsgsize(1024 * 1024 * 8) + return pull_ch +} + +func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) { + var data msg.ZmqMsg + if err := json.Unmarshal(zmqMsg, &data); err != nil { + fmt.Println("[zmq]:", err) + return + } + proxyListLock.Lock() + // MethodID: 0(add), 1(delete) + if data.MethodID == 0 { + proxyList[data.ID] = data.Address + } else { + delete(proxyList, data.ID) + } + proxyListLock.Unlock() +} + +func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) { + conn := initZmqPull(zmqAddr) + for { + zmqMsg, _, err := conn.RecvFrame() + if err != nil { + fmt.Println("[zmq recv]:", err) + time.Sleep(time.Second) // 防止CPU空转 + continue + } + handleZmqMsg(zmqMsg, proxyList, proxyListLock) + } +}