proxy/internal/proxy.go

116 lines
2.2 KiB
Go

package server
import (
"encoding/json"
"fmt"
"net"
"os"
"proxy/internal/miner"
"proxy/internal/zmq"
"sync"
"time"
)
const TOPIC = "[server]: "
type Config struct {
Coin string `json:"coin"`
ZmqAddr string `json:"zmqAddr"`
TcpAddr string `json:"tcpAddr"`
DefaultAddr string `json:"proxyAddr"`
}
type ProxyCtx struct {
Coin string
mu sync.RWMutex
Cfg Config
Listener net.Listener
MinerConn []*miner.Miner
ProxyList map[string]string
}
func initConfig() Config {
data, err := os.ReadFile("./config.json")
if err != nil {
panic(TOPIC + "配置读取失败: " + err.Error())
}
var cfg Config
if err := json.Unmarshal(data, &cfg); err != nil {
panic(TOPIC + "配置解析失败: " + err.Error())
}
return cfg
}
func newProxy() *ProxyCtx {
cfg := initConfig()
listener, err := net.Listen("tcp", cfg.TcpAddr)
if err != nil {
panic(TOPIC + "监听失败: " + err.Error())
}
return &ProxyCtx{
Coin: cfg.Coin,
Cfg: cfg,
Listener: listener,
MinerConn: []*miner.Miner{},
ProxyList: make(map[string]string),
}
}
func (p *ProxyCtx) handleMinerConnect() {
fmt.Println(TOPIC, "TCP 服务已启动,监听地址:", p.Cfg.TcpAddr)
for {
conn, err := p.Listener.Accept()
if err != nil {
fmt.Println(TOPIC+"接收连接失败:", err)
continue
}
go func(conn net.Conn) {
ch := make(chan string, 1)
minerObj, err := miner.NewMiner(p.Coin, p.Cfg.DefaultAddr, conn)
if err != nil {
fmt.Println(TOPIC, "创建 Miner 失败:", err)
conn.Close()
return
}
go minerObj.HandleMinerMsg(ch)
select {
case userSign := <-ch:
p.mu.RLock()
addr, ok := p.ProxyList[userSign]
p.mu.RUnlock()
if ok {
minerObj.ChangePoolAddress(addr)
}
p.mu.Lock()
p.MinerConn = append(p.MinerConn, minerObj)
p.mu.Unlock()
go minerObj.HandlePoolMsg()
case <-time.After(10 * time.Second):
fmt.Println(TOPIC, "超时未收到矿工认证,关闭连接")
minerObj.MinerConn.Close()
minerObj.PoolConn.Close()
}
}(conn)
}
}
func (p *ProxyCtx) startZMQ() {
zmq.StartZMQ(p.Cfg.ZmqAddr, p.ProxyList, &p.mu)
}
func StartProxy() {
p := newProxy()
// 启动 ZeroMQ 控制线程
go p.startZMQ()
// 启动 TCP 服务
p.handleMinerConnect()
}