// Package zmq keeps a shared proxy list in sync with add/delete notifications.
//
// Despite the package name, notifications are consumed from a RabbitMQ queue;
// the earlier ZeroMQ-based implementation is retained below, commented out,
// for reference only.
package zmq

// Legacy ZeroMQ implementation (disabled):
//
// import (
// 	"encoding/json"
// 	"fmt"
// 	"proxy/internal/msg"
// 	"sync"
// 	"time"
//
// 	"github.com/zeromq/goczmq"
// )
//
// func initZmqPull(sub_to string) *goczmq.Sock {
// 	pull_ch, err := goczmq.NewPull(sub_to)
// 	if err != nil {
// 		fmt.Println("[zmq]:", err)
// 	}
// 	//pull_ch.SetMaxmsgsize(1024 * 1024 * 8)
// 	return pull_ch
// }
//
// func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
// 	var data msg.ZmqMsg
// 	if err := json.Unmarshal(zmqMsg, &data); err != nil {
// 		fmt.Println("[zmq]:", err)
// 		return
// 	}
// 	proxyListLock.Lock()
// 	// MethodID: 0(add), 1(delete)
// 	if data.MethodID == 0 {
// 		proxyList[data.ID] = data.Address
// 	} else {
// 		delete(proxyList, data.ID)
// 	}
// 	proxyListLock.Unlock()
// }
//
// func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
// 	conn := initZmqPull(zmqAddr)
// 	for {
// 		zmqMsg, _, err := conn.RecvFrame()
// 		fmt.Println("[zmq]:", zmqMsg)
// 		if err != nil {
// 			fmt.Println("[zmq recv]:", err)
// 			time.Sleep(time.Second) // avoid busy-spinning the CPU
// 			continue
// 		}
// 		handleZmqMsg(zmqMsg, proxyList, proxyListLock)
// 	}
// }

import (
	"encoding/json"
	"fmt"
	"sync"

	amqp "github.com/rabbitmq/amqp091-go"
	"proxy/internal/msg"
)

// initRabbitConsumer dials rabbitURL, opens a channel, declares queueName and
// registers a consumer on it.
//
// The queue is declared durable, non-auto-deleted, non-exclusive. The consumer
// is registered with auto-ack enabled and a server-generated consumer tag.
//
// On success it returns the connection, the channel and the delivery stream;
// the caller owns the connection and channel and must close them. On failure
// every returned handle is nil, any connection that was opened has been
// closed, and the error wraps the underlying cause (usable with errors.Is /
// errors.As).
func initRabbitConsumer(rabbitURL, queueName string) (*amqp.Connection, *amqp.Channel, <-chan amqp.Delivery, error) {
	conn, err := amqp.Dial(rabbitURL)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to connect to rabbitmq: %w", err)
	}

	ch, err := conn.Channel()
	if err != nil {
		conn.Close()
		return nil, nil, nil, fmt.Errorf("failed to open a channel: %w", err)
	}

	// Make sure the queue exists before consuming from it.
	_, err = ch.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		conn.Close()
		return nil, nil, nil, fmt.Errorf("failed to declare a queue: %w", err)
	}

	msgs, err := ch.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		conn.Close()
		return nil, nil, nil, fmt.Errorf("failed to register a consumer: %w", err)
	}

	return conn, ch, msgs, nil
}

// handleRabbitMsg decodes body as a JSON msg.ZmqMsg and applies it to
// proxyList while holding proxyListLock for writing.
//
// A MethodID of 0 stores data.Address under data.ID (add or overwrite); any
// other MethodID removes data.ID from the map. Bodies that fail to decode are
// logged and ignored without touching the map.
func handleRabbitMsg(body []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
	var data msg.ZmqMsg
	if err := json.Unmarshal(body, &data); err != nil {
		fmt.Println("[rabbitmq]: json unmarshal error:", err)
		return
	}

	proxyListLock.Lock()
	defer proxyListLock.Unlock()

	// MethodID: 0 (add); every other value is treated as delete.
	if data.MethodID == 0 {
		proxyList[data.ID] = data.Address
	} else {
		delete(proxyList, data.ID)
	}
}

// StartZMQ consumes proxy-list notifications from the RabbitMQ queue queueName
// at rabbitURL and applies each one to proxyList under proxyListLock.
//
// The name is kept from the earlier ZeroMQ implementation so existing callers
// keep working. The call blocks until the delivery stream is closed, then
// logs that the consumer stopped and returns. If the consumer cannot be set
// up, the error is logged and the function returns immediately.
//
// Deliveries are consumed with auto-ack, so a message whose body fails to
// decode is dropped rather than redelivered.
func StartZMQ(rabbitURL, queueName string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
	conn, ch, msgs, err := initRabbitConsumer(rabbitURL, queueName)
	if err != nil {
		fmt.Println("[rabbitmq]:", err)
		return
	}
	// Deferred calls run last-in-first-out: the channel is closed before the
	// connection that carries it.
	defer conn.Close()
	defer ch.Close()

	fmt.Println("[rabbitmq]: waiting for messages...")

	for d := range msgs {
		// fmt.Printf("[rabbitmq] received: %s\n", d.Body)
		handleRabbitMsg(d.Body, proxyList, proxyListLock)
	}

	// The range loop only ends when the delivery channel is closed; make that
	// visible instead of returning silently and leaving the proxy list stale.
	fmt.Println("[rabbitmq]: delivery channel closed, consumer stopped")
}