proxy/internal/zmq/zmq.go

50 lines
1.0 KiB
Go

package zmq
import (
"encoding/json"
"fmt"
"proxy/internal/msg"
"sync"
"time"
"github.com/zeromq/goczmq"
)
func initZmqPull(sub_to string) *goczmq.Sock {
pull_ch, err := goczmq.NewPull(sub_to)
if err != nil {
fmt.Println("[zmq]:", err)
}
//pull_ch.SetMaxmsgsize(1024 * 1024 * 8)
return pull_ch
}
func handleZmqMsg(zmqMsg []byte, proxyList map[string]string, proxyListLock *sync.RWMutex) {
var data msg.ZmqMsg
if err := json.Unmarshal(zmqMsg, &data); err != nil {
fmt.Println("[zmq]:", err)
return
}
proxyListLock.Lock()
// MethodID: 0(add), 1(delete)
if data.MethodID == 0 {
proxyList[data.ID] = data.Address
} else {
delete(proxyList, data.ID)
}
proxyListLock.Unlock()
}
func StartZMQ(zmqAddr string, proxyList map[string]string, proxyListLock *sync.RWMutex) {
conn := initZmqPull(zmqAddr)
for {
zmqMsg, _, err := conn.RecvFrame()
if err != nil {
fmt.Println("[zmq recv]:", err)
time.Sleep(time.Second) // 防止CPU空转
continue
}
handleZmqMsg(zmqMsg, proxyList, proxyListLock)
}
}