From b7c84fd1014df7d493849c91907c0ab222f9ce3d Mon Sep 17 00:00:00 2001 From: lzx <393768033@qq.com> Date: Tue, 21 Oct 2025 14:25:15 +0800 Subject: [PATCH] update: add log-system --- bin/config.json | 76 ++++++++ internal/blockchain/eth/eth.go | 58 +++--- internal/logger/README.md | 41 ++++ internal/logger/transaction_logger.go | 269 ++++++++++++++++++++++++++ internal/msg/msg.go | 45 +++-- internal/server.go | 18 ++ 6 files changed, 463 insertions(+), 44 deletions(-) create mode 100644 bin/config.json create mode 100644 internal/logger/README.md create mode 100644 internal/logger/transaction_logger.go diff --git a/bin/config.json b/bin/config.json new file mode 100644 index 0000000..c9b55ce --- /dev/null +++ b/bin/config.json @@ -0,0 +1,76 @@ +{ + "rmq_config": { + "sub_addr": "amqp://m2pool:m2pool@localhost:5672", + "pay": { + "queue": "pay.auto.queue", + "exchange": "pay.exchange", + "routing": [ + "pay.auto.routing.key" + ] + }, + "topup": { + "queue": "pay.recharge.queue", + "exchange": "pay.exchange", + "routing": [ + "pay.recharge.routing.key" + ] + }, + "withdraw": { + "queue": "pay.withdraw.queue", + "exchange": "pay.exchange", + "routing": [ + "pay.withdraw.routing.key" + ] + }, + "pay_resp": { + "queue": "pay.auto.return.queue", + "exchange": "pay.exchange", + "routing": [ + "pay.auto.return.routing.key" + ] + }, + "topup_resp": { + "queue": "pay.recharge.return.queue", + "exchange": "pay.exchange", + "routing": [ + "pay.recharge.return.routing.key" + ] + }, + "withdraw_resp": { + "queue": "pay.withdraw.return.queue", + "exchange": "pay.exchange", + "routing": [ + "pay.withdraw.return.routing.key" + ] + } + }, + "eth_config": { + "rpcUrl": "http://10.168.3.236:18545", + "wsUrl": "ws://10.168.3.236:18546", + "confirmHeight": 20, + "dbConfig": { + "user": "root", + "password": "Lzx2021@!", + "host": "127.0.0.1", + "port": 3306, + "database": "payment", + "maxOpenConns": 20, + "maxIdleCoons": 20, + "connMaxLife": 120 + } + }, + "tron_config": { + "rpcUrl": "", + "confirmHeight": 20, + "dbConfig": { + "user": "root", + "password": "Lzx2021@!", + "host": "127.0.0.1", + "port": 3306, + "database": "payment", + "maxOpenConns": 20, + "maxIdleCoons": 20, + "connMaxLife": 120 + } + } +} \ No newline at end of file diff --git a/internal/blockchain/eth/eth.go b/internal/blockchain/eth/eth.go index 0271a8b..c367c53 100644 --- a/internal/blockchain/eth/eth.go +++ b/internal/blockchain/eth/eth.go @@ -341,12 +341,13 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) { d1, ok := v.(message.TopupMsg_req) if ok && strings.ToLower(d1.Address) == toAddr { pendingMsg := message.TopupMsg_resp{ - Address: toAddr, - Status: 2, // 待确认状态 - Chain: d1.Chain, - Symbol: d1.Symbol, - Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), - TxHash: tx_hash, + Address: toAddr, + Status: 2, // 待确认状态 + Chain: d1.Chain, + Symbol: d1.Symbol, + Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), + TxHash: tx_hash, + BlockHeight: height, } // log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f", // tx_hash, toAddr, pendingMsg.Amount) @@ -487,12 +488,13 @@ func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) { // 统一转小写比较 if strings.ToLower(d1.Address) == tx.Tx.To { result_msg = message.TopupMsg_resp{ - Address: tx.Tx.To, - Status: tx.Tx.Status, - Chain: d1.Chain, - Symbol: d1.Symbol, - Amount: tx.Tx.Value, - TxHash: tx.Tx.TxHash, + Address: tx.Tx.To, + Status: tx.Tx.Status, + Chain: d1.Chain, + Symbol: d1.Symbol, + Amount: tx.Tx.Value, + TxHash: tx.Tx.TxHash, + BlockHeight: tx.Tx.Height, } // 充值消息不删除,可能会有多笔充值到同一地址 break @@ -506,12 +508,15 @@ func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) { strings.ToLower(d2.ToAddress) == tx.Tx.To && d2.Amount == tx.Tx.Value { result_msg = message.WithdrawMsg_resp{ - QueueId: d2.QueueId, - Status: tx.Tx.Status, - Amount: tx.Tx.Value, - Chain: d2.Chain, - Symbol: d2.Symbol, - TxHash: tx.Tx.TxHash, + QueueId: d2.QueueId, + Status: tx.Tx.Status, + Amount: tx.Tx.Value, + Chain: d2.Chain, + Symbol: d2.Symbol, + TxHash: tx.Tx.TxHash, + FromAddress: tx.Tx.From, + ToAddress: tx.Tx.To, + BlockHeight: tx.Tx.Height, } matchIndex = i // 记录索引,稍后删除 break @@ -525,13 +530,16 @@ func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) { strings.ToLower(d3.ToAddress) == tx.Tx.To && d3.Amount == tx.Tx.Value { result_msg = message.PayMsg_resp{ - QueueId: d3.QueueId, - Status: tx.Tx.Status, - Amount: tx.Tx.Value, - Chain: d3.Chain, - Symbol: d3.Symbol, - OrderId: d3.OrderId, - TxHash: tx.Tx.TxHash, + QueueId: d3.QueueId, + Status: tx.Tx.Status, + Amount: tx.Tx.Value, + Chain: d3.Chain, + Symbol: d3.Symbol, + OrderId: d3.OrderId, + TxHash: tx.Tx.TxHash, + FromAddress: tx.Tx.From, + ToAddress: tx.Tx.To, + BlockHeight: tx.Tx.Height, } matchIndex = i // 记录索引,稍后删除 break diff --git a/internal/logger/README.md b/internal/logger/README.md new file mode 100644 index 0000000..7561b02 --- /dev/null +++ b/internal/logger/README.md @@ -0,0 +1,41 @@ +# 交易日志系统 + +## 核心功能 + +1. **按地址/订单分离日志** + - 充值:按 `address` 命名 + - 提现:按 `queueId` 命名 + - 支付:按 `orderId` 命名 + +2. **自动日志轮转** + - 单文件超过 1MB 自动压缩为 `.log.gz` + - 后台异步压缩,不影响性能 + +3. **日志格式** + ``` + 2024-01-01 12:00:00 [topup]-[待确认] | 金额: 100.000000 | 交易哈希: 0x123... | 区块高度: 12345678 | 地址: 0xabc... + ``` + +## 使用示例 + +```go +// 初始化 +logger.InitTransactionLogger("logs") +defer logger.CloseTransactionLogger() + +// 记录日志 +logger.LogTopup(address, "待确认", amount, txHash, blockHeight) +logger.LogWithdraw(queueId, "确认", amount, from, to, txHash, blockHeight) +logger.LogPay(orderId, queueId, "确认", amount, from, to, txHash, blockHeight) +``` + +## 配置 + +修改 `internal/logger/transaction_logger.go` 常量: +```go +const ( + MaxFileSize = 1 * 1024 * 1024 // 日志轮转大小 + LogDir = "logs" // 日志目录 +) +``` + diff --git a/internal/logger/transaction_logger.go b/internal/logger/transaction_logger.go new file mode 100644 index 0000000..74300bf --- /dev/null +++ b/internal/logger/transaction_logger.go @@ -0,0 +1,269 @@ +package logger + +import ( + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" +) + +const ( + MaxFileSize = 1 * 1024 * 1024 // 1MB + LogDir = "logs" +) + +// TransactionLogger 交易日志记录器 +type TransactionLogger struct { + mu sync.Mutex + files map[string]*logFile // address -> logFile + logDir string +} + +// logFile 单个日志文件 +type logFile struct { + file *os.File + size int64 + address string + logDir string + mu sync.Mutex +} + +var ( + txLogger *TransactionLogger + once sync.Once +) + +// InitTransactionLogger 初始化交易日志系统 +func InitTransactionLogger(logDir string) error { + var err error + once.Do(func() { + if logDir == "" { + logDir = LogDir + } + + // 创建日志目录 + err = os.MkdirAll(logDir, 0755) + if err != nil { + return + } + + txLogger = &TransactionLogger{ + files: make(map[string]*logFile), + logDir: logDir, + } + }) + + return err +} + +// getOrCreateLogFile 获取或创建日志文件 +func (tl *TransactionLogger) getOrCreateLogFile(address string) (*logFile, error) { + tl.mu.Lock() + defer tl.mu.Unlock() + + // 如果已存在,返回现有的 + if lf, exists := tl.files[address]; exists { + return lf, nil + } + + // 创建新的日志文件 + lf := &logFile{ + address: address, + logDir: tl.logDir, + } + + // 打开或创建文件 + filePath := filepath.Join(tl.logDir, fmt.Sprintf("%s.log", address)) + file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, fmt.Errorf("创建日志文件失败: %w", err) + } + + // 获取当前文件大小 + info, err := file.Stat() + if err != nil { + file.Close() + return nil, fmt.Errorf("获取文件信息失败: %w", err) + } + + lf.file = file + lf.size = info.Size() + tl.files[address] = lf + + return lf, nil +} + +// write 写入日志 +func (lf *logFile) write(content string) error { + lf.mu.Lock() + defer lf.mu.Unlock() + + // 检查是否需要轮转 + if lf.size >= MaxFileSize { + if err := lf.rotate(); err != nil { + return fmt.Errorf("日志轮转失败: %w", err) + } + } + + // 写入内容 + n, err := lf.file.WriteString(content + "\n") + if err != nil { + return fmt.Errorf("写入日志失败: %w", err) + } + + lf.size += int64(n) + + // 立即刷新到磁盘 + lf.file.Sync() + + return nil +} + +// rotate 日志轮转:压缩当前文件,创建新文件 +func (lf *logFile) rotate() error { + // 关闭当前文件 + if lf.file != nil { + lf.file.Close() + } + + // 生成备份文件名(带时间戳) + timestamp := time.Now().Format("20060102_150405") + oldPath := filepath.Join(lf.logDir, fmt.Sprintf("%s.log", lf.address)) + backupPath := filepath.Join(lf.logDir, fmt.Sprintf("%s_%s.log", lf.address, timestamp)) + + // 重命名当前文件 + if err := os.Rename(oldPath, backupPath); err != nil { + return fmt.Errorf("重命名日志文件失败: %w", err) + } + + // 压缩备份文件 + go func() { + if err := compressFile(backupPath); err != nil { + fmt.Printf("⚠️ 压缩日志文件失败 %s: %v\n", backupPath, err) + } else { + // 删除原文件 + os.Remove(backupPath) + } + }() + + // 创建新文件 + file, err := os.OpenFile(oldPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return fmt.Errorf("创建新日志文件失败: %w", err) + } + + lf.file = file + lf.size = 0 + + return nil +} + +// compressFile 压缩文件 +func compressFile(filePath string) error { + // 打开原文件 + srcFile, err := os.Open(filePath) + if err != nil { + return err + } + defer srcFile.Close() + + // 创建压缩文件 + gzPath := filePath + ".gz" + gzFile, err := os.Create(gzPath) + if err != nil { + return err + } + defer gzFile.Close() + + // 创建 gzip writer + gzWriter := gzip.NewWriter(gzFile) + defer gzWriter.Close() + + // 复制数据 + _, err = io.Copy(gzWriter, srcFile) + return err +} + +// LogTopup 记录充值消息 +func LogTopup(address string, status string, amount float64, txHash string, blockHeight uint64) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(address) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + timestamp := time.Now().Format("2006-01-02 15:04:05") + content := fmt.Sprintf("%s [topup]-[%s] | 金额: %.6f | 交易哈希: %s | 区块高度: %d | 地址: %s", + timestamp, status, amount, txHash, blockHeight, address) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogWithdraw 记录提现消息 +func LogWithdraw(queueId string, status string, amount float64, from string, to string, txHash string, blockHeight uint64) { + if txLogger == nil { + return + } + + // 使用 queueId 作为文件名 + lf, err := txLogger.getOrCreateLogFile(queueId) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + timestamp := time.Now().Format("2006-01-02 15:04:05") + content := fmt.Sprintf("%s [withdraw]-[%s] | 金额: %.6f | From: %s | To: %s | 交易哈希: %s | 区块高度: %d", + timestamp, status, amount, from, to, txHash, blockHeight) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogPay 记录支付消息 +func LogPay(orderId string, queueId string, status string, amount float64, from string, to string, txHash string, blockHeight uint64) { + if txLogger == nil { + return + } + + // 使用 orderId 作为文件名 + lf, err := txLogger.getOrCreateLogFile(orderId) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + timestamp := time.Now().Format("2006-01-02 15:04:05") + content := fmt.Sprintf("%s [pay]-[%s] | 订单ID: %s | 队列ID: %s | 金额: %.6f | From: %s | To: %s | 交易哈希: %s | 区块高度: %d", + timestamp, status, orderId, queueId, amount, from, to, txHash, blockHeight) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// Close 关闭所有日志文件 +func CloseTransactionLogger() { + if txLogger == nil { + return + } + + txLogger.mu.Lock() + defer txLogger.mu.Unlock() + + for _, lf := range txLogger.files { + if lf.file != nil { + lf.file.Close() + } + } +} diff --git a/internal/msg/msg.go b/internal/msg/msg.go index 5245541..8d8add0 100644 --- a/internal/msg/msg.go +++ b/internal/msg/msg.go @@ -62,12 +62,13 @@ type TopupMsg_req struct { // 返回充值结果消息 type TopupMsg_resp struct { - Address string `json:"address"` - Status int `json:"status"` - Chain string `json:"chain"` // 链名称 - Symbol string `json:"symbol"` // 币种 - Amount float64 `json:"amount"` - TxHash string `json:"tx_hash"` + Address string `json:"address"` + Status int `json:"status"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + Amount float64 `json:"amount"` + TxHash string `json:"tx_hash"` + BlockHeight uint64 `json:"block_height"` // 区块高度 } // 接收的提现消息 @@ -84,12 +85,15 @@ type WithdrawMsg_req struct { // 返回提现结果消息 type WithdrawMsg_resp struct { - QueueId string `json:"queue_id"` - Status int `json:"status"` - Amount float64 `json:"amount"` - Chain string `json:"chain"` // 链名称 - Symbol string `json:"symbol"` // 币种 - TxHash string `json:"tx_hash"` + QueueId string `json:"queue_id"` + Status int `json:"status"` + Amount float64 `json:"amount"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + TxHash string `json:"tx_hash"` + FromAddress string `json:"from_address"` // 来源地址 + ToAddress string `json:"to_address"` // 目标地址 + BlockHeight uint64 `json:"block_height"` // 区块高度 } // 接收到的支付消息 @@ -107,13 +111,16 @@ type PayMsg_req struct { // 返回支付结果消息 type PayMsg_resp struct { - QueueId string `json:"queue_id"` - Status int `json:"status"` - Amount float64 `json:"amount"` - Chain string `json:"chain"` // 链名称 - Symbol string `json:"symbol"` // 币种 - OrderId string `json:"order_id"` // 订单号 - TxHash string `json:"tx_hash"` + QueueId string `json:"queue_id"` + Status int `json:"status"` + Amount float64 `json:"amount"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + OrderId string `json:"order_id"` // 订单号 + TxHash string `json:"tx_hash"` + FromAddress string `json:"from_address"` // 来源地址 + ToAddress string `json:"to_address"` // 目标地址 + BlockHeight uint64 `json:"block_height"` // 区块高度 } // 节点通用消息结构 diff --git a/internal/server.go b/internal/server.go index e534a44..90be55c 100644 --- a/internal/server.go +++ b/internal/server.go @@ -8,6 +8,7 @@ import ( "m2pool-payment/internal/blockchain" "m2pool-payment/internal/blockchain/eth" "m2pool-payment/internal/crypto" + "m2pool-payment/internal/logger" message "m2pool-payment/internal/msg" rmq "m2pool-payment/internal/queue" "os" @@ -221,9 +222,13 @@ func handleChainEvent(chainEventCh chan any) { if msg.Status == STATUS_PENDING { log.Printf("📨 [链上] 充值待确认: Address=%s, Amount=%.2f, TxHash=%s", msg.Address, msg.Amount, msg.TxHash) + // 记录交易日志:待确认 + logger.LogTopup(msg.Address, "待确认", msg.Amount, msg.TxHash, msg.BlockHeight) } else { log.Printf("✅ [链上] 充值确认: Address=%s, Amount=%.2f, TxHash=%s, Status=%d", msg.Address, msg.Amount, msg.TxHash, msg.Status) + // 记录交易日志:已确认 + logger.LogTopup(msg.Address, "确认", msg.Amount, msg.TxHash, msg.BlockHeight) } err := s_ctx.rmqServer.PublishTopupResp(msg) if err != nil { @@ -234,6 +239,9 @@ func handleChainEvent(chainEventCh chan any) { // 提现确认 log.Printf("✅ [链上] 提现确认: QueueId=%s, Amount=%.2f, TxHash=%s, Status=%d", msg.QueueId, msg.Amount, msg.TxHash, msg.Status) + // 记录交易日志 + logger.LogWithdraw(msg.QueueId, "确认", msg.Amount, msg.FromAddress, + msg.ToAddress, msg.TxHash, msg.BlockHeight) err := s_ctx.rmqServer.PublishWithdrawResp(msg) if err != nil { log.Printf("❌ 发送提现响应失败: %v", err) @@ -243,6 +251,9 @@ func handleChainEvent(chainEventCh chan any) { // 支付确认 log.Printf("✅ [链上] 支付确认: QueueId=%s, OrderId=%s, Amount=%.2f, TxHash=%s, Status=%d", msg.QueueId, msg.OrderId, msg.Amount, msg.TxHash, msg.Status) + // 记录交易日志 + logger.LogPay(msg.OrderId, msg.QueueId, "确认", msg.Amount, msg.FromAddress, + msg.ToAddress, msg.TxHash, msg.BlockHeight) err := s_ctx.rmqServer.PublishPayResp(msg) if err != nil { log.Printf("❌ 发送支付响应失败: %v", err) @@ -263,6 +274,12 @@ func Start(msgKey string) { // 加载配置 loadConfig(msgKey) + // 初始化交易日志系统 + if err := logger.InitTransactionLogger("logs"); err != nil { + log.Fatalf("❌ 初始化交易日志系统失败: %v", err) + } + log.Println("✅ 交易日志系统初始化完成") + // ================== 初始化区块链节点 ================== initBlockChainServer() @@ -294,6 +311,7 @@ func Start(msgKey string) { s_ctx.blockChainServer.Stop("ETH") s_ctx.rmqServer.Close() + logger.CloseTransactionLogger() log.Println("👋 服务已全部关闭") }