package server import ( "encoding/hex" "encoding/json" "fmt" "log" "m2pool-payment/internal/blockchain" "m2pool-payment/internal/blockchain/eth" "m2pool-payment/internal/crypto" "m2pool-payment/internal/db" "m2pool-payment/internal/logger" message "m2pool-payment/internal/msg" rmq "m2pool-payment/internal/queue" "os" "os/signal" "strings" "syscall" "time" ) const MSG_KEY string = "9f3c7a12" // 状态码常量 const ( STATUS_FAILED = 0 // 失败 STATUS_SUCCESS = 1 // 成功 STATUS_PENDING = 2 // 待确认 STATUS_VERIFY_FAILED = 3 // 验证失败 ) type ServerCtx struct { msgKey string Config message.Config blockChainServer *blockchain.BlockChainServer rmqServer *rmq.RabbitMQServer sqlitedb db.SQLite } var s_ctx ServerCtx // verifyMessage 验证消息签名 func verifyMessage(timestamp uint64, sign string) bool { hash_byte := crypto.Sha256Hash(fmt.Sprintf("%x", timestamp) + MSG_KEY) hash := hex.EncodeToString(hash_byte) return hash == sign } func loadConfig(msgKey string) { file, err := os.ReadFile("config.json") if err != nil { panic(fmt.Sprintf("读取配置文件失败: %v", err)) } err = json.Unmarshal(file, &s_ctx.Config) if err != nil { panic(fmt.Sprintf("解析配置文件失败: %v", err)) } log.Printf("✅ 配置加载成功: RPC=%s, WS=%s", s_ctx.Config.ETHConfig.RpcURL, s_ctx.Config.ETHConfig.WsURL) s_ctx.msgKey = msgKey } func initBlockChainServer() { // 初始化节点服务 node_server := blockchain.NewBlockChainServer() // 初始化ETH节点 eth_node, err := eth.NewETHNode(s_ctx.Config.ETHConfig, "m2pool") if err != nil { log.Fatalf("ETH-Node Start error: %v", err) } // 注册ETH节点 node_server.RegisterChain("ETH", eth_node) // 将所有注册的blockChainServer绑定至server s_ctx.blockChainServer = node_server log.Println("✅ 区块链服务初始化完成") } func loadSQLiteData() { err1 := loadTopupReqMsg() if err1 != nil { log.Fatalf("load topup msg err:%v", err1) } err2 := loadWithdrawReqMsg() if err2 != nil { log.Fatalf("load withdraw msg err:%v", err2) } err3 := loadPayReqMsg() if err3 != nil { log.Fatalf("load pay msg err:%v", err3) } } func loadTopupReqMsg() error { sql := `SELECT chain, symbol, timestamp, to_addr FROM msg_topup_req;` rows, err := s_ctx.sqlitedb.DB.Query(sql) if err != nil { return fmt.Errorf("query history topup-msg error: %w", err) } defer rows.Close() var topupReq_msg message.TopupMsg_req hasData := false for rows.Next() { hasData = true if err := rows.Scan(&topupReq_msg.Chain, &topupReq_msg.Symbol, &topupReq_msg.Timestamp, &topupReq_msg.Address); err != nil { return err } s_ctx.blockChainServer.AddAddress(topupReq_msg.Chain, topupReq_msg.Address, topupReq_msg) } if !hasData { log.Println("Msg_topup_req`s msg has not data, doesn`t need to load.") return nil } // 在遍历完所有数据后检查是否发生了错误 if err := rows.Err(); err != nil { log.Printf("Error encountered while iterating over rows: %v", err) } return nil } func loadWithdrawReqMsg() error { sql := `SELECT queueId, chain, symbol, timestamp, from_addr, to_addr, amount FROM msg_withdraw_req;` rows, err := s_ctx.sqlitedb.DB.Query(sql) if err != nil { return fmt.Errorf("query history withdraw-msg error: %w", err) } defer rows.Close() var withdrawReq_msg message.WithdrawMsg_req hasData := false for rows.Next() { hasData = true // var chain, symbol, to_addr string // var timestamp uint64 if err := rows.Scan(&withdrawReq_msg.QueueId, &withdrawReq_msg.Chain, &withdrawReq_msg.Symbol, &withdrawReq_msg.Timestamp, &withdrawReq_msg.FromAddress, &withdrawReq_msg.ToAddress, &withdrawReq_msg.Amount); err != nil { return err } s_ctx.blockChainServer.AddAddress(withdrawReq_msg.Chain, withdrawReq_msg.ToAddress, withdrawReq_msg) } if !hasData { log.Println("Msg_withdraw_req`s msg has not data, doesn`t need to load.") return nil } // 在遍历完所有数据后检查是否发生了错误 if err := rows.Err(); err != nil { log.Printf("Error encountered while iterating over rows: %v", err) } return nil } func loadPayReqMsg() error { sql := `SELECT queueId, chain, symbol, timestamp, from_addr, to_addr, amount, orderId FROM msg_pay_req;` rows, err := s_ctx.sqlitedb.DB.Query(sql) if err != nil { return fmt.Errorf("query history pay-msg error: %w", err) } defer rows.Close() var payReq_msg message.PayMsg_req hasData := false for rows.Next() { hasData = true if err := rows.Scan(&payReq_msg.QueueId, &payReq_msg.Chain, &payReq_msg.Symbol, &payReq_msg.Timestamp, &payReq_msg.FromAddress, &payReq_msg.ToAddress, &payReq_msg.Amount, &payReq_msg.OrderId); err != nil { return err } s_ctx.blockChainServer.AddAddress(payReq_msg.Chain, payReq_msg.ToAddress, payReq_msg) } if !hasData { log.Println("Msg_pay_req`s msg has not data, doesn`t need to load.") return nil } // 在遍历完所有数据后检查是否发生了错误 if err := rows.Err(); err != nil { log.Printf("Error encountered while iterating over rows: %v", err) } return nil } func initRmqServer() { // 初始化rmq服务 rmq_server, err := rmq.NewRabbitMQServer(s_ctx.Config.RMQConfig) if err != nil { log.Fatalf("RabbitMQ Server Start error: %v", err) } // 将rmq服务绑定至server s_ctx.rmqServer = rmq_server log.Printf("✅ RabbitMQ服务初始化完成: %s", s_ctx.Config.RMQConfig.SubAddr) } func initSQLite(sqlite3_file string) { // 初始化sqlite3数据库 sqlite3, err := db.NewSQLite(sqlite3_file) if err != nil { log.Fatalf("connect sqlite3 error:%v", err) return } sqlByte, err := os.ReadFile("../public/SQLite3.sql") if err != nil { log.Fatalf("open sql file error: %s", "../public/SQLite3.sql") } sqlite3.Exec_(string(sqlByte)) s_ctx.sqlitedb = *sqlite3 } func handleTopupMsg() { s_ctx.rmqServer.OnTopupMsg = func(msg message.TopupMsg_req) { msg.Address = strings.ToLower(msg.Address) // 验证签名 if !verifyMessage(msg.Timestamp, msg.Sign) { err := s_ctx.rmqServer.PublishTopupResp(message.TopupMsg_resp{ Address: msg.Address, Status: STATUS_VERIFY_FAILED, Chain: msg.Chain, Symbol: msg.Symbol, Amount: 0, TxHash: "", }) if err != nil { log.Printf("❌ 发布充值失败响应失败: %v", err) } return } // 添加监听地址 // go func() { err := s_ctx.blockChainServer.AddAddress(msg.Chain, msg.Address, msg) if err != nil { log.Printf("❌ 添加监听地址失败: %v", err) // 发送失败响应 err = s_ctx.rmqServer.PublishTopupResp(message.TopupMsg_resp{ Address: msg.Address, Status: STATUS_FAILED, Chain: msg.Chain, Symbol: msg.Symbol, Amount: 0, TxHash: "", }) if err != nil { log.Printf("❌ 发布充值失败响应失败: %v", err) } return } // }() // 将新增数据写入sqlite insert_sql := `INSERT OR REPLACE INTO msg_topup_req (chain, symbol, timestamp, to_addr) VALUES (?, ?, ?, ?)` data := []any{msg.Chain, msg.Symbol, msg.Timestamp, msg.Address} err = s_ctx.sqlitedb.Insert(insert_sql, data) if err != nil { log.Printf("❌ 插入 msg_req 失败: %v, data: %+v", err, data) } } } func handleWithdrawMsg() { s_ctx.rmqServer.OnWithdrawMsg = func(msg message.WithdrawMsg_req) { msg.FromAddress = strings.ToLower(msg.FromAddress) msg.ToAddress = strings.ToLower(msg.ToAddress) // 验证签名 if !verifyMessage(msg.Timestamp, msg.Sign) { err := s_ctx.rmqServer.PublishWithdrawResp(message.WithdrawMsg_resp{ QueueId: msg.QueueId, Status: STATUS_VERIFY_FAILED, Chain: msg.Chain, Symbol: msg.Symbol, Amount: 0, TxHash: "", }) if err != nil { log.Printf("❌ 发布提现失败响应失败: %v", err) } return } // 执行转账 err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg) if err != nil { log.Printf("❌ 提现转账失败: %v", err) // 发送失败响应 s_ctx.rmqServer.PublishWithdrawResp(message.WithdrawMsg_resp{ QueueId: msg.QueueId, Status: STATUS_FAILED, Amount: msg.Amount, Chain: msg.Chain, Symbol: msg.Symbol, TxHash: "", }) return // 转账失败时直接返回,不进入链上确认流程 } // go func() { err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg.ToAddress, msg) if err != nil { log.Printf("❌ 添加监听地址失败: %v", err) // 发送失败响应 s_ctx.rmqServer.PublishWithdrawResp(message.WithdrawMsg_resp{ QueueId: msg.QueueId, Status: STATUS_FAILED, Amount: msg.Amount, Chain: msg.Chain, Symbol: msg.Symbol, TxHash: "", }) return } // }() // 将新增数据写入sqlite insert_sql := `INSERT OR REPLACE INTO msg_withdraw_req (queueId, chain, symbol, timestamp, from_addr, to_addr, amount) VALUES (?, ?, ?, ?, ?, ?, ?)` data := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Timestamp, msg.FromAddress, msg.ToAddress, msg.Amount} err = s_ctx.sqlitedb.Insert(insert_sql, data) if err != nil { log.Printf("❌ 插入 withdraw_req 失败: %v, data: %+v", err, data) } } } func handlePayMsg() { s_ctx.rmqServer.OnPayMsg = func(msg message.PayMsg_req) { msg.FromAddress = strings.ToLower(msg.FromAddress) msg.ToAddress = strings.ToLower(msg.ToAddress) // 验证签名 if !verifyMessage(msg.Timestamp, msg.Sign) { err := s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ QueueId: msg.QueueId, Status: STATUS_VERIFY_FAILED, Amount: msg.Amount, Chain: msg.Chain, Symbol: msg.Symbol, OrderId: msg.OrderId, TxHash: "", }) if err != nil { log.Printf("❌ 发布支付失败响应失败: %v", err) } return } // 执行转账 err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg) if err != nil { log.Printf("❌ 支付转账失败: %v", err) // 发送失败响应 s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ QueueId: msg.QueueId, Status: STATUS_FAILED, Amount: msg.Amount, Chain: msg.Chain, Symbol: msg.Symbol, OrderId: msg.OrderId, TxHash: "", }) return // 转账失败时直接返回,不进入链上确认流程 } // go func() { err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg.ToAddress, msg) if err != nil { log.Printf("❌ 添加监听地址失败: %v", err) // 发送失败响应 s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ QueueId: msg.QueueId, Status: STATUS_FAILED, Amount: msg.Amount, Chain: msg.Chain, Symbol: msg.Symbol, OrderId: msg.OrderId, TxHash: "", }) return } // }() // 将新增数据写入sqlite insert_sql := `INSERT OR REPLACE INTO msg_pay_req (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?)` data := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Timestamp, msg.FromAddress, msg.ToAddress, msg.Amount, msg.OrderId} err = s_ctx.sqlitedb.Insert(insert_sql, data) if err != nil { log.Printf("❌ 插入 pay_req 失败: %v, data: %+v", err, data) } } } func initRmqListen() { // ================== 设置 RabbitMQ 消息处理回调 ================== // 先设置所有回调(同步执行,避免竞态) handleTopupMsg() handleWithdrawMsg() handlePayMsg() // 回调设置完成后,再启动 RabbitMQ 监听 if err := s_ctx.rmqServer.Start(); err != nil { log.Fatalf("启动 RabbitMQ 监听失败: %v", err) } log.Println("✅ RabbitMQ 监听启动完成") } func handleChainEvent(chainEventCh chan any) { for event := range chainEventCh { // 添加 panic 恢复,防止单个消息处理错误导致整个 goroutine 退出 func() { defer func() { if r := recover(); r != nil { log.Printf("❌ 处理链上事件 panic: %v, event: %+v", r, event) } }() // 根据消息类型发送不同的响应 switch msg := event.(type) { case message.TopupMsg_resp: // 充值确认 if msg.Status == STATUS_PENDING { log.Printf("📨 [链上] 充值待确认: Address=%s, Amount=%.2f, TxHash=%s", msg.Address, msg.Amount, msg.TxHash) // 记录交易日志:待确认 logger.LogTopup(msg.Address, "待确认", msg.Amount, msg.TxHash, msg.BlockHeight) } else { log.Printf("✅ [链上] 充值确认: Address=%s, Amount=%.2f, TxHash=%s, Status=%d", msg.Address, msg.Amount, msg.TxHash, msg.Status) // 记录交易日志:已确认 logger.LogTopup(msg.Address, "确认", msg.Amount, msg.TxHash, msg.BlockHeight) } err := s_ctx.rmqServer.PublishTopupResp(msg) if err != nil { log.Printf("❌ 发送充值响应失败: %v", err) return } go func() { // 插入响应数据 sql := `INSERT INTO msg_topup_resp (chain, symbol, timestamp, to_addr, amount, height, txHash, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)` data := []any{msg.Chain, msg.Symbol, time.Now().Unix(), msg.Address, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status} err := s_ctx.sqlitedb.Insert(sql, data) if err != nil { log.Printf("❌ 插入 topup_resp 失败: %v", err) return } }() case message.WithdrawMsg_resp: // 提现确认 log.Printf("✅ [链上] 提现确认: QueueId=%s, Amount=%.2f, TxHash=%s, Status=%d", msg.QueueId, msg.Amount, msg.TxHash, msg.Status) // 记录交易日志 logger.LogWithdraw(msg.ToAddress, "确认", msg.Amount, msg.FromAddress, msg.TxHash, msg.BlockHeight) err := s_ctx.rmqServer.PublishWithdrawResp(msg) if err != nil { log.Printf("❌ 发送提现响应失败: %v", err) return } go func() { // 插入响应数据 sql := `INSERT INTO msg_withdraw_resp (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, height, txHash, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` data := []any{msg.QueueId, msg.Chain, msg.Symbol, time.Now().Unix(), msg.FromAddress, msg.ToAddress, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status} err := s_ctx.sqlitedb.Insert(sql, data) if err != nil { log.Printf("❌ 插入 withdraw_resp 失败: %v", err) return } // 删除对应数据 del_sql := `DELETE FROM msg_withdraw_req WHERE queueId = ?;` count, err := s_ctx.sqlitedb.Delete(del_sql, msg.QueueId) if err != nil { log.Printf("❌ 清理 withdraw_req 失败: %v, queueId=%s", err, msg.QueueId) } else if count == 0 { log.Printf("⚠️ 未找到要删除的 withdraw_req 记录: queueId=%s", msg.QueueId) } else { log.Printf("✅ 清理 withdraw_req 成功: 删除了 %d 条记录, queueId=%s", count, msg.QueueId) } }() case message.PayMsg_resp: // 支付确认 log.Printf("✅ [链上] 支付确认: QueueId=%s, OrderId=%s, Amount=%.2f, TxHash=%s, Status=%d", msg.QueueId, msg.OrderId, msg.Amount, msg.TxHash, msg.Status) // 记录交易日志 logger.LogPay(msg.ToAddress, "确认", msg.Amount, msg.FromAddress, msg.TxHash, msg.BlockHeight, msg.OrderId, msg.QueueId) err := s_ctx.rmqServer.PublishPayResp(msg) if err != nil { log.Printf("❌ 发送支付响应失败: %v", err) return } go func() { // 插入响应数据 sql := `INSERT INTO msg_pay_resp (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, height, txHash, status, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` data := []any{msg.QueueId, msg.Chain, msg.Symbol, time.Now().Unix(), msg.FromAddress, msg.ToAddress, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status, msg.OrderId} err := s_ctx.sqlitedb.Insert(sql, data) if err != nil { log.Printf("❌ 插入 pay_resp 失败: %v", err) return } // 删除对应数据 del_sql := `DELETE FROM msg_pay_req WHERE queueId = ?;` count, err := s_ctx.sqlitedb.Delete(del_sql, msg.QueueId) if err != nil { log.Printf("❌ 清理 pay_req 失败: %v, queueId=%s", err, msg.QueueId) } else if count == 0 { log.Printf("⚠️ 未找到要删除的 pay_req 记录: queueId=%s", msg.QueueId) } else { log.Printf("✅ 清理 pay_req 成功: 删除了 %d 条记录, queueId=%s", count, msg.QueueId) } }() default: log.Printf("⚠️ 未知消息类型: %T", event) } }() } } func Start(msgKey string) { log.Println("========================================") log.Println("🚀 M2Pool Payment System Starting...") log.Println("========================================") // 加载配置 loadConfig(msgKey) // 初始化交易日志系统 if err := logger.InitTransactionLogger("logs"); err != nil { log.Fatalf("❌ 初始化交易日志系统失败: %v", err) } log.Println("✅ 交易日志系统初始化完成") // ================== 初始化区块链节点 ================== initBlockChainServer() // ================== 初始化SQLite3 ================== initSQLite(s_ctx.Config.SQLite3.MsgPath) // 读取历史信息 loadSQLiteData() // ================== 初始化 RabbitMQ 服务 ================== initRmqServer() // ================== 启动链上事件监听通道 ================== chainEventCh := make(chan any, 1000) // 增加缓冲区,避免高并发丢消息 go s_ctx.blockChainServer.Listen("ETH", "USDT", chainEventCh) // ================== 启动 RabbitMQ 监听 ================== initRmqListen() // ================== 处理链上确认事件 ================== go handleChainEvent(chainEventCh) log.Println("========================================") log.Println("🎉 所有服务启动完成!") log.Println("========================================") // ================== 等待退出信号 ================== sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh // 优雅关闭 log.Println("========================================") log.Println("🛑 收到退出信号,正在关闭服务...") log.Println("========================================") s_ctx.blockChainServer.Stop("ETH") s_ctx.rmqServer.Close() logger.CloseTransactionLogger() log.Println("👋 服务已全部关闭") }