Files
m2pool_payment/internal/queue/rabbitmq.go
2025-10-16 18:54:27 +08:00

345 lines
8.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rmq
import (
"context"
"encoding/json"
"fmt"
"log"
message "m2pool-payment/internal/msg"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// RabbitMQServer is the RabbitMQ service handle. It holds the AMQP connection
// and channel created by NewRabbitMQServer together with the callbacks that
// the consumer goroutines invoke for each decoded request message.
type RabbitMQServer struct {
config message.RMQConfig
conn *amqp.Connection
channel *amqp.Channel
// mu is held by publishMessage for the duration of a publish.
mu sync.Mutex
// ctx is checked by the consume loop; cancel is invoked by Close.
ctx context.Context
cancel context.CancelFunc
// Message-handling callbacks. A nil callback is skipped by its consumer.
OnTopupMsg func(message.TopupMsg_req) // top-up request callback
OnWithdrawMsg func(message.WithdrawMsg_req) // withdrawal request callback
OnPayMsg func(message.PayMsg_req) // payment request callback
}
// NewRabbitMQServer dials the broker at config.SubAddr, opens a channel on the
// new connection, and declares every exchange/queue/binding described by
// config. On any failure the resources opened so far are closed and a wrapped
// error is returned together with a nil server.
func NewRabbitMQServer(config message.RMQConfig) (*RabbitMQServer, error) {
	connection, dialErr := amqp.Dial(config.SubAddr)
	if dialErr != nil {
		return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", dialErr)
	}

	ch, chErr := connection.Channel()
	if chErr != nil {
		connection.Close()
		return nil, fmt.Errorf("failed to open a channel: %w", chErr)
	}

	srv := &RabbitMQServer{
		config:  config,
		conn:    connection,
		channel: ch,
	}
	// The cancellable context is what stops the consumer loops on Close.
	srv.ctx, srv.cancel = context.WithCancel(context.Background())

	// Declare the topology up front so consuming and publishing can rely on it.
	if setupErr := srv.setupQueuesAndExchanges(); setupErr != nil {
		srv.Close()
		return nil, fmt.Errorf("failed to setup queues: %w", setupErr)
	}

	return srv, nil
}
// setupQueuesAndExchanges declares, for each of the six queue configurations
// (three request queues and three response queues), a durable direct exchange,
// a durable queue, and one binding per configured routing key. It stops at the
// first failure and returns that error wrapped with the names involved.
func (r *RabbitMQServer) setupQueuesAndExchanges() error {
	const (
		exchangeKind = "direct" // matches the type of the already-existing exchanges
		durable      = true
		autoDelete   = false
		internal     = false
		exclusive    = false
		noWait       = false
	)

	topology := []message.QueueConfig{
		r.config.PayConfig,
		r.config.TopUpConfig,
		r.config.WithdrawConfig,
		r.config.PayRespConfig,
		r.config.TopUpRespConfig,
		r.config.WithdrawRespConfig,
	}

	for _, qc := range topology {
		// Exchange first: the bindings below refer to it.
		if exErr := r.channel.ExchangeDeclare(qc.ExchangeName, exchangeKind, durable, autoDelete, internal, noWait, nil); exErr != nil {
			return fmt.Errorf("failed to declare exchange %s: %w", qc.ExchangeName, exErr)
		}

		if _, qErr := r.channel.QueueDeclare(qc.QueueName, durable, autoDelete, exclusive, noWait, nil); qErr != nil {
			return fmt.Errorf("failed to declare queue %s: %w", qc.QueueName, qErr)
		}

		// One binding per routing key listed in the configuration.
		for _, key := range qc.Routing {
			if bindErr := r.channel.QueueBind(qc.QueueName, key, qc.ExchangeName, noWait, nil); bindErr != nil {
				return fmt.Errorf("failed to bind queue %s to exchange %s with key %s: %w",
					qc.QueueName, qc.ExchangeName, key, bindErr)
			}
		}
	}

	return nil
}
// Start launches one consumer goroutine per request queue (top-up, withdraw,
// pay) and returns immediately. It always returns nil.
func (r *RabbitMQServer) Start() error {
	consumers := []func(){
		r.consumeTopup,    // top-up requests
		r.consumeWithdraw, // withdrawal requests
		r.consumePay,      // payment requests
	}
	for _, consume := range consumers {
		go consume()
	}
	return nil
}
// consumeTopup runs the consume loop for the top-up request queue. Each
// message body is decoded as a message.TopupMsg_req, logged, and handed to
// OnTopupMsg when that callback is set. A decode failure is reported to
// consumeQueue as an error.
func (r *RabbitMQServer) consumeTopup() {
	handle := func(body []byte) error {
		var req message.TopupMsg_req
		err := json.Unmarshal(body, &req)
		if err != nil {
			return fmt.Errorf("failed to parse topup message: %w", err)
		}

		log.Printf("📥 [RMQ] 收到充值请求: Chain=%s, Symbol=%s, Address=%s",
			req.Chain, req.Symbol, req.Address)

		// The callback is optional; without one the message is only logged.
		if callback := r.OnTopupMsg; callback != nil {
			callback(req)
		}
		return nil
	}

	r.consumeQueue(r.config.TopUpConfig.QueueName, "topup", handle)
}
// consumeWithdraw runs the consume loop for the withdrawal request queue. Each
// message body is decoded as a message.WithdrawMsg_req, logged, and handed to
// OnWithdrawMsg when that callback is set. A decode failure is reported to
// consumeQueue as an error.
func (r *RabbitMQServer) consumeWithdraw() {
	handle := func(body []byte) error {
		var req message.WithdrawMsg_req
		err := json.Unmarshal(body, &req)
		if err != nil {
			return fmt.Errorf("failed to parse withdraw message: %w", err)
		}

		log.Printf("📥 [RMQ] 收到提现请求: QueueId=%s, From=%s, To=%s, Amount=%.2f %s",
			req.QueueId, req.FromAddress, req.ToAddress, req.Amount, req.Symbol)

		// The callback is optional; without one the message is only logged.
		if callback := r.OnWithdrawMsg; callback != nil {
			callback(req)
		}
		return nil
	}

	r.consumeQueue(r.config.WithdrawConfig.QueueName, "withdraw", handle)
}
// consumePay runs the consume loop for the payment request queue. Each message
// body is decoded as a message.PayMsg_req, logged, and handed to OnPayMsg when
// that callback is set. A decode failure is reported to consumeQueue as an
// error.
func (r *RabbitMQServer) consumePay() {
	handle := func(body []byte) error {
		var req message.PayMsg_req
		err := json.Unmarshal(body, &req)
		if err != nil {
			return fmt.Errorf("failed to parse pay message: %w", err)
		}

		log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, OrderId=%s, From=%s, To=%s, Amount=%.2f %s",
			req.QueueId, req.OrderId, req.FromAddress, req.ToAddress, req.Amount, req.Symbol)

		// The callback is optional; without one the message is only logged.
		if callback := r.OnPayMsg; callback != nil {
			callback(req)
		}
		return nil
	}

	r.consumeQueue(r.config.PayConfig.QueueName, "pay", handle)
}
// consumeQueue is the shared consume loop used by the per-queue consumers.
//
// It registers a manual-ack consumer on queueName and passes every delivery
// body to handler. A nil result acknowledges the message. A non-nil result
// negatively acknowledges it: the message is requeued once, and if it fails
// again after redelivery it is rejected without requeue, so a message that can
// never be processed (for example malformed JSON) does not spin in an endless
// redelivery loop. msgType is used only to label log lines.
//
// When Consume fails or the delivery stream ends, the loop waits three seconds
// and tries again. The loop exits as soon as the server context is cancelled,
// including while it is waiting.
//
// NOTE(review): retries reuse r.channel, and nothing in this file replaces that
// channel after it has been closed, so retries presumably keep failing once the
// broker connection is lost — confirm whether reconnection is handled elsewhere.
func (r *RabbitMQServer) consumeQueue(queueName, msgType string, handler func([]byte) error) {
	const retryDelay = 3 * time.Second

	// pause waits for retryDelay and reports false when the server context is
	// cancelled first, so shutdown is not held up by a pending retry.
	pause := func() bool {
		timer := time.NewTimer(retryDelay)
		defer timer.Stop()
		select {
		case <-r.ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	for r.ctx.Err() == nil {
		deliveries, err := r.channel.Consume(
			queueName, // queue name
			"",        // consumer tag (server-generated)
			false,     // auto-ack disabled: messages are acknowledged manually below
			false,     // exclusive
			false,     // no-local
			false,     // no-wait
			nil,       // args
		)
		if err != nil {
			log.Printf("[RMQ] consuming %s queue %q failed: %v; retrying in %s",
				msgType, queueName, err, retryDelay)
			if !pause() {
				return
			}
			continue
		}

		for d := range deliveries {
			if handleErr := handler(d.Body); handleErr != nil {
				// Give the message one more chance; a second failure drops it
				// (or dead-letters it, if the broker is configured to).
				requeue := !d.Redelivered
				log.Printf("[RMQ] handling %s message failed (requeue=%t): %v",
					msgType, requeue, handleErr)
				if nackErr := d.Nack(false, requeue); nackErr != nil {
					log.Printf("[RMQ] nack of %s message failed: %v", msgType, nackErr)
				}
				continue
			}
			if ackErr := d.Ack(false); ackErr != nil {
				log.Printf("[RMQ] ack of %s message failed: %v", msgType, ackErr)
			}
		}

		// The delivery stream ended. During shutdown that is expected.
		if r.ctx.Err() != nil {
			return
		}
		log.Printf("[RMQ] delivery stream for %s queue %q closed; retrying in %s",
			msgType, queueName, retryDelay)
		if !pause() {
			return
		}
	}
}
// PublishTopupResp publishes a top-up response using the top-up response
// queue configuration and returns whatever error publishMessage reports.
func (r *RabbitMQServer) PublishTopupResp(resp message.TopupMsg_resp) error {
	// Human-readable summary that publishMessage logs after a successful send.
	summary := fmt.Sprintf("充值响应: Address=%s, Status=%d, TxHash=%s",
		resp.Address, resp.Status, resp.TxHash)
	return r.publishMessage(r.config.TopUpRespConfig, resp, summary)
}
// PublishWithdrawResp publishes a withdrawal response using the withdrawal
// response queue configuration and returns whatever error publishMessage
// reports.
func (r *RabbitMQServer) PublishWithdrawResp(resp message.WithdrawMsg_resp) error {
	// Human-readable summary that publishMessage logs after a successful send.
	summary := fmt.Sprintf("提现响应: QueueId=%s, Status=%d, TxHash=%s",
		resp.QueueId, resp.Status, resp.TxHash)
	return r.publishMessage(r.config.WithdrawRespConfig, resp, summary)
}
// PublishPayResp publishes a payment response using the payment response
// queue configuration and returns whatever error publishMessage reports.
func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error {
	// Human-readable summary that publishMessage logs after a successful send.
	summary := fmt.Sprintf("支付响应: QueueId=%s, OrderId=%s, Status=%d, TxHash=%s",
		resp.QueueId, resp.OrderId, resp.Status, resp.TxHash)
	return r.publishMessage(r.config.PayRespConfig, resp, summary)
}
// publishMessage JSON-encodes msg and publishes it as a persistent message to
// config.ExchangeName. The routing key is the first entry of config.Routing,
// or the empty string when none is configured. The publish call is bounded by
// a five-second timeout and runs while r.mu is held. logMsg is written to the
// log after a successful publish.
func (r *RabbitMQServer) publishMessage(config message.QueueConfig, msg interface{}, logMsg string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	payload, marshalErr := json.Marshal(msg)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal message: %w", marshalErr)
	}

	// Only the first routing key is used, even when several are configured.
	var key string
	if len(config.Routing) > 0 {
		key = config.Routing[0]
	}

	publishing := amqp.Publishing{
		ContentType:  "application/json",
		Body:         payload,
		DeliveryMode: amqp.Persistent, // persistent delivery mode
		Timestamp:    time.Now(),
	}

	publishCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()

	const (
		mandatory = false
		immediate = false
	)
	if pubErr := r.channel.PublishWithContext(publishCtx, config.ExchangeName, key, mandatory, immediate, publishing); pubErr != nil {
		return fmt.Errorf("failed to publish message: %w", pubErr)
	}

	log.Printf("📤 [RMQ] 发送%s", logMsg)
	return nil
}
// Close shuts the service down: it cancels the server context so the consumer
// loops stop, then closes the channel and the connection.
//
// Shutdown is best-effort. A failure to close the channel or the connection is
// logged rather than returned, so Close always returns nil. Fields that were
// never set (nil cancel, channel or connection) are skipped, which makes Close
// safe to call on a partially initialised server.
func (r *RabbitMQServer) Close() error {
	// Signal the consumer goroutines first so they stop retrying.
	if r.cancel != nil {
		r.cancel()
	}

	if r.channel != nil {
		if err := r.channel.Close(); err != nil {
			log.Printf("[RMQ] closing channel failed: %v", err)
		}
	}

	if r.conn != nil {
		if err := r.conn.Close(); err != nil {
			log.Printf("[RMQ] closing connection failed: %v", err)
		}
	}

	return nil
}
// IsConnected reports whether the server holds a connection that has not been
// closed.
func (r *RabbitMQServer) IsConnected() bool {
	if r.conn == nil {
		return false
	}
	return !r.conn.IsClosed()
}