From 74d9a114c0250c5988e2257ddb3469314a91a9cf Mon Sep 17 00:00:00 2001 From: lzx <393768033@qq.com> Date: Tue, 18 Nov 2025 11:10:16 +0800 Subject: [PATCH] add log-system, bug fixed --- README.md | 46 ++++ internal/blockchain/eth/eth.go | 8 +- internal/blockchain/eth/eth_prv.go | 310 +++++++++++++++++++++----- internal/db/mysql.go | 148 ++++++++++++ internal/listen/listen.go | 24 +- internal/listen/listen_prv.go | 227 ++++++++++++------- internal/logger/transaction_logger.go | 190 ++++++++++++++++ internal/msg/msg.go | 2 +- internal/server.go | 16 ++ internal/utils/utils.go | 21 +- public/eth_mysql.sql | 24 +- public/msg_mysql.sql | 23 +- 流程.txt | 8 +- 13 files changed, 861 insertions(+), 186 deletions(-) diff --git a/README.md b/README.md index 411e5c4..5b96d50 100644 --- a/README.md +++ b/README.md @@ -224,6 +224,52 @@ M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付 #### 9. Logger (`internal/logger/`) - **transaction_logger.go**:交易日志记录 - 记录所有交易操作的详细日志 + - 记录 RMQ 与 Listen 之间的所有通信消息 + - 按地址分文件存储,自动日志轮转和压缩 + +##### 日志分类 + +**① 交易日志(按地址分文件)** +- 充值日志:按 `address` 命名文件 +- 提现日志:按 `fromAddress` 命名文件 +- 支付日志:按 `fromAddress` 命名文件 +- ETH 节点日志:`ethnode.log` + +**② RMQ ↔ Listen 通信日志(按地址分文件)** +- 充值请求/响应:按 `address` 命名文件 +- 提现请求/响应:按 `fromAddress` 命名文件 +- 支付请求/响应:按 `fromAddress` 命名文件 +- 移除监听请求/响应:按 `address` 命名文件 + +##### 日志格式 + +**RMQ ↔ Listen 通信日志格式**:`[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount` + +- **TopupReq**: `[TopupReq]: 2006-01-02 15:04:05--0xabc...-ETH-USDT-0` +- **WithdrawReq**: `[WithdrawReq]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-100.500000` +- **PayReq**: `[PayReq]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-50.250000` +- **TopupResp**: `[TopupResp]: 2006-01-02 15:04:05-0xdef...-0xabc...-ETH-USDT-200.000000` +- **WithdrawResp**: `[WithdrawResp]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-100.500000` +- **PayResp**: `[PayResp]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-50.250000` + +##### 日志轮转机制 + +- 单文件超过 **1MB** 自动压缩为 `.gz` 格式 +- 后台异步压缩,不影响性能 +- 压缩后的文件名格式:`{address}_{timestamp}.log.gz` +- 自动创建新的日志文件继续写入 + +##### 日志存储位置 + +所有日志文件存储在 `logs/` 目录下: +``` +logs/ +├── 0x123...abc.log # 地址 0x123...abc 的日志 +├── 0x456...def.log # 地址 0x456...def 的日志 +├── 0x123...abc_20240102_120000.log.gz # 压缩的历史日志 +├── ethnode.log # ETH 节点日志 +└── ... +``` ### 项目结构 diff --git a/internal/blockchain/eth/eth.go b/internal/blockchain/eth/eth.go index 985a2a6..6562e79 100644 --- a/internal/blockchain/eth/eth.go +++ b/internal/blockchain/eth/eth.go @@ -145,10 +145,10 @@ func NewETHNode(cfg message.Config, decodeKey string, l *listen.ListenServer) (* Cancel: cancel, } // 初始化表 - err = ethnode.initTables() - if err != nil { - return nil, err - } + // err = ethnode.MysqlDB.ExecuteSQLFile("../public/eth_mysql.sql") + // if err != nil { + // log.Fatalf("ETH-Chain初始化数据库表失败:%v", err) + // } // 更新网络公共数据和加载钱包 height, err := ethnode.getHeight() if err != nil { diff --git a/internal/blockchain/eth/eth_prv.go b/internal/blockchain/eth/eth_prv.go index a7a897a..d61f5c1 100644 --- a/internal/blockchain/eth/eth_prv.go +++ b/internal/blockchain/eth/eth_prv.go @@ -10,6 +10,7 @@ import ( "m2pool-payment/internal/utils" "math/big" "os" + "strconv" "strings" "sync" "time" @@ -271,6 +272,7 @@ func (e *ETHNode) checkBalance(symbol, address string, target_amount_eth, target e.Wallets[address].mu.Lock() defer e.Wallets[address].mu.Unlock() if wallet, ok := e.Wallets[address]; ok { + log.Printf("需要的USDT:%d,钱包的USDT:%d", target_amount_usdt, e.Wallets[address].usdt_balance.balance) switch symbol { case "ETH": // ETH余额不足支付转账金额+GAS费用 @@ -300,29 +302,33 @@ func (e *ETHNode) loadWallets() error { SELECT ew.address, ew.queue_id, ew.timestamp, ew.sign, ew.status, eb.used_gas AS eth_used_gas, eb.balance AS eth_balance, - eb.success_tx_hash AS eth_successed_tx_hash, eb.failed_tx_hash AS eth_filed_tx_hash, + eb.success_tx_hash AS eth_successed_tx_hash, eb.failed_tx_hash AS eth_failed_tx_hash, ub.freeze_num AS usdt_freeze_num, ub.balance AS usdt_balance, - ub.success_tx_hash AS usdt_successed_tx_hash, ub.failed_tx_hash AS usdt_filed_tx_hash + ub.success_tx_hash AS usdt_successed_tx_hash, ub.failed_tx_hash AS usdt_failed_tx_hash FROM ETH_wallets ew LEFT JOIN ETH_balances eb ON ew.address = eb.address LEFT JOIN USDT_balances ub ON ew.address = ub.address WHERE ew.status = ?` - rows, err := e.SqliteDB.DB.Query(query, 1) + // 执行查询 + rows, err := e.MysqlDB.Query(query, 1) if err != nil { return fmt.Errorf("failed to get wallets: %v", err) } defer rows.Close() + // 解析字段为 big.Int 类型 parseBigInt := func(val sql.NullString) *big.Int { if val.Valid && strings.TrimSpace(val.String) != "" { if bi, ok := new(big.Int).SetString(strings.TrimSpace(val.String), 10); ok { return bi } } - return big.NewInt(0) + log.Printf("Invalid or empty string for big.Int: %s", val.String) // 添加调试日志 + return big.NewInt(0) // 如果无法转换,返回 0 } + // 解析字符串为交易哈希列表 parseHashList := func(val sql.NullString) []string { if !val.Valid || val.String == "" { return []string{} @@ -338,8 +344,11 @@ func (e *ETHNode) loadWallets() error { return res } + // 用来存储钱包信息 wallets := make(map[string]*Wallets) addresses := []string{} + + // 逐行处理查询结果 for rows.Next() { var ( addrStr, queueID, sign sql.NullString @@ -351,6 +360,7 @@ func (e *ETHNode) loadWallets() error { usdtSuccess, usdtFailed sql.NullString ) + // 扫描查询结果到变量 if err := rows.Scan( &addrStr, &queueID, ×tamp, &sign, &status, ðUsedGas, ðBalance, @@ -361,12 +371,14 @@ func (e *ETHNode) loadWallets() error { return fmt.Errorf("failed to scan row: %v", err) } + // 如果地址字段为空,则跳过该行 if !addrStr.Valid { continue } addr := strings.ToLower(addrStr.String) + // 创建钱包对象并填充数据 wallet := &Wallets{ address: addr, queueId: queueID.String, @@ -385,80 +397,105 @@ func (e *ETHNode) loadWallets() error { }(), } + // 解析 ETH 余额和信息 + ethBalanceStr := strings.TrimSpace(ethBalance.String) + log.Printf("Parsed ETH balance: %s for address: %s", ethBalanceStr, addrStr.String) // 调试日志 + ethBalanceValue, err := strconv.ParseFloat(ethBalanceStr, 64) + if err != nil { + log.Printf("Failed to parse ETH balance: %v, address: %s", err, addrStr.String) + ethBalanceValue = 0 // 给一个默认值 + } + wallet.eth_balance = ð_balance{ - symbol: "ETH", - used_gas: parseBigInt(ethUsedGas), - balance: parseBigInt(ethBalance), + symbol: "ETH", + used_gas: parseBigInt(ethUsedGas), + // balance: big.NewInt(int64(ethBalanceValue)), // 使用 big.Int 存储数值 + balance: utils.Float64ToBigInt("ETH", ethBalanceValue), successed_tx_hash: parseHashList(ethSuccess), failed_tx_hash: parseHashList(ethFailed), } + // 解析 USDT 余额和信息 + usdtBalanceStr := strings.TrimSpace(usdtBalance.String) + log.Printf("Parsed USDT balance: %s for address: %s", usdtBalanceStr, addrStr.String) // 调试日志 + usdtBalanceValue, err := strconv.ParseFloat(usdtBalanceStr, 64) + if err != nil { + log.Printf("Failed to parse USDT balance: %v, address: %s", err, addrStr.String) + usdtBalanceValue = 0 // 给一个默认值 + } + wallet.usdt_balance = &usdt_balance{ symbol: "USDT", freeze_num: parseBigInt(usdtFreeze), - balance: parseBigInt(usdtBalance), + balance: utils.Float64ToBigInt("USDT", usdtBalanceValue), // 使用 big.Int 存储数值 successed_tx_hash: parseHashList(usdtSuccess), failed_tx_hash: parseHashList(usdtFailed), } + + // 添加钱包地址到列表并将钱包信息存储到 map 中 addresses = append(addresses, addr) wallets[addr] = wallet } - log.Printf("ETH钱包加载成功:%v", addresses) + + // 检查是否有查询错误 if err := rows.Err(); err != nil { return fmt.Errorf("error occurred while iterating rows: %v", err) } + // 打印已加载的地址 + log.Printf("ETH钱包加载成功:%v", addresses) + + // 如果有地址,获取钱包的私钥 if len(addresses) > 0 { pks, err := e.getAddressesPks(addresses) if err != nil { - return fmt.Errorf("inital balance private key error: %v", err) + return fmt.Errorf("initial balance private key error: %v", err) } e.mu.Lock() e.Wallets = wallets + // 将私钥绑定到钱包对象中 for address, pk := range pks { e.Wallets[address].pk = pk } e.mu.Unlock() } - // for addr, balance := range e.Wallets { - // log.Printf("钱包:%s, wallets: %v", addr, balance) - // } + // 打印每个钱包的余额 + for addr, balance := range e.Wallets { + log.Printf("钱包:%s, ETH余额: %s, USDT余额: %s", addr, balance.eth_balance.balance.String(), balance.usdt_balance.balance.String()) + } return nil } func (e *ETHNode) loadUnConfirmedTxs() error { - query_str := "SELECT queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount, status FROM eth_unconfirmed_tx WHERE status = ?" - data, err := e.SqliteDB.Query_(query_str, 2) + // 查询语句 + query_str := "SELECT queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount, status FROM ETH_unconfirmed_tx WHERE status = ?" + // 执行查询 + rows, err := e.MysqlDB.Query(query_str, 2) if err != nil { - return fmt.Errorf("failed to get columns: %v", err) + return fmt.Errorf("failed to query unconfirmed transactions: %v", err) } + defer rows.Close() // 确保查询结束后关闭 rows - for _, row := range data { - // 提取各个字段并确保正确转换为目标类型 - queueId, ok := row["queue_id"].(string) - if !ok { - return fmt.Errorf("invalid type for queue_id, expected string but got %T", row["queue_id"]) + // 遍历查询结果 + for rows.Next() { + var queueId, chain, symbol, fromAddr, toAddr, txHash, amount string + var txType, height, status int + // 读取当前行数据 + if err := rows.Scan(&queueId, &txType, &chain, &symbol, &fromAddr, &toAddr, &txHash, &height, &amount, &status); err != nil { + return fmt.Errorf("failed to scan row: %v", err) } - // 你可以继续提取其他字段并做类似类型转换 - txType, _ := row["tx_type"].(int) // 假设 tx_type 是 int 类型 - chain, _ := row["chain"].(string) - symbol, _ := row["symbol"].(string) - fromAddr, _ := row["from_addr"].(string) - toAddr, _ := row["to_addr"].(string) - tx_hash, _ := row["tx_hash"].(string) - height, _ := row["height"].(int) // 假设 height 是 int 类型 - amount, _ := row["amount"].(string) // amount 是字符串类型 - status, _ := row["status"].(int) // status 是 int 类型 - big_amount := new(big.Int) - big_Amount, ok := big_amount.SetString(amount, 10) - if !ok { - return fmt.Errorf("amount to bigInt error: %v", err) + // 将 amount 转换为 big.Int + bigAmount := new(big.Int) + bigAmountParsed, success := bigAmount.SetString(amount, 10) + if !success { + return fmt.Errorf("failed to convert amount '%s' to big.Int", amount) } + + // 锁定并填充数据 e.UnConfirmedTxs.mu.Lock() - // 填充 Transaction 结构体 e.UnConfirmedTxs.Transactions[queueId] = message.Transaction{ QueueId: queueId, TxType: txType, @@ -466,13 +503,19 @@ func (e *ETHNode) loadUnConfirmedTxs() error { Symbol: symbol, From: fromAddr, To: toAddr, - TxHash: tx_hash, + TxHash: txHash, Height: uint64(height), - Amount: big_Amount, + Amount: bigAmountParsed, // 使用转换后的 big.Int Status: status, } e.UnConfirmedTxs.mu.Unlock() } + + // 检查是否存在查询错误 + if err := rows.Err(); err != nil { + return fmt.Errorf("error during row iteration: %v", err) + } + return nil } @@ -762,7 +805,16 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { Status: constant.STATUS_PENDING, } e.UnConfirmedTxs.mu.Unlock() - + go func() { + str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, 0, "ETH", "ETH", fromAddr, toAddr, txHash, height, amount} + _, err := e.MysqlDB.Insert(str, [][]any{params}) + if err != nil { + log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err) + return + } + }() + continue } // fromAddr和监听钱包一致,表示(提现/支付) @@ -807,6 +859,16 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { Status: constant.STATUS_PENDING, } e.UnConfirmedTxs.mu.Unlock() + go func() { + str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{v.QueueId, 1, "ETH", "ETH", fromAddr, toAddr, txHash, height, amount} + _, err := e.MysqlDB.Insert(str, [][]any{params}) + if err != nil { + log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err) + return + } + }() + continue } case message.PayMsg_req: @@ -838,6 +900,16 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { Status: constant.STATUS_PENDING, } e.UnConfirmedTxs.mu.Unlock() + go func() { + str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{v.QueueId, 2, "ETH", "ETH", fromAddr, toAddr, txHash, height, amount} + _, err := e.MysqlDB.Insert(str, [][]any{params}) + if err != nil { + log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err) + return + } + }() + continue } default: @@ -931,7 +1003,16 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) { Status: constant.STATUS_PENDING, } e.UnConfirmedTxs.mu.Unlock() - + go func() { + str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, 0, "ETH", "USDT", fromAddr, toAddr, txHash, height, value_float} + _, err := e.MysqlDB.Insert(str, [][]any{params}) + if err != nil { + log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err) + return + } + }() + return } // fromAddr和监听钱包一致,表示(提现/支付) @@ -976,6 +1057,16 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) { Status: constant.STATUS_PENDING, } e.UnConfirmedTxs.mu.Unlock() + go func() { + str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{v.QueueId, 1, "ETH", "USDT", fromAddr, toAddr, txHash, height, value_float} + _, err := e.MysqlDB.Insert(str, [][]any{params}) + if err != nil { + log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err) + return + } + }() + return } case message.PayMsg_req: @@ -1007,6 +1098,16 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) { Status: constant.STATUS_PENDING, } e.UnConfirmedTxs.mu.Unlock() + go func() { + str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{v.QueueId, 2, "ETH", "USDT", fromAddr, toAddr, txHash, height, value_float} + _, err := e.MysqlDB.Insert(str, [][]any{params}) + if err != nil { + log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err) + return + } + }() + return } default: @@ -1014,7 +1115,13 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) { } } +var tableMap = map[string]string{ + "ETH": "ETH_balances", + "USDT": "USDT_balances", +} + func (e *ETHNode) confirm() { + e.mu.Lock() unconfirmedTxs := e.UnConfirmedTxs now_height := e.NetInfo.Height @@ -1022,6 +1129,7 @@ func (e *ETHNode) confirm() { var responses []any e.UnConfirmedTxs.mu.Lock() for txHash, tx := range unconfirmedTxs.Transactions { + // 高度成熟:当前高度 >= 交易高度 + 确认高度 if now_height >= tx.Height+e.ConfirmHeight { check_result, actual_gas, err := e.checkTransaction(txHash) @@ -1039,6 +1147,7 @@ func (e *ETHNode) confirm() { if ee != nil { log.Printf("Query Message error: %v, queue_id: %s, tx_type: %d", ee, tx.QueueId, tx.TxType) status = constant.STATUS_FAILED + // 如果找不到消息,仍然需要处理交易失败的情况 msg = nil } else { @@ -1061,10 +1170,7 @@ func (e *ETHNode) confirm() { delete(e.UnConfirmedTxs.Transactions, txHash) continue } - var tableMap = map[string]string{ - "ETH": "ETH_balances", - "USDT": "USDT_balances", - } + switch v := msg.(type) { case message.TopupMsg_req: if status == constant.STATUS_SUCCESS { @@ -1091,16 +1197,31 @@ func (e *ETHNode) confirm() { TxHash: txHash, BlockHeight: tx.Height, } + // 修改钱包表 go func() { - str := "UPDATE " + tableMap[tx.Symbol] + " SET success_tx_hash = success_tx_hash || ? WHERE address = ?" - params := []any{txHash + ",", v.Address} - count, err := e.SqliteDB.Update(str, params) + var str string + var params []any + if status == constant.STATUS_SUCCESS { + str = "UPDATE " + tableMap[tx.Symbol] + " SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance + ? WHERE address = ?" + params = []any{txHash + ",", float_amount, v.Address} + } else { + str = "UPDATE " + tableMap[tx.Symbol] + " SET failed_tx_hash = CONCAT(failed_tx_hash, ?) WHERE address = ?" + params = []any{txHash + ",", v.Address} + } + + _, err := e.MysqlDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status - log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", v.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", v.QueueId, count) + log.Printf("Failed to update success_tx_hash/failed_tx_hash for queue_id %s: %v", v.QueueId, err) + } + }() + // 修改待确认交易表 + go func() { + str := "UPDATE ETH_unconfirmed_tx SET status = ? WHERE tx_hash = ?" + params := []any{status, txHash} + _, err := e.MysqlDB.Update(str, params) + if err != nil { + log.Printf("Failed to update ETH_unconfirmed_tx: %v", err) } }() responses = append(responses, response) // 将消息提交至responses @@ -1134,6 +1255,45 @@ func (e *ETHNode) confirm() { ToAddress: tx.To, BlockHeight: tx.Height, } + // 修改钱包表 + go func() { + var str, str1 string + var params, params1 []any + txHashWithComma := txHash + "," + if status == constant.STATUS_SUCCESS { + if tableMap[tx.Symbol] == "ETH_balances" { + str = "UPDATE ETH_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, used_gas = used_gas + ? WHERE address = ?" + params = []any{txHashWithComma, float_amount, utils.BigIntETHToFloat64(actual_gas), v.FromAddress} + str1 = "" + params1 = []any{} + } else { + str = "UPDATE USDT_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, freeze_num = freeze_num + ? WHERE address = ?" + params = []any{txHashWithComma, float_amount, v.Fee, v.FromAddress} + str1 = "UPDATE ETH_balances SET used_gas = used_gas + ? WHERE address = ?" + params1 = []any{utils.BigIntETHToFloat64(actual_gas), v.FromAddress} + } + } else { + str = "UPDATE " + tableMap[tx.Symbol] + " SET failed_tx_hash = CONCAT(failed_tx_hash, ?) WHERE address = ?" + params = []any{txHashWithComma, v.FromAddress} + str1 = "" + params1 = []any{} + } + + err := e.MysqlDB.ExecuteTransactions([]string{str, str1}, [][]any{params, params1}) + if err != nil { + // 更详细的错误日志,包括 QueueId 和 Status + log.Printf("Failed to update success_tx_hash/failed_tx_hash for queue_id %s: %v", v.QueueId, err) + } + }() + // 修改待确认交易表 + go func() { + str := "UPDATE ETH_unconfirmed_tx SET status = ? WHERE tx_hash = ?" + params := []any{status, txHash} + _, err := e.MysqlDB.Update(str, params) + if err != nil { + log.Printf("Failed to update ETH_unconfirmed_tx: %v", err) + } + }() responses = append(responses, response) // 将消息提交至responses case message.PayMsg_req: e.Wallets[tx.From].eth_balance.used_gas = new(big.Int).Add(e.Wallets[tx.From].eth_balance.used_gas, actual_gas) @@ -1162,6 +1322,45 @@ func (e *ETHNode) confirm() { ToAddress: tx.To, BlockHeight: tx.Height, } + // 修改钱包表 + go func() { + var str, str1 string + var params, params1 []any + txHashWithComma := txHash + "," + if status == constant.STATUS_SUCCESS { + if tableMap[tx.Symbol] == "ETH_balances" { + str = "UPDATE ETH_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, used_gas = used_gas + ? WHERE address = ?" + params = []any{txHashWithComma, float_amount, utils.BigIntETHToFloat64(actual_gas), v.FromAddress} + str1 = "" + params1 = []any{} + } else { + str = "UPDATE USDT_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, freeze_num = freeze_num + ? WHERE address = ?" + params = []any{txHashWithComma, float_amount, v.Fee, v.FromAddress} + str1 = "UPDATE ETH_balances SET used_gas = used_gas + ? WHERE address = ?" + params1 = []any{utils.BigIntETHToFloat64(actual_gas), v.FromAddress} + } + } else { + str = "UPDATE " + tableMap[tx.Symbol] + " SET failed_tx_hash = CONCAT(failed_tx_hash, ?) WHERE address = ?" + params = []any{txHashWithComma, v.FromAddress} + str1 = "" + params1 = []any{} + } + + err := e.MysqlDB.ExecuteTransactions([]string{str, str1}, [][]any{params, params1}) + if err != nil { + // 更详细的错误日志,包括 QueueId 和 Status + log.Printf("Failed to update success_tx_hash/failed_tx_hash for queue_id %s: %v", v.QueueId, err) + } + }() + // 修改待确认交易表 + go func() { + str := "UPDATE ETH_unconfirmed_tx SET status = ? WHERE tx_hash = ?" + params := []any{status, txHash} + _, err := e.MysqlDB.Update(str, params) + if err != nil { + log.Printf("Failed to update ETH_unconfirmed_tx: %v", err) + } + }() responses = append(responses, response) // 将消息提交至responses default: log.Printf("未知的消息类型: %v, 跳过此交易", v) @@ -1225,7 +1424,7 @@ func (e *ETHNode) handleListen_Topup_req(msg message.TopupMsg_req) { params2 := []any{msg.Address} str3 := "INSERT INTO USDT_balances (address) VALUES (?)" params3 := []any{msg.Address} - err = e.SqliteDB.ExecuteTransactions([]string{str1, str2, str3}, [][]any{params1, params2, params3}) + err = e.MysqlDB.ExecuteTransactions([]string{str1, str2, str3}, [][]any{params1, params2, params3}) if err != nil { log.Printf("Received ListenServer Topup_req msg: insert sqlite3 db error: %v", err) } @@ -1269,6 +1468,7 @@ func (e *ETHNode) handleListen_Withdraw_req(msg message.WithdrawMsg_req) { FromAddress: msg.FromAddress, ToAddress: msg.ToAddress, } + check_result, err := e.checkBalance(msg.Symbol, msg.FromAddress, target_amount_eth, target_amount_usdt) // 余额校验错误,绕过转账,返回错误响应 if err != nil { @@ -1383,9 +1583,9 @@ func (e *ETHNode) handleListen_Remove_req(msg message.RemoveListenMsg_req) { } str := "UPDATE ETH_wallets SET status = ? WHERE address = ?" params := []any{0, msg.Address} - count, err := e.SqliteDB.Update(str, params) - if err != nil || count != 1 { - log.Printf("Remove address(%s) error: count(%d)", msg.Address, count) + _, err := e.MysqlDB.Update(str, params) + if err != nil { + log.Printf("Remove address(%s) error: %v", msg.Address, err) // result_msg.Status = constant.STATUS_FAILED } go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second) diff --git a/internal/db/mysql.go b/internal/db/mysql.go index 8d6e73d..74d1329 100644 --- a/internal/db/mysql.go +++ b/internal/db/mysql.go @@ -1,9 +1,13 @@ package db import ( + "bufio" "database/sql" "fmt" + "io/ioutil" + "log" message "m2pool-payment/internal/msg" + "strings" _ "github.com/go-sql-driver/mysql" ) @@ -34,6 +38,67 @@ func NewMySQLPool(cfg message.MysqlConfig) (*MySQLPool, error) { return &MySQLPool{db: db}, nil } +// splitSQLStatements 将 SQL 内容按分号分割 +func splitSQLStatements(sqlContent string) []string { + // 使用 bufio 扫描文件内容 + var statements []string + scanner := bufio.NewScanner(strings.NewReader(sqlContent)) + var queryBuilder strings.Builder + + for scanner.Scan() { + line := scanner.Text() + + // 处理每一行的 SQL 语句 + if strings.TrimSpace(line) == "" { + continue + } + + // 如果行中包含分号,说明是完整的 SQL 语句 + queryBuilder.WriteString(line) + if strings.HasSuffix(line, ";") { + statements = append(statements, queryBuilder.String()) + queryBuilder.Reset() // 清空构建器以便准备下一条 SQL 语句 + } else { + queryBuilder.WriteString("\n") + } + } + + // 处理可能的扫描错误 + if err := scanner.Err(); err != nil { + log.Fatalf("error reading .sql file: %v\n", err) + } + + return statements +} + +func (p *MySQLPool) ExecuteSQLFile(filePath string) error { + // 读取 SQL 文件内容 + sqlContent, err := ioutil.ReadFile(filePath) + if err != nil { + return fmt.Errorf("failed to read SQL file: %v", err) + } + // 将文件内容按分号 (;) 分割成多条 SQL 语句 + queries := splitSQLStatements(string(sqlContent)) + + // 执行每一条 SQL 语句 + for _, query := range queries { + // 跳过空行或注释 + if strings.TrimSpace(query) == "" || strings.HasPrefix(strings.TrimSpace(query), "--") { + continue + } + + // 执行 SQL 语句 + _, err := p.db.Exec(query) + if err != nil { + log.Printf("error executing query: %v\n", err) + } else { + // fmt.Println("Executed query:", query) + } + } + + return nil +} + // Exec 执行 INSERT/UPDATE/DELETE func (p *MySQLPool) Exec(query string, args ...any) (sql.Result, error) { return p.db.Exec(query, args...) @@ -64,6 +129,89 @@ func (p *MySQLPool) Transaction(fn func(tx *sql.Tx) error) error { return tx.Commit() } +// Insert 执行通用插入操作 +func (p *MySQLPool) Insert(query string, values [][]any) (sql.Result, error) { + // 预处理查询 + stmt, err := p.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement: %v", err) + } + defer stmt.Close() + + // 执行批量插入 + var result sql.Result + for _, row := range values { + result, err = stmt.Exec(row...) + if err != nil { + return nil, fmt.Errorf("failed to execute insert: %v", err) + } + } + + return result, nil +} + +// Delete 执行通用删除操作 + +// Update 执行通用更新操作 +func (p *MySQLPool) Update(query string, values []any) (sql.Result, error) { + // 预处理查询 + stmt, err := p.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("failed to prepare statement: %v", err) + } + defer stmt.Close() + + // 执行更新操作 + result, err := stmt.Exec(values...) + if err != nil { + return nil, fmt.Errorf("failed to execute update: %v", err) + } + + return result, nil +} + +// ExecuteTransactions 执行多条增删改操作,确保事务的原子性 +func (p *MySQLPool) ExecuteTransactions(str_sqls []string, params [][]any) error { + // 检查 SQL 和参数的数量是否匹配 + if len(str_sqls) != len(params) { + return fmt.Errorf("sql length != params length") + } + + // 开始事务 + tx, err := p.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + + // 确保在函数结束时提交或回滚事务 + defer func() { + if err != nil { + // 发生错误时回滚事务 + if rollbackErr := tx.Rollback(); rollbackErr != nil { + err = fmt.Errorf("failed to rollback transaction: %v", rollbackErr) + } + } else { + // 如果没有错误,提交事务 + if commitErr := tx.Commit(); commitErr != nil { + err = fmt.Errorf("failed to commit transaction: %v", commitErr) + } + } + }() + + // 执行每个 SQL 语句 + for i, sql_str := range str_sqls { + // 使用事务对象 tx 来执行 SQL + _, err := tx.Exec(sql_str, params[i]...) + if err != nil { + // 如果执行失败,立即返回并且触发回滚 + return fmt.Errorf("failed to execute SQL at index %d: %v", i, err) + } + } + + // 如果所有 SQL 执行成功,则返回 nil + return nil +} + // Close 关闭连接池 func (p *MySQLPool) Close() error { return p.db.Close() diff --git a/internal/listen/listen.go b/internal/listen/listen.go index 4cfba2b..a891f41 100644 --- a/internal/listen/listen.go +++ b/internal/listen/listen.go @@ -10,8 +10,8 @@ import ( ) type ListenServer struct { - Config message.Config - // MysqlDB *db.MySQLPool + Config message.Config + MysqlDB *db.MySQLPool SqliteDB *db.SQLite TopupMsgs map[string]*TopupMsgs // {"ETH": TopupMsgs{"queue_id": message.Topupmsg_req{}, ...}} WithdrawMsgs map[string]*WithdrawMsgs @@ -45,12 +45,12 @@ type RemoveMsgs struct { } func NewListenServer(cfg message.Config) *ListenServer { - // // 初始化MySQL数据库 - // dbConn, err := db.NewMySQLPool(cfg.MysqlConfig["wallet"]) - // if err != nil { - // log.Printf("mysql connect error: %v", err) - // return nil - // } + // 初始化MySQL数据库 + dbConn, err := db.NewMySQLPool(cfg.MysqlConfig["wallet"]) + if err != nil { + log.Printf("mysql connect error: %v", err) + return nil + } // 初始化SQLite3 sqlite, err := db.NewSQLite(cfg.MsgConfig.SqlitePath) @@ -86,8 +86,8 @@ func NewListenServer(cfg message.Config) *ListenServer { chin[net] = make(chan any, 1000) } var l = &ListenServer{ - Config: cfg, - // MysqlDB: dbConn, + Config: cfg, + MysqlDB: dbConn, SqliteDB: sqlite, TopupMsgs: topup_msgs, WithdrawMsgs: withdraw_msgs, @@ -98,6 +98,10 @@ func NewListenServer(cfg message.Config) *ListenServer { ChFromRmqServer: rmq_ch_in, ChToRmqServer: rmq_ch_out, } + // err = l.MysqlDB.ExecuteSQLFile("../public/msg_mysql.sql") + // if err != nil { + // log.Fatalf("Listen-message初始化数据库表失败:%v", err) + // } l.loadMsg() log.Println("✅ 消息监听处理已启动") return l diff --git a/internal/listen/listen_prv.go b/internal/listen/listen_prv.go index d90b6c1..b775e4b 100644 --- a/internal/listen/listen_prv.go +++ b/internal/listen/listen_prv.go @@ -5,10 +5,12 @@ import ( "log" "m2pool-payment/internal/constant" message "m2pool-payment/internal/msg" + "strconv" "time" ) // 加载所有消息 + func (l *ListenServer) loadMsg() { topup_sql := "SELECT queue_id, chain, symbol, address, timestamp, sign, status FROM topup_req_msg WHERE status = ?" // 1正在监听 withdraw_sql := "SELECT queue_id, chain, symbol, from_addr, to_addr, amount, fee, timestamp, sign, status FROM withdraw_req_msg WHERE status = ? OR status = ?" // 2待确认,5待支付 @@ -18,23 +20,30 @@ func (l *ListenServer) loadMsg() { withdraw_params := []any{2, 5} pay_params := []any{2, 5} remove_params := []any{2} + + // 处理充值消息 go func() { - rows, err := l.SqliteDB.Query_(topup_sql, topup_params...) + rows, err := l.MysqlDB.Query(topup_sql, topup_params...) if err != nil { log.Fatalf("load topup-msg error: %v", err) return } - // 遍历查询结果 - for _, row := range rows { - queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 - chain := row["chain"].(string) // 假设 chain 是 string 类型 - symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 - address := row["address"].(string) // 假设 address 是 string 类型 - timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型 - sign := row["sign"].(string) // 假设 sign 是 string 类型 - status := row["status"].(int64) // 假设 status 是 INTEGER 类型 + defer rows.Close() // 确保在函数结束时关闭 rows - // 处理数据,例如打印输出 + // 遍历查询结果 + for rows.Next() { + var queueID, chain, symbol, address, sign string + var timestamp int64 + var status int + + // 扫描每行数据到相应的变量 + err := rows.Scan(&queueID, &chain, &symbol, &address, ×tamp, &sign, &status) + if err != nil { + log.Printf("failed to scan topup row: %v", err) + continue + } + + // 处理数据 l.TopupMsgs[chain].mu.RLock() l.TopupMsgs[chain].Msgs[queueID] = message.TopupMsg_req{ QueueId: queueID, @@ -43,33 +52,51 @@ func (l *ListenServer) loadMsg() { Address: address, Timestamp: uint64(timestamp), Sign: sign, - Status: int(status), + Status: status, } l.TopupMsgs[chain].mu.RUnlock() } + if err := rows.Err(); err != nil { + log.Printf("error occurred during rows iteration: %v", err) + } log.Printf("充值历史消息load完毕") }() + // 处理提现消息 go func() { - rows, err := l.SqliteDB.Query_(withdraw_sql, withdraw_params...) + rows, err := l.MysqlDB.Query(withdraw_sql, withdraw_params...) if err != nil { log.Fatalf("load withdraw-msg error: %v", err) return } - // 遍历查询结果 - for _, row := range rows { - queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 - chain := row["chain"].(string) // 假设 chain 是 string 类型 - symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 - fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型 - toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型 - amount := row["amount"].(float64) // 假设 amount 是 float64 类型 - fee := row["fee"].(float64) // 假设 fee 是 float64 类型 - timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型 - sign := row["sign"].(string) // 假设 sign 是 string 类型 - status := row["status"].(int64) // 假设 status 是 string 类型 + defer rows.Close() - // 处理数据,例如打印输出 + // 遍历查询结果 + for rows.Next() { + var queueID, chain, symbol, fromAddr, toAddr, amountStr, feeStr, sign string + var timestamp int64 + var status int + + // 扫描每行数据 + err := rows.Scan(&queueID, &chain, &symbol, &fromAddr, &toAddr, &amountStr, &feeStr, ×tamp, &sign, &status) + if err != nil { + log.Printf("failed to scan withdraw row: %v", err) + continue + } + + // 将 amount 和 fee 转换为浮动数字 + amount, err := strconv.ParseFloat(amountStr, 64) + if err != nil { + log.Printf("failed to parse amount: %v", err) + continue + } + fee, err := strconv.ParseFloat(feeStr, 64) + if err != nil { + log.Printf("failed to parse fee: %v", err) + continue + } + + // 处理数据 l.WithdrawMsgs[chain].mu.RLock() l.WithdrawMsgs[chain].Msgs[queueID] = message.WithdrawMsg_req{ QueueId: queueID, @@ -81,33 +108,51 @@ func (l *ListenServer) loadMsg() { Fee: fee, Timestamp: uint64(timestamp), Sign: sign, - Status: int(status), + Status: status, } l.WithdrawMsgs[chain].mu.RUnlock() } + if err := rows.Err(); err != nil { + log.Printf("error occurred during rows iteration: %v", err) + } log.Printf("提现历史消息load完毕") }() + // 处理支付消息 go func() { - rows, err := l.SqliteDB.Query_(pay_sql, pay_params...) + rows, err := l.MysqlDB.Query(pay_sql, pay_params...) if err != nil { log.Fatalf("load pay-msg error: %v", err) return } - // 遍历查询结果 - for _, row := range rows { - queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 - chain := row["chain"].(string) // 假设 chain 是 string 类型 - symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 - fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型 - toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型 - amount := row["amount"].(float64) // 假设 amount 是 float64 类型 - fee := row["fee"].(float64) // 假设 fee 是 float64 类型 - timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型 - sign := row["sign"].(string) // 假设 sign 是 string 类型 - status := row["status"].(int64) // 假设 status 是 string 类型 + defer rows.Close() - // 处理数据,例如打印输出 + // 遍历查询结果 + for rows.Next() { + var queueID, chain, symbol, fromAddr, toAddr, amountStr, feeStr, sign string + var timestamp int64 + var status int + + // 扫描每行数据 + err := rows.Scan(&queueID, &chain, &symbol, &fromAddr, &toAddr, &amountStr, &feeStr, ×tamp, &sign, &status) + if err != nil { + log.Printf("failed to scan pay row: %v", err) + continue + } + + // 将 amount 和 fee 转换为浮动数字 + amount, err := strconv.ParseFloat(amountStr, 64) + if err != nil { + log.Printf("failed to parse amount: %v", err) + continue + } + fee, err := strconv.ParseFloat(feeStr, 64) + if err != nil { + log.Printf("failed to parse fee: %v", err) + continue + } + + // 处理数据 l.PayMsgs[chain].mu.RLock() l.PayMsgs[chain].Msgs[queueID] = message.PayMsg_req{ QueueId: queueID, @@ -119,29 +164,37 @@ func (l *ListenServer) loadMsg() { Fee: fee, Timestamp: uint64(timestamp), Sign: sign, - Status: int(status), + Status: status, } l.PayMsgs[chain].mu.RUnlock() } + if err := rows.Err(); err != nil { + log.Printf("error occurred during rows iteration: %v", err) + } log.Printf("支付历史消息load完毕") }() + // 处理移除消息 go func() { - rows, err := l.SqliteDB.Query_(remove_sql, remove_params...) + rows, err := l.MysqlDB.Query(remove_sql, remove_params...) if err != nil { log.Fatalf("load remove-msg error: %v", err) return } + defer rows.Close() - for _, row := range rows { - queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 - msgType := row["msg_type"].(int) - chain := row["chain"].(string) // 假设 chain 是 string 类型 - symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 - address := row["address"].(string) // 假设 address 是 string 类型 - timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型 - sign := row["sign"].(string) // 假设 sign 是 string 类型 - status := row["status"].(int64) // 假设 status 是 INTEGER 类型 + // 遍历查询结果 + for rows.Next() { + var queueID, chain, symbol, address, sign string + var timestamp int64 + var status, msgType int + + // 扫描每行数据 + err := rows.Scan(&queueID, &msgType, &chain, &symbol, &address, ×tamp, &sign, &status) + if err != nil { + log.Printf("failed to scan remove row: %v", err) + continue + } // 处理数据 l.RemoveMsgs[chain].mu.RLock() @@ -153,10 +206,13 @@ func (l *ListenServer) loadMsg() { Address: address, Timestamp: uint64(timestamp), Sign: sign, - Stauts: int(status), + Status: status, } l.RemoveMsgs[chain].mu.RUnlock() } + if err := rows.Err(); err != nil { + log.Printf("error occurred during rows iteration: %v", err) + } log.Printf("移除监听历史消息load完毕") }() } @@ -170,7 +226,7 @@ func (l *ListenServer) handleRmqTopup_req(msg message.TopupMsg_req) { // 写数据库 sql := "INSERT INTO topup_req_msg (queue_id, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign} - err := l.SqliteDB.Insert(sql, params) + _, err := l.MysqlDB.Insert(sql, [][]any{params}) if err != nil { log.Printf("Insert Topup_req msg error: %v", err) go l.asyncSendMsgToRmq(message.TopupMsg_resp{ @@ -196,7 +252,7 @@ func (l *ListenServer) handleRmqWithdraw_req(msg message.WithdrawMsg_req) { // 写数据库 sql := "INSERT INTO withdraw_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign} - err := l.SqliteDB.Insert(sql, params) + _, err := l.MysqlDB.Insert(sql, [][]any{params}) if err != nil { log.Printf("Insert Withdraw_req msg error: %v", err) go l.asyncSendMsgToRmq(message.WithdrawMsg_resp{ @@ -223,7 +279,7 @@ func (l *ListenServer) handleRmqPay_req(msg message.PayMsg_req) { // 写数据库 sql := "INSERT INTO pay_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign} - err := l.SqliteDB.Insert(sql, params) + _, err := l.MysqlDB.Insert(sql, [][]any{params}) if err != nil { log.Printf("Insert PayMsg_req msg error: %v", err) go l.asyncSendMsgToRmq(message.PayMsg_resp{ @@ -249,7 +305,7 @@ func (l *ListenServer) handleRmqRemove_req(msg message.RemoveListenMsg_req) { sql := "INSERT INTO remove_req_msg (queue_id, msg_type, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign} - err := l.SqliteDB.Insert(sql, params) + _, err := l.MysqlDB.Insert(sql, [][]any{params}) if err != nil { log.Printf("Insert Remove_req msg error: %v", err) go l.asyncSendMsgToRmq(message.RemoveListenMsg_resp{ @@ -276,14 +332,15 @@ func (l *ListenServer) handleUpdateReqState(msg message.UpdateReqState) { str := "UPDATE " + typeMap[msg.MsgType] + " SET status = ? WHERE queue_id = ?" params := []any{msg.Status, msg.QueueId} go func() { - count, err := l.SqliteDB.Update(str, params) + _, err := l.MysqlDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status log.Printf("Failed to update update_req_msg for queue_id %s: %v", msg.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } + // else if count != 1 { + // // 如果更新的行数不是 1,日志中记录详细信息 + // log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + // } }() } @@ -294,18 +351,19 @@ func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) { // 修改数据库 str := "UPDATE topup_resp_msg SET status = ? WHERE tx_hash = ?" params := []any{msg.Status, msg.TxHash} - count, err := l.SqliteDB.Update(str, params) + _, err := l.MysqlDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status log.Printf("Failed to update topup_resp_msg for queue_id %s: %v", msg.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } + // else if count != 1 { + // // 如果更新的行数不是 1,日志中记录详细信息 + // log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + // } case constant.STATUS_PENDING: str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, tx_hash, height, status) VALUES (?,?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.Address, msg.Amount, msg.TxHash, msg.BlockHeight, msg.Status} - err := l.SqliteDB.Insert(str, params) + _, err := l.MysqlDB.Insert(str, [][]any{params}) if err != nil { log.Printf("Insert Topup_resp msg error: %v", err) } @@ -313,7 +371,7 @@ func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) { // 插入数据库 str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, to_addr, status) VALUES (?,?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Status} - err := l.SqliteDB.Insert(str, params) + _, err := l.MysqlDB.Insert(str, [][]any{params}) if err != nil { log.Printf("Insert Topup_resp msg error: %v", err) } @@ -329,7 +387,7 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) { go func() { str := "INSERT INTO withdraw_resp_msg (queue_id, chain, symbol, from_addr, to_addr, tx_hash, amount, fee, height, status) VALUES (?,?,?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.TxHash, msg.Amount, msg.Fee, msg.BlockHeight, msg.Status} - err := l.SqliteDB.Insert(str, params) + _, err := l.MysqlDB.Insert(str, [][]any{params}) if err != nil { log.Println(err) } @@ -342,14 +400,15 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) { go func() { str := "UPDATE withdraw_resp_msg SET status = ? WHERE tx_hash = ?" params := []any{msg.Status, msg.TxHash} - count, err := l.SqliteDB.Update(str, params) + _, err := l.MysqlDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status log.Printf("Failed to update withdraw_resp_msg for queue_id %s: %v", msg.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } + // else if count != 1 { + // // 如果更新的行数不是 1,日志中记录详细信息 + // log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + // } }() go l.asyncSendMsgToRmq(msg, 3, 5*time.Second) default: @@ -360,7 +419,7 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) { go func() { str := "INSERT INTO withdraw_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, fee, status) VALUES (?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Status} - err := l.SqliteDB.Insert(str, params) + _, err := l.MysqlDB.Insert(str, [][]any{params}) if err != nil { log.Println(err) } @@ -377,7 +436,7 @@ func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) { go func() { str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, tx_hash, amount, fee, height, status) VALUES (?,?,?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.TxHash, msg.Amount, msg.Fee, msg.BlockHeight, msg.Status} - err := l.SqliteDB.Insert(str, params) + _, err := l.MysqlDB.Insert(str, [][]any{params}) if err != nil { log.Println(err) } @@ -390,14 +449,15 @@ func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) { go func() { str := "UPDATE pay_resp_msg SET status = ? WHERE tx_hash = ?" params := []any{msg.Status, msg.TxHash} - count, err := l.SqliteDB.Update(str, params) + _, err := l.MysqlDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status log.Printf("Failed to update pay_resp_msg for queue_id %s: %v", msg.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } + // else if count != 1 { + // // 如果更新的行数不是 1,日志中记录详细信息 + // log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + // } }() go l.asyncSendMsgToRmq(msg, 3, 5*time.Second) default: @@ -408,7 +468,7 @@ func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) { go func() { str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, fee, status) VALUES (?,?,?,?,?,?,?,?)" params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Status} - err := l.SqliteDB.Insert(str, params) + _, err := l.MysqlDB.Insert(str, [][]any{params}) if err != nil { log.Println(err) } @@ -431,14 +491,15 @@ func (l *ListenServer) handleChainRemove_resp(msg message.RemoveListenMsg_resp) // 更新数据库 str := "UPDATE remove_resp_msg SET status = ? WHERE queue_id = ?" params := []any{msg.Status, msg.QueueId} - count, err := l.SqliteDB.Update(str, params) + _, err := l.MysqlDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } + // else if count != 1 { + // // 如果更新的行数不是 1,日志中记录详细信息 + // log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + // } }() // 异步发送消息到 RMQ diff --git a/internal/logger/transaction_logger.go b/internal/logger/transaction_logger.go index c027f68..1afce07 100644 --- a/internal/logger/transaction_logger.go +++ b/internal/logger/transaction_logger.go @@ -276,6 +276,196 @@ func LogETHNode(msg string) { } } +// =============================== RMQ <-> Listen 通信日志 =============================== + +// LogRmqToListenTopupReq 记录 RMQ -> Listen 充值请求 +// 使用 address 作为文件名 +func LogRmqToListenTopupReq(queueId, chain, symbol, address string, timestamp uint64) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(address) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + // TopupReq 没有 fromAddress,使用 "-" 代替,toAddress 是 address,amount 为 0 + content := fmt.Sprintf("[TopupReq]: %s--%s-%s-%s-0", + t, address, chain, symbol) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogRmqToListenWithdrawReq 记录 RMQ -> Listen 提现请求 +// 使用 fromAddress 作为文件名 +func LogRmqToListenWithdrawReq(queueId, chain, symbol, fromAddress, toAddress string, amount, fee float64, timestamp uint64) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(fromAddress) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + content := fmt.Sprintf("[WithdrawReq]: %s-%s-%s-%s-%s-%.6f", + t, fromAddress, toAddress, chain, symbol, amount) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogRmqToListenPayReq 记录 RMQ -> Listen 支付请求 +// 使用 fromAddress 作为文件名 +func LogRmqToListenPayReq(queueId, chain, symbol, fromAddress, toAddress string, amount, fee float64, timestamp uint64) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(fromAddress) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + content := fmt.Sprintf("[PayReq]: %s-%s-%s-%s-%s-%.6f", + t, fromAddress, toAddress, chain, symbol, amount) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogRmqToListenRemoveReq 记录 RMQ -> Listen 移除监听请求 +// 使用 address 作为文件名 +func LogRmqToListenRemoveReq(queueId string, msgType int, chain, symbol, address string, timestamp uint64) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(address) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + // RemoveReq 没有 fromAddress,toAddress 是 address,amount 为 0 + content := fmt.Sprintf("[RemoveReq]: %s--%s-%s-%s-0", + t, address, chain, symbol) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogListenToRmqTopupResp 记录 Listen -> RMQ 充值响应 +// 使用 address 作为文件名 +func LogListenToRmqTopupResp(queueId, chain, symbol, address, fromAddress, txHash string, amount float64, blockHeight uint64, status int) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(address) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + // TopupResp 中 address 是目标地址(toAddress),fromAddress 是来源地址 + content := fmt.Sprintf("[TopupResp]: %s-%s-%s-%s-%s-%.6f", + t, fromAddress, address, chain, symbol, amount) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogListenToRmqWithdrawResp 记录 Listen -> RMQ 提现响应 +// 使用 fromAddress 作为文件名 +func LogListenToRmqWithdrawResp(queueId, chain, symbol, fromAddress, toAddress, txHash string, amount, fee float64, blockHeight uint64, status int) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(fromAddress) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + content := fmt.Sprintf("[WithdrawResp]: %s-%s-%s-%s-%s-%.6f", + t, fromAddress, toAddress, chain, symbol, amount) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogListenToRmqPayResp 记录 Listen -> RMQ 支付响应 +// 使用 fromAddress 作为文件名 +func LogListenToRmqPayResp(queueId, chain, symbol, fromAddress, toAddress, txHash string, amount, fee float64, blockHeight uint64, status int) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(fromAddress) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + content := fmt.Sprintf("[PayResp]: %s-%s-%s-%s-%s-%.6f", + t, fromAddress, toAddress, chain, symbol, amount) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + +// LogListenToRmqRemoveResp 记录 Listen -> RMQ 移除监听响应 +// 使用 address 作为文件名 +func LogListenToRmqRemoveResp(queueId string, msgType int, chain, symbol, address string, status int) { + if txLogger == nil { + return + } + + lf, err := txLogger.getOrCreateLogFile(address) + if err != nil { + fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) + return + } + + t := time.Now().Format("2006-01-02 15:04:05") + // 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount + // RemoveResp 没有 fromAddress,toAddress 是 address,amount 为 0 + content := fmt.Sprintf("[RemoveResp]: %s--%s-%s-%s-0", + t, address, chain, symbol) + + if err := lf.write(content); err != nil { + fmt.Printf("⚠️ 写入日志失败: %v\n", err) + } +} + // Close 关闭所有日志文件 func CloseTransactionLogger() { if txLogger == nil { diff --git a/internal/msg/msg.go b/internal/msg/msg.go index 9fe8728..014c887 100644 --- a/internal/msg/msg.go +++ b/internal/msg/msg.go @@ -127,7 +127,7 @@ type RemoveListenMsg_req struct { Address string `json:"address"` Timestamp uint64 `json:"timestamp"` Sign string `json:"sign"` - Stauts int `json:"status,omitempty"` + Status int `json:"status,omitempty"` } // 返回收到的删除监听地址消息 diff --git a/internal/server.go b/internal/server.go index e265761..081ae96 100644 --- a/internal/server.go +++ b/internal/server.go @@ -87,6 +87,8 @@ func (s *ServerCtx) handleTopupMsg() { } return } + // 记录 RMQ -> Listen 充值请求 + logger.LogRmqToListenTopupReq(msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp) s.messageServer.ChFromRmqServer <- msg } } @@ -111,6 +113,8 @@ func (s *ServerCtx) handleWithdrawMsg() { return } + // 记录 RMQ -> Listen 提现请求 + logger.LogRmqToListenWithdrawReq(msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Fee, msg.Timestamp) s.messageServer.ChFromRmqServer <- msg } @@ -133,6 +137,8 @@ func (s *ServerCtx) handlePayMsg() { return } + // 记录 RMQ -> Listen 支付请求 + logger.LogRmqToListenPayReq(msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Fee, msg.Timestamp) s.messageServer.ChFromRmqServer <- msg } } @@ -156,6 +162,8 @@ func (s *ServerCtx) handleRemoveMsg() { return } + // 记录 RMQ -> Listen 移除监听请求 + logger.LogRmqToListenRemoveReq(msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp) s.messageServer.ChFromRmqServer <- msg } } @@ -165,6 +173,8 @@ func (s *ServerCtx) handleRespMsg() { switch v := msg.(type) { case message.TopupMsg_resp: log.Printf("📨[充值响应]:QueueID=%s, Address=%s, Chain=%s, Symbol=%s, TxHash=%s, Status=%d, Amount=%f", v.QueueId, v.Address, v.Chain, v.Symbol, v.TxHash, v.Status, v.Amount) + // 记录 Listen -> RMQ 充值响应 + logger.LogListenToRmqTopupResp(v.QueueId, v.Chain, v.Symbol, v.Address, v.FromAddress, v.TxHash, v.Amount, v.BlockHeight, v.Status) err := s.rmqServer.PublishTopupResp(v) if err != nil { log.Printf("❌ 发送充值响应失败: %v", err) @@ -172,6 +182,8 @@ func (s *ServerCtx) handleRespMsg() { } case message.WithdrawMsg_resp: log.Printf("📨[提现响应]:QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, ToAddress=%s, TxHash=%s, Status=%d, Amount=%f", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.ToAddress, v.TxHash, v.Status, v.Amount) + // 记录 Listen -> RMQ 提现响应 + logger.LogListenToRmqWithdrawResp(v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.ToAddress, v.TxHash, v.Amount, v.Fee, v.BlockHeight, v.Status) err := s.rmqServer.PublishWithdrawResp(v) if err != nil { log.Printf("❌ 发送提现响应失败: %v", err) @@ -179,6 +191,8 @@ func (s *ServerCtx) handleRespMsg() { } case message.PayMsg_resp: log.Printf("📨[提现响应]:QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, Status=%d", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.Status) + // 记录 Listen -> RMQ 支付响应 + logger.LogListenToRmqPayResp(v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.ToAddress, v.TxHash, v.Amount, v.Fee, v.BlockHeight, v.Status) err := s.rmqServer.PublishPayResp(v) if err != nil { log.Printf("❌ 发送支付响应失败: %v", err) @@ -186,6 +200,8 @@ func (s *ServerCtx) handleRespMsg() { } case message.RemoveListenMsg_resp: log.Printf("📨[充值响应]:QueueID=%s, Address=%s, Chain=%s, Symbol=%s,Status=%d", v.QueueId, v.Address, v.Chain, v.Symbol, v.Status) + // 记录 Listen -> RMQ 移除监听响应 + logger.LogListenToRmqRemoveResp(v.QueueId, v.MsgType, v.Chain, v.Symbol, v.Address, v.Status) err := s.rmqServer.PublishRemoveResp(v) if err != nil { log.Printf("❌ 发送移除监听响应失败: %v", err) diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 23b0013..9fd7163 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -78,9 +78,24 @@ func Float64ToBigInt(symbol string, amount float64) *big.Int { log.Printf("don`t support symbol: %s", symbol) return nil } - bigAmount := new(big.Int) - bigAmount.SetInt64(int64(amount * scale)) - return bigAmount + + // 检查 amount 是否为负数 + if amount < 0 { + log.Printf("Warning: negative amount for symbol %s: %f", symbol, amount) + amount = 0 // 可以选择将负数设置为 0 或返回错误 + } + + // 将 amount 转换为 big.Float 来避免溢出 + bigAmount := new(big.Float) + bigAmount.SetFloat64(amount) + + // 乘以 scale(小数位数),避免精度丢失 + bigAmount = bigAmount.Mul(bigAmount, big.NewFloat(scale)) + + // 将 big.Float 转换为 big.Int(取整) + intAmount, _ := bigAmount.Int(nil) + + return intAmount } func BigIntToFloat64(symbol string, amount *big.Int) float64 { diff --git a/public/eth_mysql.sql b/public/eth_mysql.sql index a64a2ee..db3234a 100644 --- a/public/eth_mysql.sql +++ b/public/eth_mysql.sql @@ -3,17 +3,19 @@ CREATE TABLE IF NOT EXISTS ETH_wallets ( queue_id VARCHAR(255) NOT NULL, timestamp BIGINT NOT NULL, sign VARCHAR(255) NOT NULL, - status TINYINT DEFAULT 0 -- 0未在监听 1正在监听 + status TINYINT DEFAULT 0 ); +CREATE INDEX idx_queue_id ON ETH_wallets (queue_id); + CREATE TABLE IF NOT EXISTS ETH_balances ( address VARCHAR(255) NOT NULL, symbol VARCHAR(32) DEFAULT "ETH", used_gas DECIMAL(40,16) DEFAULT 0, balance DECIMAL(40,16) DEFAULT 0, - success_tx_hash TEXT DEFAULT NULL, -- 使用,隔开 - failed_tx_hash TEXT DEFAULT NULL, -- 使用,隔开 - PRIMARY KEY (address), -- 钱包地址唯一 + success_tx_hash TEXT DEFAULT NULL, + failed_tx_hash TEXT DEFAULT NULL, + PRIMARY KEY (address), FOREIGN KEY (address) REFERENCES ETH_wallets (address) ON DELETE CASCADE ); @@ -22,15 +24,15 @@ CREATE TABLE IF NOT EXISTS USDT_balances ( symbol VARCHAR(32) DEFAULT "USDT", freeze_num DECIMAL(40,16) DEFAULT 0, balance DECIMAL(40,16) DEFAULT 0, - success_tx_hash TEXT DEFAULT NULL, -- 使用,隔开 - failed_tx_hash TEXT DEFAULT NULL, -- 使用,隔开 - PRIMARY KEY (address), -- 钱包地址唯一 + success_tx_hash TEXT DEFAULT NULL, + failed_tx_hash TEXT DEFAULT NULL, + PRIMARY KEY (address), FOREIGN KEY (address) REFERENCES ETH_wallets (address) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS ETH_unconfirmed_tx ( - queue_id VARCHAR(255) NOT NULL, -- 关联的msg queue_id - tx_type TINYINT NOT NULL, -- 0充值,1提现,2支付 + queue_id VARCHAR(255) NOT NULL, + tx_type TINYINT NOT NULL, chain VARCHAR(32) DEFAULT "ETH", symbol VARCHAR(32), from_addr VARCHAR(255), @@ -38,6 +40,6 @@ CREATE TABLE IF NOT EXISTS ETH_unconfirmed_tx ( tx_hash VARCHAR(255), height BIGINT, amount DECIMAL(40,16), - status TINYINT DEFAULT 2, -- 0充值失败,1充值成功,2充值待确认 + status TINYINT DEFAULT 2, FOREIGN KEY (queue_id) REFERENCES ETH_wallets (queue_id) ON DELETE CASCADE -); +); \ No newline at end of file diff --git a/public/msg_mysql.sql b/public/msg_mysql.sql index 20a1dd5..c430ef8 100644 --- a/public/msg_mysql.sql +++ b/public/msg_mysql.sql @@ -1,4 +1,3 @@ --- mysql CREATE TABLE IF NOT EXISTS topup_req_msg ( queue_id VARCHAR(255) NOT NULL, chain VARCHAR(32) NOT NULL, @@ -19,7 +18,7 @@ CREATE TABLE IF NOT EXISTS topup_resp_msg ( amount DECIMAL(30,16) NOT NULL, tx_hash VARCHAR(255) DEFAULT NULL, height BIGINT DEFAULT NULL, - status TINYINT DEFAULT 5, --遵循constant模块的定义 + status TINYINT DEFAULT 5, FOREIGN KEY (queue_id) REFERENCES topup_req_msg(queue_id) ); @@ -33,7 +32,7 @@ CREATE TABLE IF NOT EXISTS withdraw_req_msg ( fee DECIMAL(30,16) NOT NULL, timestamp BIGINT, sign VARCHAR(255), - status TINYINT DEFAULT 5, --遵循constant模块的定义 + status TINYINT DEFAULT 5, PRIMARY KEY (queue_id) ); @@ -47,7 +46,7 @@ CREATE TABLE IF NOT EXISTS withdraw_resp_msg ( amount DECIMAL(30,16) DEFAULT NULL, fee DECIMAL(30,16) DEFAULT NULL, height BIGINT DEFAULT NULL, - status TINYINT DEFAULT 5, --遵循constant模块的定义 + status TINYINT DEFAULT 5, FOREIGN KEY (queue_id) REFERENCES withdraw_req_msg(queue_id) ); @@ -61,7 +60,7 @@ CREATE TABLE IF NOT EXISTS pay_req_msg ( fee DECIMAL(30,16) NOT NULL, timestamp BIGINT, sign VARCHAR(255), - status TINYINT DEFAULT 5, --遵循constant模块的定义 + status TINYINT DEFAULT 5, PRIMARY KEY (queue_id) ); @@ -75,22 +74,10 @@ CREATE TABLE IF NOT EXISTS pay_resp_msg ( amount DECIMAL(30,16) DEFAULT NULL, fee DECIMAL(30,16) DEFAULT NULL, height BIGINT DEFAULT NULL, - status TINYINT DEFAULT 5, --遵循constant模块的定义 + status TINYINT DEFAULT 5, FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) ); --- pay_msg的交易 --- CREATE TABLE IF NOT EXISTS pay_msg_tx ( --- queue_id VARCHAR(255) NOT NULL, --- tx_hash VARCHAR(255) DEFAULT NULL, --- to_addr VARCHAR(255) NOT NULL, --- amount DECIMAL(30,16) DEFAULT NULL, --- fee DECIMAL(30,16), --- height BIGINT DEFAULT 0, --- status TINYINT DEFAULT 5, --遵循constant模块的定义 --- FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) --- ); - CREATE TABLE IF NOT EXISTS remove_req_msg( queue_id VARCHAR(255) NOT NULL, msg_type TINYINT NOT NULL, diff --git a/流程.txt b/流程.txt index dd25690..c96b104 100644 --- a/流程.txt +++ b/流程.txt @@ -13,4 +13,10 @@ node server listen -> 新区块产生时读取当前listen server的消息 -> -> 消息中的from、to、amount = 区块交易中的from、to、amount(提现/充值) -> 返回消息 -> listen -> 修改数据库状态 node server confirm -> 新区块产生同时会读取当前unconfirmtxs数据 -> 对比每个交易的高度 -> 符合确认条件 -> 修改钱包数据 - -> 返回消息 -> listen -> 修改相关数据 -> 返回rmq -> 发出消息 \ No newline at end of file + -> 返回消息 -> listen -> 修改相关数据 -> 返回rmq -> 发出消息 + + + + + +确认后的状态修改(余额增减、hash录入等) \ No newline at end of file