proxy/internal/zmq/zmq.go

135 lines
3.1 KiB
Go
Raw Normal View History

2025-07-31 02:34:17 +00:00
package zmq
2025-08-01 03:30:45 +00:00
// import (
// "encoding/json"
// "fmt"
// "proxy/internal/msg"
// "sync"
// "time"
// "github.com/zeromq/goczmq"
// )
// func initZmqPull(sub_to string) *goczmq.Sock {
// pull_ch, err := goczmq.NewPull(sub_to)
// if err != nil {
// fmt.Println("[zmq]:", err)
// }
// //pull_ch.SetMaxmsgsize(1024 * 1024 * 8)
// return pull_ch
// }
// func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
// var data msg.ZmqMsg
// if err := json.Unmarshal(zmqMsg, &data); err != nil {
// fmt.Println("[zmq]:", err)
// return
// }
// proxyListLock.Lock()
// // MethodID: 0(add), 1(delete)
// if data.MethodID == 0 {
// proxyList[data.ID] = data.Address
// } else {
// delete(proxyList, data.ID)
// }
// proxyListLock.Unlock()
// }
// func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
// conn := initZmqPull(zmqAddr)
// for {
// zmqMsg, _, err := conn.RecvFrame()
// fmt.Println("[zmq]:", zmqMsg)
// if err != nil {
// fmt.Println("[zmq recv]:", err)
// time.Sleep(time.Second) // back off so repeated receive errors do not spin the CPU
// continue
// }
// handleZmqMsg(zmqMsg, proxyList, proxyListLock)
// }
// }
2025-07-31 02:34:17 +00:00
import (
"encoding/json"
"fmt"
"proxy/internal/msg"
"sync"
2025-08-01 03:30:45 +00:00
amqp "github.com/rabbitmq/amqp091-go"
2025-07-31 02:34:17 +00:00
)
2025-08-01 03:30:45 +00:00
func initRabbitConsumer(rabbitURL, queueName string) (*amqp.Connection, *amqp.Channel, <-chan amqp.Delivery, error) {
conn, err := amqp.Dial(rabbitURL)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to connect to rabbitmq: %v", err)
}
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, nil, nil, fmt.Errorf("failed to open a channel: %v", err)
}
// 确保队列存在
_, err = ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
2025-07-31 02:34:17 +00:00
if err != nil {
2025-08-01 03:30:45 +00:00
conn.Close()
return nil, nil, nil, fmt.Errorf("failed to declare a queue: %v", err)
2025-07-31 02:34:17 +00:00
}
2025-08-01 03:30:45 +00:00
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
conn.Close()
return nil, nil, nil, fmt.Errorf("failed to register a consumer: %v", err)
}
return conn, ch, msgs, nil
2025-07-31 02:34:17 +00:00
}
2025-08-01 03:30:45 +00:00
func handleRabbitMsg(body []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
2025-07-31 02:34:17 +00:00
var data msg.ZmqMsg
2025-08-01 03:30:45 +00:00
if err := json.Unmarshal(body, &data); err != nil {
fmt.Println("[rabbitmq]: json unmarshal error:", err)
2025-07-31 02:34:17 +00:00
return
}
proxyListLock.Lock()
2025-08-01 03:30:45 +00:00
defer proxyListLock.Unlock()
2025-07-31 02:34:17 +00:00
if data.MethodID == 0 {
proxyList[data.ID] = data.Address
} else {
delete(proxyList, data.ID)
}
}
2025-08-01 03:30:45 +00:00
func StartZMQ(rabbitURL, queueName string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
conn, ch, msgs, err := initRabbitConsumer(rabbitURL, queueName)
if err != nil {
fmt.Println("[rabbitmq]:", err)
return
}
defer conn.Close()
defer ch.Close()
fmt.Println("[rabbitmq]: waiting for messages...")
for d := range msgs {
// fmt.Printf("[rabbitmq] received: %s\n", d.Body)
handleRabbitMsg(d.Body, proxyList, proxyListLock)
2025-07-31 02:34:17 +00:00
}
}