use rabbitmq replaced zeromq

This commit is contained in:
lzx 2025-08-01 11:30:45 +08:00
parent 549764149b
commit 16e1e4096e
9 changed files with 158 additions and 30 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
./cmd/proxy ./cmd/proxy
./cmd/config.json ./cmd/config.json
./main.go

View File

@ -1,6 +1,6 @@
{ {
"coin": "alph", "coin": "alph",
"zmqAddr": "tcp://127.0.0.1:39001", "zmqAddr": "amqp://guest:guest@localhost:5672/",
"tcpAddr": "0.0.0.0:39002", "tcpAddr": "0.0.0.0:39002",
"proxyAddr": "stratum+tcp://alph.m2pool.com:33390" "proxyAddr": "alph.m2pool.com:33390"
} }

BIN
cmd/proxy Normal file

Binary file not shown.

5
go.mod
View File

@ -2,4 +2,7 @@ module proxy
go 1.23.1 go 1.23.1
require github.com/zeromq/goczmq v4.1.0+incompatible require (
github.com/rabbitmq/amqp091-go v1.10.0
github.com/zeromq/goczmq v4.1.0+incompatible
)

4
go.sum
View File

@ -1,2 +1,6 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc= github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc=
github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg= github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -26,7 +26,7 @@ func NewMiner(coin string, poolAddress string, minerConn net.Conn) (*Miner, erro
if err != nil { if err != nil {
return nil, fmt.Errorf("pool连接失败: %v", err) return nil, fmt.Errorf("pool连接失败: %v", err)
} }
fmt.Println("有新矿工接入")
return &Miner{ return &Miner{
Coin: coin, Coin: coin,
PoolConn: poolConn, PoolConn: poolConn,
@ -68,6 +68,7 @@ func (m *Miner) HandleMinerMsg(ch chan string) {
for { for {
msgStr, err := reader.ReadString('\n') msgStr, err := reader.ReadString('\n')
fmt.Println(topic + msgStr)
if err != nil { if err != nil {
fmt.Println("miner消息读取失败", err) fmt.Println("miner消息读取失败", err)
return return
@ -113,6 +114,7 @@ func (m *Miner) HandlePoolMsg() {
reader := bufio.NewReader(m.PoolConn) reader := bufio.NewReader(m.PoolConn)
for { for {
poolMsg, err := reader.ReadString('\n') poolMsg, err := reader.ReadString('\n')
fmt.Println("[pool]:", poolMsg)
if err != nil { if err != nil {
fmt.Println("pool消息读取失败", err) fmt.Println("pool消息读取失败", err)
return return

View File

@ -64,7 +64,7 @@ func (p *ProxyCtx) handleMinerConnect() {
fmt.Println(TOPIC+"接收连接失败:", err) fmt.Println(TOPIC+"接收连接失败:", err)
continue continue
} }
fmt.Println("有新矿工接入")
go func(conn net.Conn) { go func(conn net.Conn) {
ch := make(chan string, 1) ch := make(chan string, 1)
minerObj, err := miner.NewMiner(p.Coin, p.Cfg.DefaultAddr, conn) minerObj, err := miner.NewMiner(p.Coin, p.Cfg.DefaultAddr, conn)
@ -101,7 +101,7 @@ func (p *ProxyCtx) handleMinerConnect() {
} }
func (p *ProxyCtx) startZMQ() { func (p *ProxyCtx) startZMQ() {
zmq.StartZMQ(p.Cfg.ZmqAddr, p.ProxyList, &p.mu) zmq.StartZMQ(p.Cfg.ZmqAddr, "testQueue", p.ProxyList, &p.mu)
} }
func StartProxy() { func StartProxy() {

View File

@ -1,49 +1,134 @@
package zmq package zmq
// import (
// "encoding/json"
// "fmt"
// "proxy/internal/msg"
// "sync"
// "time"
// "github.com/zeromq/goczmq"
// )
// func initZmqPull(sub_to string) *goczmq.Sock {
// pull_ch, err := goczmq.NewPull(sub_to)
// if err != nil {
// fmt.Println("[zmq]:", err)
// }
// //pull_ch.SetMaxmsgsize(1024 * 1024 * 8)
// return pull_ch
// }
// func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
// var data msg.ZmqMsg
// if err := json.Unmarshal(zmqMsg, &data); err != nil {
// fmt.Println("[zmq]:", err)
// return
// }
// proxyListLock.Lock()
// // MethodID: 0(add), 1(delete)
// if data.MethodID == 0 {
// proxyList[data.ID] = data.Address
// } else {
// delete(proxyList, data.ID)
// }
// proxyListLock.Unlock()
// }
// func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
// conn := initZmqPull(zmqAddr)
// for {
// zmqMsg, _, err := conn.RecvFrame()
// fmt.Println("[zmq]:", zmqMsg)
// if err != nil {
// fmt.Println("[zmq recv]:", err)
// time.Sleep(time.Second) // 防止CPU空转
// continue
// }
// handleZmqMsg(zmqMsg, proxyList, proxyListLock)
// }
// }
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"proxy/internal/msg" "proxy/internal/msg"
"sync" "sync"
"time"
"github.com/zeromq/goczmq" amqp "github.com/rabbitmq/amqp091-go"
) )
func initZmqPull(sub_to string) *goczmq.Sock { func initRabbitConsumer(rabbitURL, queueName string) (*amqp.Connection, *amqp.Channel, <-chan amqp.Delivery, error) {
pull_ch, err := goczmq.NewPull(sub_to) conn, err := amqp.Dial(rabbitURL)
if err != nil { if err != nil {
fmt.Println("[zmq]:", err) return nil, nil, nil, fmt.Errorf("failed to connect to rabbitmq: %v", err)
}
//pull_ch.SetMaxmsgsize(1024 * 1024 * 8)
return pull_ch
} }
func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) { ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, nil, nil, fmt.Errorf("failed to open a channel: %v", err)
}
// 确保队列存在
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
conn.Close()
return nil, nil, nil, fmt.Errorf("failed to declare a queue: %v", err)
}
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
conn.Close()
return nil, nil, nil, fmt.Errorf("failed to register a consumer: %v", err)
}
return conn, ch, msgs, nil
}
func handleRabbitMsg(body []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
var data msg.ZmqMsg var data msg.ZmqMsg
if err := json.Unmarshal(zmqMsg, &data); err != nil { if err := json.Unmarshal(body, &data); err != nil {
fmt.Println("[zmq]:", err) fmt.Println("[rabbitmq]: json unmarshal error:", err)
return return
} }
proxyListLock.Lock() proxyListLock.Lock()
// MethodID: 0(add), 1(delete) defer proxyListLock.Unlock()
if data.MethodID == 0 { if data.MethodID == 0 {
proxyList[data.ID] = data.Address proxyList[data.ID] = data.Address
} else { } else {
delete(proxyList, data.ID) delete(proxyList, data.ID)
} }
proxyListLock.Unlock()
} }
func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) { func StartZMQ(rabbitURL, queueName string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
conn := initZmqPull(zmqAddr) conn, ch, msgs, err := initRabbitConsumer(rabbitURL, queueName)
for {
zmqMsg, _, err := conn.RecvFrame()
if err != nil { if err != nil {
fmt.Println("[zmq recv]:", err) fmt.Println("[rabbitmq]:", err)
time.Sleep(time.Second) // 防止CPU空转 return
continue
} }
handleZmqMsg(zmqMsg, proxyList, proxyListLock) defer conn.Close()
defer ch.Close()
fmt.Println("[rabbitmq]: waiting for messages...")
for d := range msgs {
// fmt.Printf("[rabbitmq] received: %s\n", d.Body)
handleRabbitMsg(d.Body, proxyList, proxyListLock)
} }
} }

33
main.go Normal file
View File

@ -0,0 +1,33 @@
package main
import (
"fmt"
"log"
"time"
"github.com/zeromq/goczmq"
)
func main() {
// 绑定到指定地址(可以是本地或远程 IP
endpoint := "tcp://0.0.0.0:5555" // 本机所有 IP 可访问
sock, err := goczmq.NewPush(endpoint)
if err != nil {
log.Fatalf("Failed to create PUSH socket: %v", err)
}
defer sock.Destroy()
fmt.Println("PUSH socket bound to", endpoint)
// 定时发送消息
for i := 0; i < 1000; i++ {
msg := fmt.Sprintf("Hello %d", i)
err := sock.SendFrame([]byte(msg), goczmq.FlagNone)
if err != nil {
log.Printf("Send error: %v", err)
} else {
fmt.Println("Sent:", msg)
}
time.Sleep(1 * time.Second)
}
}