add log-system, bug fixed

This commit is contained in:
lzx
2025-11-18 11:10:16 +08:00
parent ac22db02f3
commit 74d9a114c0
13 changed files with 861 additions and 186 deletions

View File

@@ -224,6 +224,52 @@ M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付
#### 9. Logger (`internal/logger/`)
- **transaction_logger.go**:交易日志记录
- 记录所有交易操作的详细日志
- 记录 RMQ 与 Listen 之间的所有通信消息
- 按地址分文件存储,自动日志轮转和压缩
##### 日志分类
**① 交易日志(按地址分文件)**
- 充值日志:按 `address` 命名文件
- 提现日志:按 `fromAddress` 命名文件
- 支付日志:按 `fromAddress` 命名文件
- ETH 节点日志:`ethnode.log`
**② RMQ ↔ Listen 通信日志(按地址分文件)**
- 充值请求/响应:按 `address` 命名文件
- 提现请求/响应:按 `fromAddress` 命名文件
- 支付请求/响应:按 `fromAddress` 命名文件
- 移除监听请求/响应:按 `address` 命名文件
##### 日志格式
**RMQ ↔ Listen 通信日志格式**`[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount`
- **TopupReq**: `[TopupReq]: 2006-01-02 15:04:05--0xabc...-ETH-USDT-0`
- **WithdrawReq**: `[WithdrawReq]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-100.500000`
- **PayReq**: `[PayReq]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-50.250000`
- **TopupResp**: `[TopupResp]: 2006-01-02 15:04:05-0xdef...-0xabc...-ETH-USDT-200.000000`
- **WithdrawResp**: `[WithdrawResp]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-100.500000`
- **PayResp**: `[PayResp]: 2006-01-02 15:04:05-0x123...-0xabc...-ETH-USDT-50.250000`
##### 日志轮转机制
- 单文件超过 **1MB** 自动压缩为 `.gz` 格式
- 后台异步压缩,不影响性能
- 压缩后的文件名格式:`{address}_{timestamp}.log.gz`
- 自动创建新的日志文件继续写入
##### 日志存储位置
所有日志文件存储在 `logs/` 目录下:
```
logs/
├── 0x123...abc.log # 地址 0x123...abc 的日志
├── 0x456...def.log # 地址 0x456...def 的日志
├── 0x123...abc_20240102_120000.log.gz # 压缩的历史日志
├── ethnode.log # ETH 节点日志
└── ...
```
### 项目结构

View File

