diff --git a/.gitignore b/.gitignore index 2da11fd..8741457 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ ./cmd/proxy -./cmd/config.json \ No newline at end of file +./cmd/config.json +./main.go \ No newline at end of file diff --git a/cmd/config.json b/cmd/config.json index 89247f4..a4494ff 100644 --- a/cmd/config.json +++ b/cmd/config.json @@ -1,6 +1,6 @@ { "coin": "alph", - "zmqAddr": "tcp://127.0.0.1:39001", + "zmqAddr": "amqp://guest:guest@localhost:5672/", "tcpAddr": "0.0.0.0:39002", - "proxyAddr": "stratum+tcp://alph.m2pool.com:33390" -} \ No newline at end of file + "proxyAddr": "alph.m2pool.com:33390" +} diff --git a/cmd/proxy b/cmd/proxy new file mode 100644 index 0000000..b10444f Binary files /dev/null and b/cmd/proxy differ diff --git a/go.mod b/go.mod index 93160bd..ee127d8 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module proxy go 1.23.1 -require github.com/zeromq/goczmq v4.1.0+incompatible +require ( + github.com/rabbitmq/amqp091-go v1.10.0 + github.com/zeromq/goczmq v4.1.0+incompatible +) diff --git a/go.sum b/go.sum index 112c49b..b3fc2a5 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,6 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc= github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/internal/miner/miner.go b/internal/miner/miner.go index 54eac68..fb00c82 100644 --- a/internal/miner/miner.go +++ b/internal/miner/miner.go @@ -26,7 +26,7 @@ func NewMiner(coin string, poolAddress string, minerConn net.Conn) (*Miner, erro if err != nil { return nil, fmt.Errorf("pool连接失败: %v", err) } - + fmt.Println("有新矿工接入") return &Miner{ Coin: coin, PoolConn: poolConn, @@ -68,6 +68,7 @@ func (m *Miner) HandleMinerMsg(ch chan string) { for { msgStr, err := reader.ReadString('\n') + fmt.Println(topic + msgStr) if err != nil { fmt.Println("miner消息读取失败:", err) return @@ -113,6 +114,7 @@ func (m *Miner) HandlePoolMsg() { reader := bufio.NewReader(m.PoolConn) for { poolMsg, err := reader.ReadString('\n') + fmt.Println("[pool]:", poolMsg) if err != nil { fmt.Println("pool消息读取失败:", err) return diff --git a/internal/proxy.go b/internal/proxy.go index 0b2f82b..ad3a5a1 100644 --- a/internal/proxy.go +++ b/internal/proxy.go @@ -64,7 +64,7 @@ func (p *ProxyCtx) handleMinerConnect() { fmt.Println(TOPIC+"接收连接失败:", err) continue } - + fmt.Println("有新矿工接入") go func(conn net.Conn) { ch := make(chan string, 1) minerObj, err := miner.NewMiner(p.Coin, p.Cfg.DefaultAddr, conn) @@ -101,7 +101,7 @@ func (p *ProxyCtx) handleMinerConnect() { } func (p *ProxyCtx) startZMQ() { - zmq.StartZMQ(p.Cfg.ZmqAddr, p.ProxyList, &p.mu) + zmq.StartZMQ(p.Cfg.ZmqAddr, "testQueue", p.ProxyList, &p.mu) } func StartProxy() { diff --git a/internal/zmq/zmq.go b/internal/zmq/zmq.go index 7a5a20b..abbd8a0 100644 --- a/internal/zmq/zmq.go +++ b/internal/zmq/zmq.go @@ -1,49 +1,134 @@ package zmq +// import ( +// "encoding/json" +// "fmt" +// "proxy/internal/msg" +// "sync" +// "time" + +// "github.com/zeromq/goczmq" +// ) + +// func initZmqPull(sub_to string) *goczmq.Sock { +// pull_ch, err := goczmq.NewPull(sub_to) +// if err != nil { +// fmt.Println("[zmq]:", err) +// } +// //pull_ch.SetMaxmsgsize(1024 * 1024 * 8) +// return pull_ch +// } + +// func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) { +// var data msg.ZmqMsg +// if err := json.Unmarshal(zmqMsg, &data); err != nil { +// fmt.Println("[zmq]:", err) +// return +// } +// proxyListLock.Lock() +// // MethodID: 0(add), 1(delete) +// if data.MethodID == 0 { +// proxyList[data.ID] = data.Address +// } else { +// delete(proxyList, data.ID) +// } +// proxyListLock.Unlock() +// } + +// func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) { +// conn := initZmqPull(zmqAddr) +// for { +// zmqMsg, _, err := conn.RecvFrame() +// fmt.Println("[zmq]:", zmqMsg) +// if err != nil { +// fmt.Println("[zmq recv]:", err) +// time.Sleep(time.Second) // 防止CPU空转 +// continue +// } +// handleZmqMsg(zmqMsg, proxyList, proxyListLock) +// } +// } + import ( "encoding/json" "fmt" "proxy/internal/msg" "sync" - "time" - "github.com/zeromq/goczmq" + amqp "github.com/rabbitmq/amqp091-go" ) -func initZmqPull(sub_to string) *goczmq.Sock { - pull_ch, err := goczmq.NewPull(sub_to) +func initRabbitConsumer(rabbitURL, queueName string) (*amqp.Connection, *amqp.Channel, <-chan amqp.Delivery, error) { + conn, err := amqp.Dial(rabbitURL) if err != nil { - fmt.Println("[zmq]:", err) + return nil, nil, nil, fmt.Errorf("failed to connect to rabbitmq: %v", err) } - //pull_ch.SetMaxmsgsize(1024 * 1024 * 8) - return pull_ch + + ch, err := conn.Channel() + if err != nil { + conn.Close() + return nil, nil, nil, fmt.Errorf("failed to open a channel: %v", err) + } + + // 确保队列存在 + _, err = ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + conn.Close() + return nil, nil, nil, fmt.Errorf("failed to declare a queue: %v", err) + } + + msgs, err := ch.Consume( + queueName, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + conn.Close() + return nil, nil, nil, fmt.Errorf("failed to register a consumer: %v", err) + } + + return conn, ch, msgs, nil } -func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) { +func handleRabbitMsg(body []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) { var data msg.ZmqMsg - if err := json.Unmarshal(zmqMsg, &data); err != nil { - fmt.Println("[zmq]:", err) + if err := json.Unmarshal(body, &data); err != nil { + fmt.Println("[rabbitmq]: json unmarshal error:", err) return } proxyListLock.Lock() - // MethodID: 0(add), 1(delete) + defer proxyListLock.Unlock() if data.MethodID == 0 { proxyList[data.ID] = data.Address } else { delete(proxyList, data.ID) } - proxyListLock.Unlock() } -func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) { - conn := initZmqPull(zmqAddr) - for { - zmqMsg, _, err := conn.RecvFrame() - if err != nil { - fmt.Println("[zmq recv]:", err) - time.Sleep(time.Second) // 防止CPU空转 - continue - } - handleZmqMsg(zmqMsg, proxyList, proxyListLock) +func StartZMQ(rabbitURL, queueName string, proxyList map[string]string, proxyListLock *sync.RWMutex) { + conn, ch, msgs, err := initRabbitConsumer(rabbitURL, queueName) + if err != nil { + fmt.Println("[rabbitmq]:", err) + return + } + defer conn.Close() + defer ch.Close() + + fmt.Println("[rabbitmq]: waiting for messages...") + + for d := range msgs { + // fmt.Printf("[rabbitmq] received: %s\n", d.Body) + handleRabbitMsg(d.Body, proxyList, proxyListLock) } } diff --git a/main.go b/main.go new file mode 100644 index 0000000..961413c --- /dev/null +++ b/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/zeromq/goczmq" +) + +func main() { + // 绑定到指定地址(可以是本地或远程 IP) + endpoint := "tcp://0.0.0.0:5555" // 本机所有 IP 可访问 + sock, err := goczmq.NewPush(endpoint) + if err != nil { + log.Fatalf("Failed to create PUSH socket: %v", err) + } + defer sock.Destroy() + + fmt.Println("PUSH socket bound to", endpoint) + + // 定时发送消息 + for i := 0; i < 1000; i++ { + msg := fmt.Sprintf("Hello %d", i) + err := sock.SendFrame([]byte(msg), goczmq.FlagNone) + if err != nil { + log.Printf("Send error: %v", err) + } else { + fmt.Println("Sent:", msg) + } + time.Sleep(1 * time.Second) + } +}