package rmq import ( "context" "encoding/json" "fmt" "log" message "m2pool-payment/internal/msg" "sync" "time" amqp "github.com/rabbitmq/amqp091-go" ) // RabbitMQServer RabbitMQ 服务 type RabbitMQServer struct { config message.RmqConfig conn *amqp.Connection channel *amqp.Channel mu sync.Mutex ctx context.Context cancel context.CancelFunc // 消息处理回调函数 OnTopupMsg func(message.TopupMsg_req) // 充值请求回调 OnWithdrawMsg func(message.WithdrawMsg_req) // 提现请求回调 OnPayMsg func(message.PayMsg_req) // 支付请求回调 OnRemoveMsg func(message.RemoveListenMsg_req) // 删除充值监听回调 } // NewRabbitMQServer 创建 RabbitMQ 服务 func NewRabbitMQServer(config message.RmqConfig) (*RabbitMQServer, error) { // 创建连接 conn, err := amqp.Dial(config.SubAddr) if err != nil { return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) } // 创建通道 channel, err := conn.Channel() if err != nil { conn.Close() return nil, fmt.Errorf("failed to open a channel: %w", err) } // 创建可取消的 context ctx, cancel := context.WithCancel(context.Background()) server := &RabbitMQServer{ config: config, conn: conn, channel: channel, ctx: ctx, cancel: cancel, } // 初始化队列和交换机 if err := server.setupQueuesAndExchanges(); err != nil { server.Close() return nil, fmt.Errorf("failed to setup queues: %w", err) } log.Println("✅ RabbitMQ队列已启动") return server, nil } // setupQueuesAndExchanges 设置队列和交换机 func (r *RabbitMQServer) setupQueuesAndExchanges() error { configs := []message.Queue{ r.config.Pay, r.config.Topup, r.config.Withdraw, r.config.Remove, r.config.PayResp, r.config.TopupResp, r.config.WithdrawResp, r.config.RemoveResp, } for _, cfg := range configs { // 声明交换机 err := r.channel.ExchangeDeclare( cfg.Exchange, // 交换机名称 "direct", // 类型:direct(与现有交换机类型一致) true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("failed to declare exchange %s: %w", cfg.Exchange, err) } // 声明队列 _, err = r.channel.QueueDeclare( cfg.Exchange, // 队列名称 true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("failed to declare queue %s: %w", cfg.Exchange, err) } // 绑定队列到交换机 for _, routingKey := range cfg.Routing { err = r.channel.QueueBind( cfg.Queue, // 队列名称 routingKey, // routing key cfg.Exchange, // 交换机名称 false, // no-wait nil, // arguments ) if err != nil { return fmt.Errorf("failed to bind queue %s to exchange %s with key %s: %w", cfg.Queue, cfg.Exchange, routingKey, err) } } // log.Printf("✅ 队列配置成功: Queue=%s, Exchange=%s, RoutingKeys=%v", // cfg.QueueName, cfg.ExchangeName, cfg.Routing) } return nil } // Start 启动监听所有队列 func (r *RabbitMQServer) Start() error { // 启动充值消息监听 go r.consumeTopup() // 启动提现消息监听 go r.consumeWithdraw() // 启动支付消息监听 go r.consumePay() // 启动删除充值监听 go r.consumeRemove() // log.Println("🚀 RabbitMQ 服务启动成功,开始监听消息...") return nil } // consumeTopup 消费充值消息 func (r *RabbitMQServer) consumeTopup() { r.consumeQueue( r.config.Topup.Queue, func(body []byte) error { var msg message.TopupMsg_req if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("failed to parse topup message: %w", err) } log.Printf("📥 [RMQ] 收到充值请求: Chain=%s, Symbol=%s, Address=%s", msg.Chain, msg.Symbol, msg.Address) if r.OnTopupMsg != nil { r.OnTopupMsg(msg) } return nil }, ) } // consumeWithdraw 消费提现消息 func (r *RabbitMQServer) consumeWithdraw() { r.consumeQueue( r.config.Withdraw.Queue, func(body []byte) error { var msg message.WithdrawMsg_req if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("failed to parse withdraw message: %w", err) } log.Printf("📥 [RMQ] 收到提现请求: QueueId=%s, From=%s, To=%s, Amount=%.2f %s", msg.QueueId, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Symbol) if r.OnWithdrawMsg != nil { r.OnWithdrawMsg(msg) } return nil }, ) } // consumePay 消费支付消息 func (r *RabbitMQServer) consumePay() { r.consumeQueue( r.config.Pay.Queue, func(body []byte) error { var msg message.PayMsg_req if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("failed to parse pay message: %w", err) } log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, Chain=%s, Symbol=%s, TxCount=%d", msg.QueueId, msg.FromAddress, msg.Chain, msg.Symbol, len(msg.Transactions)) if r.OnPayMsg != nil { r.OnPayMsg(msg) } return nil }, ) } // consumeRemove 消费删除充值监听消息 func (r *RabbitMQServer) consumeRemove() { r.consumeQueue( r.config.Remove.Queue, func(body []byte) error { var msg message.RemoveListenMsg_req if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("failed to parse remove message: %w", err) } log.Printf("📥 [RMQ] 收到删除充值监听: Chain=%s, Symbol=%s, Address=%s", msg.Chain, msg.Symbol, msg.Address) if r.OnRemoveMsg != nil { r.OnRemoveMsg(msg) } return nil }, ) } // consumeQueue 通用队列消费方法 func (r *RabbitMQServer) consumeQueue(queueName string, handler func([]byte) error) { for { select { case <-r.ctx.Done(): // log.Printf("🛑 停止监听队列: %s", queueName) return default: msgs, err := r.channel.Consume( queueName, // 队列名称 "", // consumer tag false, // auto-ack (设置为false,手动确认) false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { // log.Printf("❌ 消费队列 %s 失败: %v, 3秒后重试...", queueName, err) time.Sleep(3 * time.Second) continue } // log.Printf("✅ 开始监听队列: %s (%s)", queueName, msgType) for msg := range msgs { err := handler(msg.Body) if err != nil { // log.Printf("⚠️ 处理 %s 消息失败: %v", msgType, err) // 消息处理失败,不确认,让消息重新入队 msg.Nack(false, true) } else { // 消息处理成功,确认消息 msg.Ack(false) } } // 如果 channel 关闭,等待后重连 // log.Printf("⚠️ 队列 %s 连接断开,3秒后重连...", queueName) time.Sleep(3 * time.Second) } } } // PublishTopupResp 发布充值响应 func (r *RabbitMQServer) PublishTopupResp(resp message.TopupMsg_resp) error { return r.publishMessage( r.config.TopupResp, resp, fmt.Sprintf("充值响应: Address=%s, Status=%d, TxHash=%s", resp.Address, resp.Status, resp.TxHash), ) } // PublishWithdrawResp 发布提现响应 func (r *RabbitMQServer) PublishWithdrawResp(resp message.WithdrawMsg_resp) error { return r.publishMessage( r.config.WithdrawResp, resp, fmt.Sprintf("提现响应: QueueId=%s, Status=%d, TxHash=%s", resp.QueueId, resp.Status, resp.TxHash), ) } // PublishPayResp 发布支付响应 func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error { return r.publishMessage( r.config.PayResp, resp, "支付响应", ) } // PublishRemoveResp 发布删除充值监听响应 func (r *RabbitMQServer) PublishRemoveResp(resp message.RemoveListenMsg_resp) error { return r.publishMessage( r.config.RemoveResp, resp, fmt.Sprintf("删除充值监听响应: Address=%s, Status=%d", resp.Address, resp.Status), ) } // publishMessage 通用消息发布方法 func (r *RabbitMQServer) publishMessage(config message.Queue, msg interface{}, logMsg string) error { r.mu.Lock() defer r.mu.Unlock() // 序列化消息 body, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } // 使用第一个 routing key(如果有多个) routingKey := "" if len(config.Routing) > 0 { routingKey = config.Routing[0] } // 发布消息 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err = r.channel.PublishWithContext( ctx, config.Exchange, // 交换机 routingKey, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: body, DeliveryMode: amqp.Persistent, // 持久化消息 Timestamp: time.Now(), }, ) if err != nil { return fmt.Errorf("failed to publish message: %w", err) } log.Printf("📤 [RMQ] 发送%s", logMsg) return nil } // Close 关闭连接 func (r *RabbitMQServer) Close() error { // log.Println("🛑 正在关闭 RabbitMQ 服务...") r.cancel() // 取消所有 goroutine if r.channel != nil { if err := r.channel.Close(); err != nil { // log.Printf("⚠️ 关闭 channel 失败: %v", err) } } if r.conn != nil { if err := r.conn.Close(); err != nil { // log.Printf("⚠️ 关闭连接失败: %v", err) } } // log.Println("✅ RabbitMQ 服务已关闭") return nil } // IsConnected 检查连接状态 func (r *RabbitMQServer) IsConnected() bool { return r.conn != nil && !r.conn.IsClosed() }