@@ -145,10 +145,10 @@ func NewETHNode(cfg message.Config, decodeKey string, l *listen.ListenServer) (*
Cancel: cancel,
}
// 初始化表
err = ethnode.initTables()
if err != nil {
return nil, err
}
// err = ethnode.MysqlDB.ExecuteSQLFile("../public/eth_mysql.sql")
// if err != nil {
// log.Fatalf("ETH-Chain初始化数据库表失败%v", err)
// }
// 更新网络公共数据和加载钱包
height, err := ethnode.getHeight()
if err != nil {

View File

@@ -10,6 +10,7 @@ import (
"m2pool-payment/internal/utils"
"math/big"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -271,6 +272,7 @@ func (e *ETHNode) checkBalance(symbol, address string, target_amount_eth, target
e.Wallets[address].mu.Lock()
defer e.Wallets[address].mu.Unlock()
if wallet, ok := e.Wallets[address]; ok {
log.Printf("需要的USDT%d钱包的USDT%d", target_amount_usdt, e.Wallets[address].usdt_balance.balance)
switch symbol {
case "ETH":
// ETH余额不足支付转账金额+GAS费用
@@ -300,29 +302,33 @@ func (e *ETHNode) loadWallets() error {
SELECT
ew.address, ew.queue_id, ew.timestamp, ew.sign, ew.status,
eb.used_gas AS eth_used_gas, eb.balance AS eth_balance,
eb.success_tx_hash AS eth_successed_tx_hash, eb.failed_tx_hash AS eth_filed_tx_hash,
eb.success_tx_hash AS eth_successed_tx_hash, eb.failed_tx_hash AS eth_failed_tx_hash,
ub.freeze_num AS usdt_freeze_num, ub.balance AS usdt_balance,
ub.success_tx_hash AS usdt_successed_tx_hash, ub.failed_tx_hash AS usdt_filed_tx_hash
ub.success_tx_hash AS usdt_successed_tx_hash, ub.failed_tx_hash AS usdt_failed_tx_hash
FROM ETH_wallets ew
LEFT JOIN ETH_balances eb ON ew.address = eb.address
LEFT JOIN USDT_balances ub ON ew.address = ub.address
WHERE ew.status = ?`
rows, err := e.SqliteDB.DB.Query(query, 1)
// 执行查询
rows, err := e.MysqlDB.Query(query, 1)
if err != nil {
return fmt.Errorf("failed to get wallets: %v", err)
}
defer rows.Close()
// 解析字段为 big.Int 类型
parseBigInt := func(val sql.NullString) *big.Int {
if val.Valid && strings.TrimSpace(val.String) != "" {
if bi, ok := new(big.Int).SetString(strings.TrimSpace(val.String), 10); ok {
return bi
}
}
return big.NewInt(0)
log.Printf("Invalid or empty string for big.Int: %s", val.String) // 添加调试日志
return big.NewInt(0) // 如果无法转换,返回 0
}
// 解析字符串为交易哈希列表
parseHashList := func(val sql.NullString) []string {
if !val.Valid || val.String == "" {
return []string{}
@@ -338,8 +344,11 @@ func (e *ETHNode) loadWallets() error {
return res
}
// 用来存储钱包信息
wallets := make(map[string]*Wallets)
addresses := []string{}
// 逐行处理查询结果
for rows.Next() {
var (
addrStr, queueID, sign sql.NullString
@@ -351,6 +360,7 @@ func (e *ETHNode) loadWallets() error {
usdtSuccess, usdtFailed sql.NullString
)
// 扫描查询结果到变量
if err := rows.Scan(
&addrStr, &queueID, &timestamp, &sign, &status,
&ethUsedGas, &ethBalance,
@@ -361,12 +371,14 @@ func (e *ETHNode) loadWallets() error {
return fmt.Errorf("failed to scan row: %v", err)
}
// 如果地址字段为空,则跳过该行
if !addrStr.Valid {
continue
}
addr := strings.ToLower(addrStr.String)
// 创建钱包对象并填充数据
wallet := &Wallets{
address: addr,
queueId: queueID.String,
@@ -385,80 +397,105 @@ func (e *ETHNode) loadWallets() error {
}(),
}
// 解析 ETH 余额和信息
ethBalanceStr := strings.TrimSpace(ethBalance.String)
log.Printf("Parsed ETH balance: %s for address: %s", ethBalanceStr, addrStr.String) // 调试日志
ethBalanceValue, err := strconv.ParseFloat(ethBalanceStr, 64)
if err != nil {
log.Printf("Failed to parse ETH balance: %v, address: %s", err, addrStr.String)
ethBalanceValue = 0 // 给一个默认值
}
wallet.eth_balance = &eth_balance{
symbol: "ETH",
used_gas: parseBigInt(ethUsedGas),
balance: parseBigInt(ethBalance),
symbol: "ETH",
used_gas: parseBigInt(ethUsedGas),
// balance: big.NewInt(int64(ethBalanceValue)), // 使用 big.Int 存储数值
balance: utils.Float64ToBigInt("ETH", ethBalanceValue),
successed_tx_hash: parseHashList(ethSuccess),
failed_tx_hash: parseHashList(ethFailed),
}
// 解析 USDT 余额和信息
usdtBalanceStr := strings.TrimSpace(usdtBalance.String)
log.Printf("Parsed USDT balance: %s for address: %s", usdtBalanceStr, addrStr.String) // 调试日志
usdtBalanceValue, err := strconv.ParseFloat(usdtBalanceStr, 64)
if err != nil {
log.Printf("Failed to parse USDT balance: %v, address: %s", err, addrStr.String)
usdtBalanceValue = 0 // 给一个默认值
}
wallet.usdt_balance = &usdt_balance{
symbol: "USDT",
freeze_num: parseBigInt(usdtFreeze),
balance: parseBigInt(usdtBalance),
balance: utils.Float64ToBigInt("USDT", usdtBalanceValue), // 使用 big.Int 存储数值
successed_tx_hash: parseHashList(usdtSuccess),
failed_tx_hash: parseHashList(usdtFailed),
}
// 添加钱包地址到列表并将钱包信息存储到 map 中
addresses = append(addresses, addr)
wallets[addr] = wallet
}
log.Printf("ETH钱包加载成功%v", addresses)
// 检查是否有查询错误
if err := rows.Err(); err != nil {
return fmt.Errorf("error occurred while iterating rows: %v", err)
}
// 打印已加载的地址
log.Printf("ETH钱包加载成功%v", addresses)
// 如果有地址,获取钱包的私钥
if len(addresses) > 0 {
pks, err := e.getAddressesPks(addresses)
if err != nil {
return fmt.Errorf("inital balance private key error: %v", err)
return fmt.Errorf("initial balance private key error: %v", err)
}
e.mu.Lock()
e.Wallets = wallets
// 将私钥绑定到钱包对象中
for address, pk := range pks {
e.Wallets[address].pk = pk
}
e.mu.Unlock()
}
// for addr, balance := range e.Wallets {
// log.Printf("钱包:%s, wallets: %v", addr, balance)
// }
// 打印每个钱包的余额
for addr, balance := range e.Wallets {
log.Printf("钱包:%s, ETH余额: %s, USDT余额: %s", addr, balance.eth_balance.balance.String(), balance.usdt_balance.balance.String())
}
return nil
}
func (e *ETHNode) loadUnConfirmedTxs() error {
query_str := "SELECT queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount, status FROM eth_unconfirmed_tx WHERE status = ?"
data, err := e.SqliteDB.Query_(query_str, 2)
// 查询语句
query_str := "SELECT queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount, status FROM ETH_unconfirmed_tx WHERE status = ?"
// 执行查询
rows, err := e.MysqlDB.Query(query_str, 2)
if err != nil {
return fmt.Errorf("failed to get columns: %v", err)
return fmt.Errorf("failed to query unconfirmed transactions: %v", err)
}
defer rows.Close() // 确保查询结束后关闭 rows
for _, row := range data {
// 提取各个字段并确保正确转换为目标类型
queueId, ok := row["queue_id"].(string)
if !ok {
return fmt.Errorf("invalid type for queue_id, expected string but got %T", row["queue_id"])
// 遍历查询结果
for rows.Next() {
var queueId, chain, symbol, fromAddr, toAddr, txHash, amount string
var txType, height, status int
// 读取当前行数据
if err := rows.Scan(&queueId, &txType, &chain, &symbol, &fromAddr, &toAddr, &txHash, &height, &amount, &status); err != nil {
return fmt.Errorf("failed to scan row: %v", err)
}
// 你可以继续提取其他字段并做类似类型转换
txType, _ := row["tx_type"].(int) // 假设 tx_type 是 int 类型
chain, _ := row["chain"].(string)
symbol, _ := row["symbol"].(string)
fromAddr, _ := row["from_addr"].(string)
toAddr, _ := row["to_addr"].(string)
tx_hash, _ := row["tx_hash"].(string)
height, _ := row["height"].(int) // 假设 height 是 int 类型
amount, _ := row["amount"].(string) // amount 是字符串类型
status, _ := row["status"].(int) // status 是 int 类型
big_amount := new(big.Int)
big_Amount, ok := big_amount.SetString(amount, 10)
if !ok {
return fmt.Errorf("amount to bigInt error: %v", err)
// 将 amount 转换为 big.Int
bigAmount := new(big.Int)
bigAmountParsed, success := bigAmount.SetString(amount, 10)
if !success {
return fmt.Errorf("failed to convert amount '%s' to big.Int", amount)
}
// 锁定并填充数据
e.UnConfirmedTxs.mu.Lock()
// 填充 Transaction 结构体
e.UnConfirmedTxs.Transactions[queueId] = message.Transaction{
QueueId: queueId,
TxType: txType,
@@ -466,13 +503,19 @@ func (e *ETHNode) loadUnConfirmedTxs() error {
Symbol: symbol,
From: fromAddr,
To: toAddr,
TxHash: tx_hash,
TxHash: txHash,
Height: uint64(height),
Amount: big_Amount,
Amount: bigAmountParsed, // 使用转换后的 big.Int
Status: status,
}
e.UnConfirmedTxs.mu.Unlock()
}
// 检查是否存在查询错误
if err := rows.Err(); err != nil {
return fmt.Errorf("error during row iteration: %v", err)
}
return nil
}
@@ -762,7 +805,16 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
go func() {
str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, 0, "ETH", "ETH", fromAddr, toAddr, txHash, height, amount}
_, err := e.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err)
return
}
}()
continue
}
// fromAddr和监听钱包一致表示(提现/支付)
@@ -807,6 +859,16 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
go func() {
str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{v.QueueId, 1, "ETH", "ETH", fromAddr, toAddr, txHash, height, amount}
_, err := e.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err)
return
}
}()
continue
}
case message.PayMsg_req:
@@ -838,6 +900,16 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
go func() {
str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{v.QueueId, 2, "ETH", "ETH", fromAddr, toAddr, txHash, height, amount}
_, err := e.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err)
return
}
}()
continue
}
default:
@@ -931,7 +1003,16 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) {
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
go func() {
str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, 0, "ETH", "USDT", fromAddr, toAddr, txHash, height, value_float}
_, err := e.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err)
return
}
}()
return
}
// fromAddr和监听钱包一致表示(提现/支付)
@@ -976,6 +1057,16 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) {
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
go func() {
str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{v.QueueId, 1, "ETH", "USDT", fromAddr, toAddr, txHash, height, value_float}
_, err := e.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err)
return
}
}()
return
}
case message.PayMsg_req:
@@ -1007,6 +1098,16 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) {
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
go func() {
str := "INSERT INTO ETH_unconfirmed_tx(queue_id, tx_type, chain, symbol, from_addr, to_addr, tx_hash, height, amount) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{v.QueueId, 2, "ETH", "USDT", fromAddr, toAddr, txHash, height, value_float}
_, err := e.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Fatalf("INSERT ETH_unconfirmed_tx table error: %v", err)
return
}
}()
return
}
default:
@@ -1014,7 +1115,13 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) {
}
}
var tableMap = map[string]string{
"ETH": "ETH_balances",
"USDT": "USDT_balances",
}
func (e *ETHNode) confirm() {
e.mu.Lock()
unconfirmedTxs := e.UnConfirmedTxs
now_height := e.NetInfo.Height
@@ -1022,6 +1129,7 @@ func (e *ETHNode) confirm() {
var responses []any
e.UnConfirmedTxs.mu.Lock()
for txHash, tx := range unconfirmedTxs.Transactions {
// 高度成熟:当前高度 >= 交易高度 + 确认高度
if now_height >= tx.Height+e.ConfirmHeight {
check_result, actual_gas, err := e.checkTransaction(txHash)
@@ -1039,6 +1147,7 @@ func (e *ETHNode) confirm() {
if ee != nil {
log.Printf("Query Message error: %v, queue_id: %s, tx_type: %d", ee, tx.QueueId, tx.TxType)
status = constant.STATUS_FAILED
// 如果找不到消息,仍然需要处理交易失败的情况
msg = nil
} else {
@@ -1061,10 +1170,7 @@ func (e *ETHNode) confirm() {
delete(e.UnConfirmedTxs.Transactions, txHash)
continue
}
var tableMap = map[string]string{
"ETH": "ETH_balances",
"USDT": "USDT_balances",
}
switch v := msg.(type) {
case message.TopupMsg_req:
if status == constant.STATUS_SUCCESS {
@@ -1091,16 +1197,31 @@ func (e *ETHNode) confirm() {
TxHash: txHash,
BlockHeight: tx.Height,
}
// 修改钱包表
go func() {
str := "UPDATE " + tableMap[tx.Symbol] + " SET success_tx_hash = success_tx_hash || ? WHERE address = ?"
params := []any{txHash + ",", v.Address}
count, err := e.SqliteDB.Update(str, params)
var str string
var params []any
if status == constant.STATUS_SUCCESS {
str = "UPDATE " + tableMap[tx.Symbol] + " SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance + ? WHERE address = ?"
params = []any{txHash + ",", float_amount, v.Address}
} else {
str = "UPDATE " + tableMap[tx.Symbol] + " SET failed_tx_hash = CONCAT(failed_tx_hash, ?) WHERE address = ?"
params = []any{txHash + ",", v.Address}
}
_, err := e.MysqlDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", v.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", v.QueueId, count)
log.Printf("Failed to update success_tx_hash/failed_tx_hash for queue_id %s: %v", v.QueueId, err)
}
}()
// 修改待确认交易表
go func() {
str := "UPDATE ETH_unconfirmed_tx SET status = ? WHERE tx_hash = ?"
params := []any{status, txHash}
_, err := e.MysqlDB.Update(str, params)
if err != nil {
log.Printf("Failed to update ETH_unconfirmed_tx: %v", err)
}
}()
responses = append(responses, response) // 将消息提交至responses
@@ -1134,6 +1255,45 @@ func (e *ETHNode) confirm() {
ToAddress: tx.To,
BlockHeight: tx.Height,
}
// 修改钱包表
go func() {
var str, str1 string
var params, params1 []any
txHashWithComma := txHash + ","
if status == constant.STATUS_SUCCESS {
if tableMap[tx.Symbol] == "ETH_balances" {
str = "UPDATE ETH_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, used_gas = used_gas + ? WHERE address = ?"
params = []any{txHashWithComma, float_amount, utils.BigIntETHToFloat64(actual_gas), v.FromAddress}
str1 = ""
params1 = []any{}
} else {
str = "UPDATE USDT_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, freeze_num = freeze_num + ? WHERE address = ?"
params = []any{txHashWithComma, float_amount, v.Fee, v.FromAddress}
str1 = "UPDATE ETH_balances SET used_gas = used_gas + ? WHERE address = ?"
params1 = []any{utils.BigIntETHToFloat64(actual_gas), v.FromAddress}
}
} else {
str = "UPDATE " + tableMap[tx.Symbol] + " SET failed_tx_hash = CONCAT(failed_tx_hash, ?) WHERE address = ?"
params = []any{txHashWithComma, v.FromAddress}
str1 = ""
params1 = []any{}
}
err := e.MysqlDB.ExecuteTransactions([]string{str, str1}, [][]any{params, params1})
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update success_tx_hash/failed_tx_hash for queue_id %s: %v", v.QueueId, err)
}
}()
// 修改待确认交易表
go func() {
str := "UPDATE ETH_unconfirmed_tx SET status = ? WHERE tx_hash = ?"
params := []any{status, txHash}
_, err := e.MysqlDB.Update(str, params)
if err != nil {
log.Printf("Failed to update ETH_unconfirmed_tx: %v", err)
}
}()
responses = append(responses, response) // 将消息提交至responses
case message.PayMsg_req:
e.Wallets[tx.From].eth_balance.used_gas = new(big.Int).Add(e.Wallets[tx.From].eth_balance.used_gas, actual_gas)
@@ -1162,6 +1322,45 @@ func (e *ETHNode) confirm() {
ToAddress: tx.To,
BlockHeight: tx.Height,
}
// 修改钱包表
go func() {
var str, str1 string
var params, params1 []any
txHashWithComma := txHash + ","
if status == constant.STATUS_SUCCESS {
if tableMap[tx.Symbol] == "ETH_balances" {
str = "UPDATE ETH_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, used_gas = used_gas + ? WHERE address = ?"
params = []any{txHashWithComma, float_amount, utils.BigIntETHToFloat64(actual_gas), v.FromAddress}
str1 = ""
params1 = []any{}
} else {
str = "UPDATE USDT_balances SET success_tx_hash = CONCAT(success_tx_hash, ?), balance = balance - ?, freeze_num = freeze_num + ? WHERE address = ?"
params = []any{txHashWithComma, float_amount, v.Fee, v.FromAddress}
str1 = "UPDATE ETH_balances SET used_gas = used_gas + ? WHERE address = ?"
params1 = []any{utils.BigIntETHToFloat64(actual_gas), v.FromAddress}
}
} else {
str = "UPDATE " + tableMap[tx.Symbol] + " SET failed_tx_hash = CONCAT(failed_tx_hash, ?) WHERE address = ?"
params = []any{txHashWithComma, v.FromAddress}
str1 = ""
params1 = []any{}
}
err := e.MysqlDB.ExecuteTransactions([]string{str, str1}, [][]any{params, params1})
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update success_tx_hash/failed_tx_hash for queue_id %s: %v", v.QueueId, err)
}
}()
// 修改待确认交易表
go func() {
str := "UPDATE ETH_unconfirmed_tx SET status = ? WHERE tx_hash = ?"
params := []any{status, txHash}
_, err := e.MysqlDB.Update(str, params)
if err != nil {
log.Printf("Failed to update ETH_unconfirmed_tx: %v", err)
}
}()
responses = append(responses, response) // 将消息提交至responses
default:
log.Printf("未知的消息类型: %v, 跳过此交易", v)
@@ -1225,7 +1424,7 @@ func (e *ETHNode) handleListen_Topup_req(msg message.TopupMsg_req) {
params2 := []any{msg.Address}
str3 := "INSERT INTO USDT_balances (address) VALUES (?)"
params3 := []any{msg.Address}
err = e.SqliteDB.ExecuteTransactions([]string{str1, str2, str3}, [][]any{params1, params2, params3})
err = e.MysqlDB.ExecuteTransactions([]string{str1, str2, str3}, [][]any{params1, params2, params3})
if err != nil {
log.Printf("Received ListenServer Topup_req msg: insert sqlite3 db error: %v", err)
}
@@ -1269,6 +1468,7 @@ func (e *ETHNode) handleListen_Withdraw_req(msg message.WithdrawMsg_req) {
FromAddress: msg.FromAddress,
ToAddress: msg.ToAddress,
}
check_result, err := e.checkBalance(msg.Symbol, msg.FromAddress, target_amount_eth, target_amount_usdt)
// 余额校验错误,绕过转账,返回错误响应
if err != nil {
@@ -1383,9 +1583,9 @@ func (e *ETHNode) handleListen_Remove_req(msg message.RemoveListenMsg_req) {
}
str := "UPDATE ETH_wallets SET status = ? WHERE address = ?"
params := []any{0, msg.Address}
count, err := e.SqliteDB.Update(str, params)
if err != nil || count != 1 {
log.Printf("Remove address(%s) error: count(%d)", msg.Address, count)
_, err := e.MysqlDB.Update(str, params)
if err != nil {
log.Printf("Remove address(%s) error: %v", msg.Address, err)
// result_msg.Status = constant.STATUS_FAILED
}
go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second)

View File

@@ -1,9 +1,13 @@
package db
import (
"bufio"
"database/sql"
"fmt"
"io/ioutil"
"log"
message "m2pool-payment/internal/msg"
"strings"
_ "github.com/go-sql-driver/mysql"
)
@@ -34,6 +38,67 @@ func NewMySQLPool(cfg message.MysqlConfig) (*MySQLPool, error) {
return &MySQLPool{db: db}, nil
}
// splitSQLStatements 将 SQL 内容按分号分割
func splitSQLStatements(sqlContent string) []string {
// 使用 bufio 扫描文件内容
var statements []string
scanner := bufio.NewScanner(strings.NewReader(sqlContent))
var queryBuilder strings.Builder
for scanner.Scan() {
line := scanner.Text()
// 处理每一行的 SQL 语句
if strings.TrimSpace(line) == "" {
continue
}
// 如果行中包含分号,说明是完整的 SQL 语句
queryBuilder.WriteString(line)
if strings.HasSuffix(line, ";") {
statements = append(statements, queryBuilder.String())
queryBuilder.Reset() // 清空构建器以便准备下一条 SQL 语句
} else {
queryBuilder.WriteString("\n")
}
}
// 处理可能的扫描错误
if err := scanner.Err(); err != nil {
log.Fatalf("error reading .sql file: %v\n", err)
}
return statements
}
func (p *MySQLPool) ExecuteSQLFile(filePath string) error {
// 读取 SQL 文件内容
sqlContent, err := ioutil.ReadFile(filePath)
if err != nil {
return fmt.Errorf("failed to read SQL file: %v", err)
}
// 将文件内容按分号 (;) 分割成多条 SQL 语句
queries := splitSQLStatements(string(sqlContent))
// 执行每一条 SQL 语句
for _, query := range queries {
// 跳过空行或注释
if strings.TrimSpace(query) == "" || strings.HasPrefix(strings.TrimSpace(query), "--") {
continue
}
// 执行 SQL 语句
_, err := p.db.Exec(query)
if err != nil {
log.Printf("error executing query: %v\n", err)
} else {
// fmt.Println("Executed query:", query)
}
}
return nil
}
// Exec 执行 INSERT/UPDATE/DELETE
func (p *MySQLPool) Exec(query string, args ...any) (sql.Result, error) {
return p.db.Exec(query, args...)
@@ -64,6 +129,89 @@ func (p *MySQLPool) Transaction(fn func(tx *sql.Tx) error) error {
return tx.Commit()
}
// Insert 执行通用插入操作
func (p *MySQLPool) Insert(query string, values [][]any) (sql.Result, error) {
// 预处理查询
stmt, err := p.db.Prepare(query)
if err != nil {
return nil, fmt.Errorf("failed to prepare statement: %v", err)
}
defer stmt.Close()
// 执行批量插入
var result sql.Result
for _, row := range values {
result, err = stmt.Exec(row...)
if err != nil {
return nil, fmt.Errorf("failed to execute insert: %v", err)
}
}
return result, nil
}
// Delete 执行通用删除操作
// Update 执行通用更新操作
func (p *MySQLPool) Update(query string, values []any) (sql.Result, error) {
// 预处理查询
stmt, err := p.db.Prepare(query)
if err != nil {
return nil, fmt.Errorf("failed to prepare statement: %v", err)
}
defer stmt.Close()
// 执行更新操作
result, err := stmt.Exec(values...)
if err != nil {
return nil, fmt.Errorf("failed to execute update: %v", err)
}
return result, nil
}
// ExecuteTransactions 执行多条增删改操作,确保事务的原子性
func (p *MySQLPool) ExecuteTransactions(str_sqls []string, params [][]any) error {
// 检查 SQL 和参数的数量是否匹配
if len(str_sqls) != len(params) {
return fmt.Errorf("sql length != params length")
}
// 开始事务
tx, err := p.db.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %v", err)
}
// 确保在函数结束时提交或回滚事务
defer func() {
if err != nil {
// 发生错误时回滚事务
if rollbackErr := tx.Rollback(); rollbackErr != nil {
err = fmt.Errorf("failed to rollback transaction: %v", rollbackErr)
}
} else {
// 如果没有错误,提交事务
if commitErr := tx.Commit(); commitErr != nil {
err = fmt.Errorf("failed to commit transaction: %v", commitErr)
}
}
}()
// 执行每个 SQL 语句
for i, sql_str := range str_sqls {
// 使用事务对象 tx 来执行 SQL
_, err := tx.Exec(sql_str, params[i]...)
if err != nil {
// 如果执行失败,立即返回并且触发回滚
return fmt.Errorf("failed to execute SQL at index %d: %v", i, err)
}
}
// 如果所有 SQL 执行成功,则返回 nil
return nil
}
// Close 关闭连接池
func (p *MySQLPool) Close() error {
return p.db.Close()

View File

@@ -10,8 +10,8 @@ import (
)
type ListenServer struct {
Config message.Config
// MysqlDB *db.MySQLPool
Config message.Config
MysqlDB *db.MySQLPool
SqliteDB *db.SQLite
TopupMsgs map[string]*TopupMsgs // {"ETH": TopupMsgs{"queue_id": message.Topupmsg_req{}, ...}}
WithdrawMsgs map[string]*WithdrawMsgs
@@ -45,12 +45,12 @@ type RemoveMsgs struct {
}
func NewListenServer(cfg message.Config) *ListenServer {
// // 初始化MySQL数据库
// dbConn, err := db.NewMySQLPool(cfg.MysqlConfig["wallet"])
// if err != nil {
// log.Printf("mysql connect error: %v", err)
// return nil
// }
// 初始化MySQL数据库
dbConn, err := db.NewMySQLPool(cfg.MysqlConfig["wallet"])
if err != nil {
log.Printf("mysql connect error: %v", err)
return nil
}
// 初始化SQLite3
sqlite, err := db.NewSQLite(cfg.MsgConfig.SqlitePath)
@@ -86,8 +86,8 @@ func NewListenServer(cfg message.Config) *ListenServer {
chin[net] = make(chan any, 1000)
}
var l = &ListenServer{
Config: cfg,
// MysqlDB: dbConn,
Config: cfg,
MysqlDB: dbConn,
SqliteDB: sqlite,
TopupMsgs: topup_msgs,
WithdrawMsgs: withdraw_msgs,
@@ -98,6 +98,10 @@ func NewListenServer(cfg message.Config) *ListenServer {
ChFromRmqServer: rmq_ch_in,
ChToRmqServer: rmq_ch_out,
}
// err = l.MysqlDB.ExecuteSQLFile("../public/msg_mysql.sql")
// if err != nil {
// log.Fatalf("Listen-message初始化数据库表失败%v", err)
// }
l.loadMsg()
log.Println("✅ 消息监听处理已启动")
return l

View File

@@ -5,10 +5,12 @@ import (
"log"
"m2pool-payment/internal/constant"
message "m2pool-payment/internal/msg"
"strconv"
"time"
)
// 加载所有消息
func (l *ListenServer) loadMsg() {
topup_sql := "SELECT queue_id, chain, symbol, address, timestamp, sign, status FROM topup_req_msg WHERE status = ?" // 1正在监听
withdraw_sql := "SELECT queue_id, chain, symbol, from_addr, to_addr, amount, fee, timestamp, sign, status FROM withdraw_req_msg WHERE status = ? OR status = ?" // 2待确认5待支付
@@ -18,23 +20,30 @@ func (l *ListenServer) loadMsg() {
withdraw_params := []any{2, 5}
pay_params := []any{2, 5}
remove_params := []any{2}
// 处理充值消息
go func() {
rows, err := l.SqliteDB.Query_(topup_sql, topup_params...)
rows, err := l.MysqlDB.Query(topup_sql, topup_params...)
if err != nil {
log.Fatalf("load topup-msg error: %v", err)
return
}
// 遍历查询结果
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
address := row["address"].(string) // 假设 address 是 string 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 INTEGER 类型
defer rows.Close() // 确保在函数结束时关闭 rows
// 处理数据,例如打印输出
// 遍历查询结果
for rows.Next() {
var queueID, chain, symbol, address, sign string
var timestamp int64
var status int
// 扫描每行数据到相应的变量
err := rows.Scan(&queueID, &chain, &symbol, &address, &timestamp, &sign, &status)
if err != nil {
log.Printf("failed to scan topup row: %v", err)
continue
}
// 处理数据
l.TopupMsgs[chain].mu.RLock()
l.TopupMsgs[chain].Msgs[queueID] = message.TopupMsg_req{
QueueId: queueID,
@@ -43,33 +52,51 @@ func (l *ListenServer) loadMsg() {
Address: address,
Timestamp: uint64(timestamp),
Sign: sign,
Status: int(status),
Status: status,
}
l.TopupMsgs[chain].mu.RUnlock()
}
if err := rows.Err(); err != nil {
log.Printf("error occurred during rows iteration: %v", err)
}
log.Printf("充值历史消息load完毕")
}()
// 处理提现消息
go func() {
rows, err := l.SqliteDB.Query_(withdraw_sql, withdraw_params...)
rows, err := l.MysqlDB.Query(withdraw_sql, withdraw_params...)
if err != nil {
log.Fatalf("load withdraw-msg error: %v", err)
return
}
// 遍历查询结果
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型
toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型
amount := row["amount"].(float64) // 假设 amount 是 float64 类型
fee := row["fee"].(float64) // 假设 fee 是 float64 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 string 类型
defer rows.Close()
// 处理数据,例如打印输出
// 遍历查询结果
for rows.Next() {
var queueID, chain, symbol, fromAddr, toAddr, amountStr, feeStr, sign string
var timestamp int64
var status int
// 扫描每行数据
err := rows.Scan(&queueID, &chain, &symbol, &fromAddr, &toAddr, &amountStr, &feeStr, &timestamp, &sign, &status)
if err != nil {
log.Printf("failed to scan withdraw row: %v", err)
continue
}
// 将 amount 和 fee 转换为浮动数字
amount, err := strconv.ParseFloat(amountStr, 64)
if err != nil {
log.Printf("failed to parse amount: %v", err)
continue
}
fee, err := strconv.ParseFloat(feeStr, 64)
if err != nil {
log.Printf("failed to parse fee: %v", err)
continue
}
// 处理数据
l.WithdrawMsgs[chain].mu.RLock()
l.WithdrawMsgs[chain].Msgs[queueID] = message.WithdrawMsg_req{
QueueId: queueID,
@@ -81,33 +108,51 @@ func (l *ListenServer) loadMsg() {
Fee: fee,
Timestamp: uint64(timestamp),
Sign: sign,
Status: int(status),
Status: status,
}
l.WithdrawMsgs[chain].mu.RUnlock()
}
if err := rows.Err(); err != nil {
log.Printf("error occurred during rows iteration: %v", err)
}
log.Printf("提现历史消息load完毕")
}()
// 处理支付消息
go func() {
rows, err := l.SqliteDB.Query_(pay_sql, pay_params...)
rows, err := l.MysqlDB.Query(pay_sql, pay_params...)
if err != nil {
log.Fatalf("load pay-msg error: %v", err)
return
}
// 遍历查询结果
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型
toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型
amount := row["amount"].(float64) // 假设 amount 是 float64 类型
fee := row["fee"].(float64) // 假设 fee 是 float64 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 string 类型
defer rows.Close()
// 处理数据,例如打印输出
// 遍历查询结果
for rows.Next() {
var queueID, chain, symbol, fromAddr, toAddr, amountStr, feeStr, sign string
var timestamp int64
var status int
// 扫描每行数据
err := rows.Scan(&queueID, &chain, &symbol, &fromAddr, &toAddr, &amountStr, &feeStr, &timestamp, &sign, &status)
if err != nil {
log.Printf("failed to scan pay row: %v", err)
continue
}
// 将 amount 和 fee 转换为浮动数字
amount, err := strconv.ParseFloat(amountStr, 64)
if err != nil {
log.Printf("failed to parse amount: %v", err)
continue
}
fee, err := strconv.ParseFloat(feeStr, 64)
if err != nil {
log.Printf("failed to parse fee: %v", err)
continue
}
// 处理数据
l.PayMsgs[chain].mu.RLock()
l.PayMsgs[chain].Msgs[queueID] = message.PayMsg_req{
QueueId: queueID,
@@ -119,29 +164,37 @@ func (l *ListenServer) loadMsg() {
Fee: fee,
Timestamp: uint64(timestamp),
Sign: sign,
Status: int(status),
Status: status,
}
l.PayMsgs[chain].mu.RUnlock()
}
if err := rows.Err(); err != nil {
log.Printf("error occurred during rows iteration: %v", err)
}
log.Printf("支付历史消息load完毕")
}()
// 处理移除消息
go func() {
rows, err := l.SqliteDB.Query_(remove_sql, remove_params...)
rows, err := l.MysqlDB.Query(remove_sql, remove_params...)
if err != nil {
log.Fatalf("load remove-msg error: %v", err)
return
}
defer rows.Close()
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
msgType := row["msg_type"].(int)
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
address := row["address"].(string) // 假设 address 是 string 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 INTEGER 类型
// 遍历查询结果
for rows.Next() {
var queueID, chain, symbol, address, sign string
var timestamp int64
var status, msgType int
// 扫描每行数据
err := rows.Scan(&queueID, &msgType, &chain, &symbol, &address, &timestamp, &sign, &status)
if err != nil {
log.Printf("failed to scan remove row: %v", err)
continue
}
// 处理数据
l.RemoveMsgs[chain].mu.RLock()
@@ -153,10 +206,13 @@ func (l *ListenServer) loadMsg() {
Address: address,
Timestamp: uint64(timestamp),
Sign: sign,
Stauts: int(status),
Status: status,
}
l.RemoveMsgs[chain].mu.RUnlock()
}
if err := rows.Err(); err != nil {
log.Printf("error occurred during rows iteration: %v", err)
}
log.Printf("移除监听历史消息load完毕")
}()
}
@@ -170,7 +226,7 @@ func (l *ListenServer) handleRmqTopup_req(msg message.TopupMsg_req) {
// 写数据库
sql := "INSERT INTO topup_req_msg (queue_id, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
_, err := l.MysqlDB.Insert(sql, [][]any{params})
if err != nil {
log.Printf("Insert Topup_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.TopupMsg_resp{
@@ -196,7 +252,7 @@ func (l *ListenServer) handleRmqWithdraw_req(msg message.WithdrawMsg_req) {
// 写数据库
sql := "INSERT INTO withdraw_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
_, err := l.MysqlDB.Insert(sql, [][]any{params})
if err != nil {
log.Printf("Insert Withdraw_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.WithdrawMsg_resp{
@@ -223,7 +279,7 @@ func (l *ListenServer) handleRmqPay_req(msg message.PayMsg_req) {
// 写数据库
sql := "INSERT INTO pay_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
_, err := l.MysqlDB.Insert(sql, [][]any{params})
if err != nil {
log.Printf("Insert PayMsg_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.PayMsg_resp{
@@ -249,7 +305,7 @@ func (l *ListenServer) handleRmqRemove_req(msg message.RemoveListenMsg_req) {
sql := "INSERT INTO remove_req_msg (queue_id, msg_type, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
_, err := l.MysqlDB.Insert(sql, [][]any{params})
if err != nil {
log.Printf("Insert Remove_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.RemoveListenMsg_resp{
@@ -276,14 +332,15 @@ func (l *ListenServer) handleUpdateReqState(msg message.UpdateReqState) {
str := "UPDATE " + typeMap[msg.MsgType] + " SET status = ? WHERE queue_id = ?"
params := []any{msg.Status, msg.QueueId}
go func() {
count, err := l.SqliteDB.Update(str, params)
_, err := l.MysqlDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update update_req_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
// else if count != 1 {
// // 如果更新的行数不是 1日志中记录详细信息
// log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
// }
}()
}
@@ -294,18 +351,19 @@ func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) {
// 修改数据库
str := "UPDATE topup_resp_msg SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params)
_, err := l.MysqlDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update topup_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
// else if count != 1 {
// // 如果更新的行数不是 1日志中记录详细信息
// log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
// }
case constant.STATUS_PENDING:
str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, tx_hash, height, status) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.Address, msg.Amount, msg.TxHash, msg.BlockHeight, msg.Status}
err := l.SqliteDB.Insert(str, params)
_, err := l.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Printf("Insert Topup_resp msg error: %v", err)
}
@@ -313,7 +371,7 @@ func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) {
// 插入数据库
str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, to_addr, status) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Status}
err := l.SqliteDB.Insert(str, params)
_, err := l.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Printf("Insert Topup_resp msg error: %v", err)
}
@@ -329,7 +387,7 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) {
go func() {
str := "INSERT INTO withdraw_resp_msg (queue_id, chain, symbol, from_addr, to_addr, tx_hash, amount, fee, height, status) VALUES (?,?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.TxHash, msg.Amount, msg.Fee, msg.BlockHeight, msg.Status}
err := l.SqliteDB.Insert(str, params)
_, err := l.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Println(err)
}
@@ -342,14 +400,15 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) {
go func() {
str := "UPDATE withdraw_resp_msg SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params)
_, err := l.MysqlDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update withdraw_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
// else if count != 1 {
// // 如果更新的行数不是 1日志中记录详细信息
// log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
// }
}()
go l.asyncSendMsgToRmq(msg, 3, 5*time.Second)
default:
@@ -360,7 +419,7 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) {
go func() {
str := "INSERT INTO withdraw_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, fee, status) VALUES (?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Status}
err := l.SqliteDB.Insert(str, params)
_, err := l.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Println(err)
}
@@ -377,7 +436,7 @@ func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) {
go func() {
str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, tx_hash, amount, fee, height, status) VALUES (?,?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.TxHash, msg.Amount, msg.Fee, msg.BlockHeight, msg.Status}
err := l.SqliteDB.Insert(str, params)
_, err := l.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Println(err)
}
@@ -390,14 +449,15 @@ func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) {
go func() {
str := "UPDATE pay_resp_msg SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params)
_, err := l.MysqlDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update pay_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
// else if count != 1 {
// // 如果更新的行数不是 1日志中记录详细信息
// log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
// }
}()
go l.asyncSendMsgToRmq(msg, 3, 5*time.Second)
default:
@@ -408,7 +468,7 @@ func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) {
go func() {
str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, fee, status) VALUES (?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Status}
err := l.SqliteDB.Insert(str, params)
_, err := l.MysqlDB.Insert(str, [][]any{params})
if err != nil {
log.Println(err)
}
@@ -431,14 +491,15 @@ func (l *ListenServer) handleChainRemove_resp(msg message.RemoveListenMsg_resp)
// 更新数据库
str := "UPDATE remove_resp_msg SET status = ? WHERE queue_id = ?"
params := []any{msg.Status, msg.QueueId}
count, err := l.SqliteDB.Update(str, params)
_, err := l.MysqlDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
// else if count != 1 {
// // 如果更新的行数不是 1日志中记录详细信息
// log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
// }
}()
// 异步发送消息到 RMQ

View File

@@ -276,6 +276,196 @@ func LogETHNode(msg string) {
}
}
// =============================== RMQ <-> Listen 通信日志 ===============================
// LogRmqToListenTopupReq 记录 RMQ -> Listen 充值请求
// 使用 address 作为文件名
func LogRmqToListenTopupReq(queueId, chain, symbol, address string, timestamp uint64) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(address)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
// TopupReq 没有 fromAddress使用 "-" 代替toAddress 是 addressamount 为 0
content := fmt.Sprintf("[TopupReq]: %s--%s-%s-%s-0",
t, address, chain, symbol)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogRmqToListenWithdrawReq 记录 RMQ -> Listen 提现请求
// 使用 fromAddress 作为文件名
func LogRmqToListenWithdrawReq(queueId, chain, symbol, fromAddress, toAddress string, amount, fee float64, timestamp uint64) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(fromAddress)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
content := fmt.Sprintf("[WithdrawReq]: %s-%s-%s-%s-%s-%.6f",
t, fromAddress, toAddress, chain, symbol, amount)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogRmqToListenPayReq 记录 RMQ -> Listen 支付请求
// 使用 fromAddress 作为文件名
func LogRmqToListenPayReq(queueId, chain, symbol, fromAddress, toAddress string, amount, fee float64, timestamp uint64) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(fromAddress)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
content := fmt.Sprintf("[PayReq]: %s-%s-%s-%s-%s-%.6f",
t, fromAddress, toAddress, chain, symbol, amount)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogRmqToListenRemoveReq 记录 RMQ -> Listen 移除监听请求
// 使用 address 作为文件名
func LogRmqToListenRemoveReq(queueId string, msgType int, chain, symbol, address string, timestamp uint64) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(address)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
// RemoveReq 没有 fromAddresstoAddress 是 addressamount 为 0
content := fmt.Sprintf("[RemoveReq]: %s--%s-%s-%s-0",
t, address, chain, symbol)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogListenToRmqTopupResp 记录 Listen -> RMQ 充值响应
// 使用 address 作为文件名
func LogListenToRmqTopupResp(queueId, chain, symbol, address, fromAddress, txHash string, amount float64, blockHeight uint64, status int) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(address)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
// TopupResp 中 address 是目标地址toAddressfromAddress 是来源地址
content := fmt.Sprintf("[TopupResp]: %s-%s-%s-%s-%s-%.6f",
t, fromAddress, address, chain, symbol, amount)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogListenToRmqWithdrawResp 记录 Listen -> RMQ 提现响应
// 使用 fromAddress 作为文件名
func LogListenToRmqWithdrawResp(queueId, chain, symbol, fromAddress, toAddress, txHash string, amount, fee float64, blockHeight uint64, status int) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(fromAddress)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
content := fmt.Sprintf("[WithdrawResp]: %s-%s-%s-%s-%s-%.6f",
t, fromAddress, toAddress, chain, symbol, amount)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogListenToRmqPayResp 记录 Listen -> RMQ 支付响应
// 使用 fromAddress 作为文件名
func LogListenToRmqPayResp(queueId, chain, symbol, fromAddress, toAddress, txHash string, amount, fee float64, blockHeight uint64, status int) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(fromAddress)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
content := fmt.Sprintf("[PayResp]: %s-%s-%s-%s-%s-%.6f",
t, fromAddress, toAddress, chain, symbol, amount)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// LogListenToRmqRemoveResp 记录 Listen -> RMQ 移除监听响应
// 使用 address 作为文件名
func LogListenToRmqRemoveResp(queueId string, msgType int, chain, symbol, address string, status int) {
if txLogger == nil {
return
}
lf, err := txLogger.getOrCreateLogFile(address)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t := time.Now().Format("2006-01-02 15:04:05")
// 格式:[msg-Type]: time-fromaddress-toaddress-chain-symbol-amount
// RemoveResp 没有 fromAddresstoAddress 是 addressamount 为 0
content := fmt.Sprintf("[RemoveResp]: %s--%s-%s-%s-0",
t, address, chain, symbol)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)
}
}
// Close 关闭所有日志文件
func CloseTransactionLogger() {
if txLogger == nil {

View File

@@ -127,7 +127,7 @@ type RemoveListenMsg_req struct {
Address string `json:"address"`
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
Stauts int `json:"status,omitempty"`
Status int `json:"status,omitempty"`
}
// 返回收到的删除监听地址消息

View File

@@ -87,6 +87,8 @@ func (s *ServerCtx) handleTopupMsg() {
}
return
}
// 记录 RMQ -> Listen 充值请求
logger.LogRmqToListenTopupReq(msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp)
s.messageServer.ChFromRmqServer <- msg
}
}
@@ -111,6 +113,8 @@ func (s *ServerCtx) handleWithdrawMsg() {
return
}
// 记录 RMQ -> Listen 提现请求
logger.LogRmqToListenWithdrawReq(msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Fee, msg.Timestamp)
s.messageServer.ChFromRmqServer <- msg
}
@@ -133,6 +137,8 @@ func (s *ServerCtx) handlePayMsg() {
return
}
// 记录 RMQ -> Listen 支付请求
logger.LogRmqToListenPayReq(msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Fee, msg.Timestamp)
s.messageServer.ChFromRmqServer <- msg
}
}
@@ -156,6 +162,8 @@ func (s *ServerCtx) handleRemoveMsg() {
return
}
// 记录 RMQ -> Listen 移除监听请求
logger.LogRmqToListenRemoveReq(msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp)
s.messageServer.ChFromRmqServer <- msg
}
}
@@ -165,6 +173,8 @@ func (s *ServerCtx) handleRespMsg() {
switch v := msg.(type) {
case message.TopupMsg_resp:
log.Printf("📨[充值响应]QueueID=%s, Address=%s, Chain=%s, Symbol=%s, TxHash=%s, Status=%d, Amount=%f", v.QueueId, v.Address, v.Chain, v.Symbol, v.TxHash, v.Status, v.Amount)
// 记录 Listen -> RMQ 充值响应
logger.LogListenToRmqTopupResp(v.QueueId, v.Chain, v.Symbol, v.Address, v.FromAddress, v.TxHash, v.Amount, v.BlockHeight, v.Status)
err := s.rmqServer.PublishTopupResp(v)
if err != nil {
log.Printf("❌ 发送充值响应失败: %v", err)
@@ -172,6 +182,8 @@ func (s *ServerCtx) handleRespMsg() {
}
case message.WithdrawMsg_resp:
log.Printf("📨[提现响应]QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, ToAddress=%s, TxHash=%s, Status=%d, Amount=%f", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.ToAddress, v.TxHash, v.Status, v.Amount)
// 记录 Listen -> RMQ 提现响应
logger.LogListenToRmqWithdrawResp(v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.ToAddress, v.TxHash, v.Amount, v.Fee, v.BlockHeight, v.Status)
err := s.rmqServer.PublishWithdrawResp(v)
if err != nil {
log.Printf("❌ 发送提现响应失败: %v", err)
@@ -179,6 +191,8 @@ func (s *ServerCtx) handleRespMsg() {
}
case message.PayMsg_resp:
log.Printf("📨[提现响应]QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, Status=%d", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.Status)
// 记录 Listen -> RMQ 支付响应
logger.LogListenToRmqPayResp(v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.ToAddress, v.TxHash, v.Amount, v.Fee, v.BlockHeight, v.Status)
err := s.rmqServer.PublishPayResp(v)
if err != nil {
log.Printf("❌ 发送支付响应失败: %v", err)
@@ -186,6 +200,8 @@ func (s *ServerCtx) handleRespMsg() {
}
case message.RemoveListenMsg_resp:
log.Printf("📨[充值响应]QueueID=%s, Address=%s, Chain=%s, Symbol=%s,Status=%d", v.QueueId, v.Address, v.Chain, v.Symbol, v.Status)
// 记录 Listen -> RMQ 移除监听响应
logger.LogListenToRmqRemoveResp(v.QueueId, v.MsgType, v.Chain, v.Symbol, v.Address, v.Status)
err := s.rmqServer.PublishRemoveResp(v)
if err != nil {
log.Printf("❌ 发送移除监听响应失败: %v", err)

View File

@@ -78,9 +78,24 @@ func Float64ToBigInt(symbol string, amount float64) *big.Int {
log.Printf("don`t support symbol: %s", symbol)
return nil
}
bigAmount := new(big.Int)
bigAmount.SetInt64(int64(amount * scale))
return bigAmount
// 检查 amount 是否为负数
if amount < 0 {
log.Printf("Warning: negative amount for symbol %s: %f", symbol, amount)
amount = 0 // 可以选择将负数设置为 0 或返回错误
}
// 将 amount 转换为 big.Float 来避免溢出
bigAmount := new(big.Float)
bigAmount.SetFloat64(amount)
// 乘以 scale小数位数避免精度丢失
bigAmount = bigAmount.Mul(bigAmount, big.NewFloat(scale))
// 将 big.Float 转换为 big.Int取整
intAmount, _ := bigAmount.Int(nil)
return intAmount
}
func BigIntToFloat64(symbol string, amount *big.Int) float64 {

View File

@@ -3,17 +3,19 @@ CREATE TABLE IF NOT EXISTS ETH_wallets (
queue_id VARCHAR(255) NOT NULL,
timestamp BIGINT NOT NULL,
sign VARCHAR(255) NOT NULL,
status TINYINT DEFAULT 0 -- 0未在监听 1正在监听
status TINYINT DEFAULT 0
);
CREATE INDEX idx_queue_id ON ETH_wallets (queue_id);
CREATE TABLE IF NOT EXISTS ETH_balances (
address VARCHAR(255) NOT NULL,
symbol VARCHAR(32) DEFAULT "ETH",
used_gas DECIMAL(40,16) DEFAULT 0,
balance DECIMAL(40,16) DEFAULT 0,
success_tx_hash TEXT DEFAULT NULL, -- 使用,隔开
failed_tx_hash TEXT DEFAULT NULL, -- 使用,隔开
PRIMARY KEY (address), -- 钱包地址唯一
success_tx_hash TEXT DEFAULT NULL,
failed_tx_hash TEXT DEFAULT NULL,
PRIMARY KEY (address),
FOREIGN KEY (address) REFERENCES ETH_wallets (address) ON DELETE CASCADE
);
@@ -22,15 +24,15 @@ CREATE TABLE IF NOT EXISTS USDT_balances (
symbol VARCHAR(32) DEFAULT "USDT",
freeze_num DECIMAL(40,16) DEFAULT 0,
balance DECIMAL(40,16) DEFAULT 0,
success_tx_hash TEXT DEFAULT NULL, -- 使用,隔开
failed_tx_hash TEXT DEFAULT NULL, -- 使用,隔开
PRIMARY KEY (address), -- 钱包地址唯一
success_tx_hash TEXT DEFAULT NULL,
failed_tx_hash TEXT DEFAULT NULL,
PRIMARY KEY (address),
FOREIGN KEY (address) REFERENCES ETH_wallets (address) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS ETH_unconfirmed_tx (
queue_id VARCHAR(255) NOT NULL, -- 关联的msg queue_id
tx_type TINYINT NOT NULL, -- 0充值1提现2支付
queue_id VARCHAR(255) NOT NULL,
tx_type TINYINT NOT NULL,
chain VARCHAR(32) DEFAULT "ETH",
symbol VARCHAR(32),
from_addr VARCHAR(255),
@@ -38,6 +40,6 @@ CREATE TABLE IF NOT EXISTS ETH_unconfirmed_tx (
tx_hash VARCHAR(255),
height BIGINT,
amount DECIMAL(40,16),
status TINYINT DEFAULT 2, -- 0充值失败1充值成功2充值待确认
status TINYINT DEFAULT 2,
FOREIGN KEY (queue_id) REFERENCES ETH_wallets (queue_id) ON DELETE CASCADE
);
);

View File

@@ -1,4 +1,3 @@
-- mysql
CREATE TABLE IF NOT EXISTS topup_req_msg (
queue_id VARCHAR(255) NOT NULL,
chain VARCHAR(32) NOT NULL,
@@ -19,7 +18,7 @@ CREATE TABLE IF NOT EXISTS topup_resp_msg (
amount DECIMAL(30,16) NOT NULL,
tx_hash VARCHAR(255) DEFAULT NULL,
height BIGINT DEFAULT NULL,
status TINYINT DEFAULT 5, --constant模块的定义
status TINYINT DEFAULT 5,
FOREIGN KEY (queue_id) REFERENCES topup_req_msg(queue_id)
);
@@ -33,7 +32,7 @@ CREATE TABLE IF NOT EXISTS withdraw_req_msg (
fee DECIMAL(30,16) NOT NULL,
timestamp BIGINT,
sign VARCHAR(255),
status TINYINT DEFAULT 5, --constant模块的定义
status TINYINT DEFAULT 5,
PRIMARY KEY (queue_id)
);
@@ -47,7 +46,7 @@ CREATE TABLE IF NOT EXISTS withdraw_resp_msg (
amount DECIMAL(30,16) DEFAULT NULL,
fee DECIMAL(30,16) DEFAULT NULL,
height BIGINT DEFAULT NULL,
status TINYINT DEFAULT 5, --constant模块的定义
status TINYINT DEFAULT 5,
FOREIGN KEY (queue_id) REFERENCES withdraw_req_msg(queue_id)
);
@@ -61,7 +60,7 @@ CREATE TABLE IF NOT EXISTS pay_req_msg (
fee DECIMAL(30,16) NOT NULL,
timestamp BIGINT,
sign VARCHAR(255),
status TINYINT DEFAULT 5, --constant模块的定义
status TINYINT DEFAULT 5,
PRIMARY KEY (queue_id)
);
@@ -75,22 +74,10 @@ CREATE TABLE IF NOT EXISTS pay_resp_msg (
amount DECIMAL(30,16) DEFAULT NULL,
fee DECIMAL(30,16) DEFAULT NULL,
height BIGINT DEFAULT NULL,
status TINYINT DEFAULT 5, --constant模块的定义
status TINYINT DEFAULT 5,
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
);
-- pay_msg的交易
-- CREATE TABLE IF NOT EXISTS pay_msg_tx (
-- queue_id VARCHAR(255) NOT NULL,
-- tx_hash VARCHAR(255) DEFAULT NULL,
-- to_addr VARCHAR(255) NOT NULL,
-- amount DECIMAL(30,16) DEFAULT NULL,
-- fee DECIMAL(30,16),
-- height BIGINT DEFAULT 0,
-- status TINYINT DEFAULT 5, --遵循constant模块的定义
-- FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
-- );
CREATE TABLE IF NOT EXISTS remove_req_msg(
queue_id VARCHAR(255) NOT NULL,
msg_type TINYINT NOT NULL,

View File

@@ -13,4 +13,10 @@ node server listen -> 新区块产生时读取当前listen server的消息 ->
-> 消息中的from、to、amount = 区块交易中的from、to、amount(提现/充值) -> 返回消息 -> listen -> 修改数据库状态
node server confirm -> 新区块产生同时会读取当前unconfirmtxs数据 -> 对比每个交易的高度 -> 符合确认条件 -> 修改钱包数据
-> 返回消息 -> listen -> 修改相关数据 -> 返回rmq -> 发出消息
-> 返回消息 -> listen -> 修改相关数据 -> 返回rmq -> 发出消息
确认后的状态修改余额增减、hash录入等