From ac22db02f3070d9cfa083899f9c66a4eefc8f920 Mon Sep 17 00:00:00 2001 From: lzx <393768033@qq.com> Date: Fri, 14 Nov 2025 17:43:25 +0800 Subject: [PATCH] modify some msg-struct and table-struct --- internal/blockchain/eth/eth.go | 7 +- internal/blockchain/eth/eth_prv.go | 289 ++++++++++++++------- internal/db/sqlite.go | 2 +- internal/listen/listen.go | 12 +- internal/listen/listen_prv.go | 360 ++++++++++++++++++-------- internal/logger/transaction_logger.go | 18 +- internal/msg/msg.go | 87 +++++-- internal/queue/rabbitmq.go | 4 +- internal/server.go | 28 +- public/msg.sql | 35 +-- public/msg_mysql.sql | 39 +-- 11 files changed, 605 insertions(+), 276 deletions(-) diff --git a/internal/blockchain/eth/eth.go b/internal/blockchain/eth/eth.go index c8d3d4f..985a2a6 100644 --- a/internal/blockchain/eth/eth.go +++ b/internal/blockchain/eth/eth.go @@ -144,7 +144,11 @@ func NewETHNode(cfg message.Config, decodeKey string, l *listen.ListenServer) (* Ctx: ctx, Cancel: cancel, } - + // 初始化表 + err = ethnode.initTables() + if err != nil { + return nil, err + } // 更新网络公共数据和加载钱包 height, err := ethnode.getHeight() if err != nil { @@ -161,6 +165,7 @@ func NewETHNode(cfg message.Config, decodeKey string, l *listen.ListenServer) (* return nil, fmt.Errorf("load unconfirmtxs error: %v", err) } log.Println("✅ ETH节点已启动") + // ethnode.handleHistoryETHEvent(23795552) return ethnode, nil } diff --git a/internal/blockchain/eth/eth_prv.go b/internal/blockchain/eth/eth_prv.go index 94946aa..a7a897a 100644 --- a/internal/blockchain/eth/eth_prv.go +++ b/internal/blockchain/eth/eth_prv.go @@ -9,6 +9,7 @@ import ( message "m2pool-payment/internal/msg" "m2pool-payment/internal/utils" "math/big" + "os" "strings" "sync" "time" @@ -31,6 +32,21 @@ func init_USDT() *USDT { return usdt } +func (e *ETHNode) initTables() error { + sql_script_path := "../public/eth.sql" + sql_byte, err := os.ReadFile(sql_script_path) + if err != nil { + return fmt.Errorf("open msg-sql file error: %v", err) + } + + err = e.SqliteDB.CreateTable(string(sql_byte)) + if err != nil { + return fmt.Errorf("exec eth-sql error: %v", err) + } + + return nil +} + func (e *ETHNode) updateNetInfo(height uint64) { // 创建 WaitGroup var wg sync.WaitGroup @@ -116,11 +132,11 @@ func (e *ETHNode) getSuggestGasPrice() (*big.Int, error) { } // 设置gas price上限,避免在网络拥堵时费用过高 - // 这里设置为20 Gwei (20 * 10^9 wei) - maxGasPrice := new(big.Int).SetUint64(20000000000) // 20 Gwei + // 这里设置为20 Gwei (2 * 10^9 wei) + maxGasPrice := new(big.Int).SetUint64(2000000000) // 2 Gwei if gasPrice.Cmp(maxGasPrice) > 0 { - log.Printf("⚠️ 建议gas price过高 (%v wei),使用上限 20 Gwei", new(big.Int).Div(gasPrice, big.NewInt(1e18))) + log.Printf("⚠️ 建议gas price过高 (%v wei),使用上限 2 Gwei", new(big.Int).Div(gasPrice, big.NewInt(1e18))) return maxGasPrice, nil } @@ -144,15 +160,15 @@ func (e *ETHNode) getEIP1559GasFees() (*big.Int, *big.Int, error) { } // 设置优先级费用(tip),这里设置为2 Gwei - maxPriorityFeePerGas := new(big.Int).SetUint64(2000000000) // 2 Gwei + maxPriorityFeePerGas := new(big.Int).SetUint64(2000000) // 0.2 Gwei // 计算最大费用 = 基础费用 + 优先级费用 maxFeePerGas := new(big.Int).Add(baseFee, maxPriorityFeePerGas) - // 设置最大费用上限为30 Gwei - maxFeeLimit := new(big.Int).SetUint64(30000000000) // 30 Gwei + // 设置最大费用上限为2 Gwei + maxFeeLimit := new(big.Int).SetUint64(2000000000) // 2 Gwei if maxFeePerGas.Cmp(maxFeeLimit) > 0 { - log.Printf("⚠️ 计算的最大费用过高 (%v wei),使用上限 30 Gwei", new(big.Int).Div(maxFeePerGas, big.NewInt(1e18))) + log.Printf("⚠️ 计算的最大费用过高 (%v wei),使用上限 2 Gwei", new(big.Int).Div(maxFeePerGas, big.NewInt(1e18))) maxFeePerGas = maxFeeLimit } @@ -387,10 +403,11 @@ func (e *ETHNode) loadWallets() error { addresses = append(addresses, addr) wallets[addr] = wallet } - + log.Printf("ETH钱包加载成功:%v", addresses) if err := rows.Err(); err != nil { return fmt.Errorf("error occurred while iterating rows: %v", err) } + if len(addresses) > 0 { pks, err := e.getAddressesPks(addresses) if err != nil { @@ -404,6 +421,10 @@ func (e *ETHNode) loadWallets() error { e.mu.Unlock() } + // for addr, balance := range e.Wallets { + // log.Printf("钱包:%s, wallets: %v", addr, balance) + // } + return nil } @@ -634,8 +655,50 @@ func (e *ETHNode) listenETHTransactions() error { } } +func (e *ETHNode) handleHistoryETHEvent(height uint64) { + block, err := e.RpcClient.BlockByNumber(e.Ctx, big.NewInt(int64(height))) + if err != nil { + log.Println(err) + return + } + + for _, tx := range block.Transactions() { + txHash := tx.Hash().Hex() + // 只处理ETH转账(Value > 0) + if tx.Value().Sign() <= 0 { + continue + } + // 使用 types.Sender 获取发送方地址 + signer := types.LatestSignerForChainID(e.NetID) + from, err := types.Sender(signer, tx) + if err != nil { + log.Println("获取发送方地址失败:", err) + continue + } + + toAddr := "" + if tx.To() != nil { + toAddr = strings.ToLower(tx.To().Hex()) + } + fromAddr := strings.ToLower(from.Hex()) + // 获取交易金额 + amount := utils.BigIntETHToFloat64(tx.Value()) + + if _, ok := e.Wallets[toAddr]; ok { + msg, err := e.MessageServer.FindTopupMsgWithToaddress("ETH", toAddr) + if err != nil { + log.Printf("❌ 未查找到ETH充值消息中有address(%s)信息", toAddr) + continue + } + log.Printf("当前交易txHash:%s, fromAddr: %s, toAddr: %s, amount: %fETH", txHash, fromAddr, toAddr, amount) + log.Println(msg) + } + } +} + func (e *ETHNode) handleETHEvent(header *types.Header) { height := header.Number.Uint64() + log.Printf("当前区块高度:%d", height) // 获取区块中的所有交易 block, err := e.RpcClient.BlockByHash(e.Ctx, header.Hash()) if err != nil { @@ -666,6 +729,7 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { amount := utils.BigIntETHToFloat64(tx.Value()) // toAddr和监听钱包一致,表示(充值) if _, ok := e.Wallets[toAddr]; ok { + log.Printf("监听地址: %s,交易信息:%v", toAddr, tx) msg, err := e.MessageServer.FindTopupMsgWithToaddress("ETH", toAddr) if err != nil { log.Printf("❌ 未查找到ETH充值消息中有address(%s)信息", toAddr) @@ -719,7 +783,7 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { resp := message.WithdrawMsg_resp{ QueueId: v.QueueId, Chain: "ETH", - Symbol: v.Symbol, // 使用消息中的Symbol(应该是ETH) + Symbol: "ETH", // 使用消息中的Symbol(应该是ETH) FromAddress: fromAddr, ToAddress: toAddr, TxHash: txHash, @@ -734,7 +798,7 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { QueueId: v.QueueId, TxType: 1, // 提现类型 Chain: "ETH", - Symbol: v.Symbol, // 使用消息中的Symbol(应该是ETH) + Symbol: "ETH", // 使用消息中的Symbol(应该是ETH) From: fromAddr, To: toAddr, TxHash: txHash, @@ -746,30 +810,36 @@ func (e *ETHNode) handleETHEvent(header *types.Header) { } case message.PayMsg_req: - for to, transaction := range v.Transactions { - if v.FromAddress == fromAddr && to == toAddr && transaction.Amount == amount { - resp := transaction - resp.Chain = "ETH" - resp.Symbol = v.Symbol // 使用消息中的Symbol(可能是ETH或USDT) - resp.Status = constant.STATUS_PENDING - go e.asyncSendMsgToListen(resp, 3, 5*time.Second) - - e.UnConfirmedTxs.mu.Lock() - e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{ - QueueId: v.QueueId, - TxType: 2, // 支付类型 - Chain: "ETH", - Symbol: v.Symbol, // 使用消息中的Symbol - From: fromAddr, - To: toAddr, - TxHash: txHash, - Height: height, - Amount: tx.Value(), - Status: constant.STATUS_PENDING, - } - e.UnConfirmedTxs.mu.Unlock() + if v.FromAddress == fromAddr && v.ToAddress == toAddr && v.Amount == amount { + resp := message.PayMsg_resp{ + QueueId: v.QueueId, + Chain: "ETH", + Symbol: "ETH", // 使用消息中的Symbol(应该是ETH) + FromAddress: fromAddr, + ToAddress: toAddr, + TxHash: txHash, + Amount: amount, + Fee: v.Fee, + BlockHeight: height, + Status: constant.STATUS_PENDING, } + go e.asyncSendMsgToListen(resp, 3, 5*time.Second) + e.UnConfirmedTxs.mu.Lock() + e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{ + QueueId: v.QueueId, + TxType: 2, // 提现类型 + Chain: "ETH", + Symbol: "ETH", // 使用消息中的Symbol(应该是ETH) + From: fromAddr, + To: toAddr, + TxHash: txHash, + Height: height, + Amount: tx.Value(), + Status: constant.STATUS_PENDING, + } + e.UnConfirmedTxs.mu.Unlock() } + default: } @@ -897,7 +967,7 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) { QueueId: v.QueueId, TxType: 1, // 提现类型 Chain: "ETH", - Symbol: v.Symbol, // 使用消息中的Symbol(应该是USDT) + Symbol: "USDT", // 使用消息中的Symbol(应该是USDT) From: fromAddr, To: toAddr, TxHash: txHash, @@ -909,29 +979,34 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) { } case message.PayMsg_req: - for to, transaction := range v.Transactions { - if v.FromAddress == fromAddr && to == toAddr && transaction.Amount == value_float { - resp := transaction - resp.Chain = "ETH" - resp.Symbol = v.Symbol // 使用消息中的Symbol(应该是USDT) - resp.Status = constant.STATUS_PENDING - go e.asyncSendMsgToListen(resp, 3, 5*time.Second) - - e.UnConfirmedTxs.mu.Lock() - e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{ - QueueId: v.QueueId, - TxType: 2, // 支付类型 - Chain: "ETH", - Symbol: v.Symbol, // 使用消息中的Symbol(应该是USDT) - From: fromAddr, - To: toAddr, - TxHash: txHash, - Height: height, - Amount: transferEvent.Value, - Status: constant.STATUS_PENDING, - } - e.UnConfirmedTxs.mu.Unlock() + if v.FromAddress == fromAddr && v.ToAddress == toAddr && v.Amount == value_float { + resp := message.PayMsg_resp{ + QueueId: v.QueueId, + Chain: "ETH", + Symbol: "USDT", + FromAddress: fromAddr, + ToAddress: toAddr, + TxHash: txHash, + Amount: value_float, + Fee: v.Fee, + BlockHeight: height, + Status: constant.STATUS_PENDING, } + go e.asyncSendMsgToListen(resp, 3, 5*time.Second) + e.UnConfirmedTxs.mu.Lock() + e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{ + QueueId: v.QueueId, + TxType: 1, // 提现类型 + Chain: "ETH", + Symbol: "USDT", // 使用消息中的Symbol(应该是USDT) + From: fromAddr, + To: toAddr, + TxHash: txHash, + Height: height, + Amount: transferEvent.Value, + Status: constant.STATUS_PENDING, + } + e.UnConfirmedTxs.mu.Unlock() } default: @@ -986,7 +1061,10 @@ func (e *ETHNode) confirm() { delete(e.UnConfirmedTxs.Transactions, txHash) continue } - + var tableMap = map[string]string{ + "ETH": "ETH_balances", + "USDT": "USDT_balances", + } switch v := msg.(type) { case message.TopupMsg_req: if status == constant.STATUS_SUCCESS { @@ -1013,6 +1091,18 @@ func (e *ETHNode) confirm() { TxHash: txHash, BlockHeight: tx.Height, } + go func() { + str := "UPDATE " + tableMap[tx.Symbol] + " SET success_tx_hash = success_tx_hash || ? WHERE address = ?" + params := []any{txHash + ",", v.Address} + count, err := e.SqliteDB.Update(str, params) + if err != nil { + // 更详细的错误日志,包括 QueueId 和 Status + log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", v.QueueId, err) + } else if count != 1 { + // 如果更新的行数不是 1,日志中记录详细信息 + log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", v.QueueId, count) + } + }() responses = append(responses, response) // 将消息提交至responses case message.WithdrawMsg_req: if status == constant.STATUS_SUCCESS { @@ -1060,17 +1150,19 @@ func (e *ETHNode) confirm() { delete(e.UnConfirmedTxs.Transactions, txHash) continue } - response := message.PayData{ - QueueId: v.QueueId, - Chain: v.Chain, - Symbol: v.Symbol, - TxHash: txHash, - ToAddress: tx.To, - Amount: float_amount, - BlockHeight: tx.Height, + response := message.PayMsg_resp{ + QueueId: tx.QueueId, + Chain: tx.Chain, + Symbol: tx.Symbol, Status: status, + Amount: float_amount, + Fee: v.Fee, + TxHash: txHash, + FromAddress: tx.From, + ToAddress: tx.To, + BlockHeight: tx.Height, } - responses = append(responses, response) + responses = append(responses, response) // 将消息提交至responses default: log.Printf("未知的消息类型: %v, 跳过此交易", v) delete(e.UnConfirmedTxs.Transactions, txHash) @@ -1137,6 +1229,11 @@ func (e *ETHNode) handleListen_Topup_req(msg message.TopupMsg_req) { if err != nil { log.Printf("Received ListenServer Topup_req msg: insert sqlite3 db error: %v", err) } + go e.asyncSendMsgToListen(message.UpdateReqState{ + QueueId: msg.QueueId, + MsgType: 0, + Status: constant.STATUS_SUCCESS, + }, 3, 5*time.Second) }() } @@ -1193,6 +1290,11 @@ func (e *ETHNode) handleListen_Withdraw_req(msg message.WithdrawMsg_req) { return } // 转账成功,等待chain server listen监听到该笔交易 + go e.asyncSendMsgToListen(message.UpdateReqState{ + QueueId: msg.QueueId, + MsgType: 1, + Status: constant.STATUS_SUCCESS, + }, 3, 5*time.Second) log.Printf("withdraw - transfer success: QueueId(%s)", msg.QueueId) } else { // 校验失败 // 提现转账账户余额不足 @@ -1209,8 +1311,8 @@ func (e *ETHNode) handleListen_Pay_req(msg message.PayMsg_req) { e.NetInfo.mu.Unlock() var target_amount_eth, target_amount_usdt *big.Int // 将 msg.Amount 和 msg.Fee 转换为 big.Int,只调用一次 - amountBigInt := utils.Float64ToBigInt(msg.Symbol, msg.TotalAmount) - feeBigInt := utils.Float64ToBigInt(msg.Symbol, msg.TotalFee) + amountBigInt := utils.Float64ToBigInt(msg.Symbol, msg.Amount) + feeBigInt := utils.Float64ToBigInt(msg.Symbol, msg.Fee) switch msg.Symbol { case "ETH": // 计算目标金额 @@ -1223,41 +1325,46 @@ func (e *ETHNode) handleListen_Pay_req(msg message.PayMsg_req) { default: return } + // 构建相应通用数据,Status根据后续情况变化 result_msg := message.PayMsg_resp{ - QueueId: msg.QueueId, - Chain: msg.Chain, - Symbol: msg.Symbol, - FromAddress: msg.FromAddress, - Transactions: msg.Transactions, + QueueId: msg.QueueId, + Chain: msg.Chain, + Symbol: msg.Symbol, + Amount: msg.Amount, + Fee: msg.Fee, + FromAddress: msg.FromAddress, + ToAddress: msg.ToAddress, } check_result, err := e.checkBalance(msg.Symbol, msg.FromAddress, target_amount_eth, target_amount_usdt) + // 余额校验错误,绕过转账,返回错误响应 if err != nil { log.Printf("check balance error: %v", err) - result_msg.PayStatus = constant.STATUS_ERROR - for to, tx := range result_msg.Transactions { - tx.Status = constant.STATUS_ERROR - result_msg.Transactions[to] = tx - } + result_msg.Status = constant.STATUS_ERROR go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second) return } - + // 校验成功 if check_result { - for _, tx := range result_msg.Transactions { - err := e.Transfer(msg.FromAddress, tx.ToAddress, msg.Symbol, tx.Amount, tx.Fee) - if err != nil { - log.Println(err) - tx.Status = constant.STATUS_ERROR - } else { - tx.Status = constant.STATUS_PENDING - } + // 开始转账 + err := e.Transfer(msg.FromAddress, msg.ToAddress, msg.Symbol, msg.Amount, msg.Fee) + // 转账失败,返回转账失败结果 + if err != nil { + log.Printf("withdraw - transfer error: %v", err) + result_msg.Status = constant.STATUS_ERROR + // 提现转账错误 + go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second) + return } - // 此时所有待转账的数据均已进行转账处理 // 转账成功,等待chain server listen监听到该笔交易 - log.Printf("pay - transfer success: QueueId(%s)", msg.QueueId) - } else { + go e.asyncSendMsgToListen(message.UpdateReqState{ + QueueId: msg.QueueId, + MsgType: 1, + Status: constant.STATUS_SUCCESS, + }, 3, 5*time.Second) + log.Printf("withdraw - transfer success: QueueId(%s)", msg.QueueId) + } else { // 校验失败 // 提现转账账户余额不足 - result_msg.PayStatus = constant.STATUS_BALANCE_NOT_ENOUGH + result_msg.Status = constant.STATUS_BALANCE_NOT_ENOUGH go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second) return } @@ -1276,7 +1383,7 @@ func (e *ETHNode) handleListen_Remove_req(msg message.RemoveListenMsg_req) { } str := "UPDATE ETH_wallets SET status = ? WHERE address = ?" params := []any{0, msg.Address} - count, err := e.SqliteDB.Update(str, params...) + count, err := e.SqliteDB.Update(str, params) if err != nil || count != 1 { log.Printf("Remove address(%s) error: count(%d)", msg.Address, count) // result_msg.Status = constant.STATUS_FAILED diff --git a/internal/db/sqlite.go b/internal/db/sqlite.go index 355a664..1d37959 100644 --- a/internal/db/sqlite.go +++ b/internal/db/sqlite.go @@ -74,7 +74,7 @@ func (s *SQLite) Delete(sqlStr string, args ...any) (int64, error) { } // 更新数据 -func (s *SQLite) Update(sqlStr string, args ...any) (int64, error) { +func (s *SQLite) Update(sqlStr string, args []any) (int64, error) { res, err := s.DB.Exec(sqlStr, args...) if err != nil { return 0, fmt.Errorf("update error: %v", err) diff --git a/internal/listen/listen.go b/internal/listen/listen.go index 83d0663..4cfba2b 100644 --- a/internal/listen/listen.go +++ b/internal/listen/listen.go @@ -85,9 +85,7 @@ func NewListenServer(cfg message.Config) *ListenServer { ch[net] = make(chan any, 1000) chin[net] = make(chan any, 1000) } - - log.Println("✅ 消息监听处理已启动") - return &ListenServer{ + var l = &ListenServer{ Config: cfg, // MysqlDB: dbConn, SqliteDB: sqlite, @@ -100,6 +98,9 @@ func NewListenServer(cfg message.Config) *ListenServer { ChFromRmqServer: rmq_ch_in, ChToRmqServer: rmq_ch_out, } + l.loadMsg() + log.Println("✅ 消息监听处理已启动") + return l } // rmq -> listen -> chain server @@ -141,12 +142,15 @@ func (l *ListenServer) NetMsgIn() { for _, ch := range l.ChFromChainServer { for msg := range ch { switch v := msg.(type) { + // 接收到区块链节点返回的更新req状态消息 + case message.UpdateReqState: + go l.handleUpdateReqState(v) // 接收到区块链节点服务返回的resp消息,除了修改相关状态和数据库,还需通过rmq_out通道返回出去 case message.TopupMsg_resp: go l.handleChainTopup_resp(v) case message.WithdrawMsg_resp: go l.handleChainWithdraw_resp(v) - case message.PayData: + case message.PayMsg_resp: go l.handleChainPay_resp(v) case message.RemoveListenMsg_resp: go l.handleChainRemove_resp(v) diff --git a/internal/listen/listen_prv.go b/internal/listen/listen_prv.go index 53cebae..d90b6c1 100644 --- a/internal/listen/listen_prv.go +++ b/internal/listen/listen_prv.go @@ -5,10 +5,162 @@ import ( "log" "m2pool-payment/internal/constant" message "m2pool-payment/internal/msg" - "strings" "time" ) +// 加载所有消息 +func (l *ListenServer) loadMsg() { + topup_sql := "SELECT queue_id, chain, symbol, address, timestamp, sign, status FROM topup_req_msg WHERE status = ?" // 1正在监听 + withdraw_sql := "SELECT queue_id, chain, symbol, from_addr, to_addr, amount, fee, timestamp, sign, status FROM withdraw_req_msg WHERE status = ? OR status = ?" // 2待确认,5待支付 + pay_sql := "SELECT queue_id, chain, symbol, from_addr, to_addr, amount, fee, timestamp, sign, status FROM withdraw_req_msg WHERE status = ? OR status = ?" // 2待确认,5待支付 + remove_sql := "SELECT queue_id, msg_type, chain, symbol, address, timestamp, sign, status FROM remove_req_msg WHERE status = ?" // 2待执行 + topup_params := []any{1} + withdraw_params := []any{2, 5} + pay_params := []any{2, 5} + remove_params := []any{2} + go func() { + rows, err := l.SqliteDB.Query_(topup_sql, topup_params...) + if err != nil { + log.Fatalf("load topup-msg error: %v", err) + return + } + // 遍历查询结果 + for _, row := range rows { + queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 + chain := row["chain"].(string) // 假设 chain 是 string 类型 + symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 + address := row["address"].(string) // 假设 address 是 string 类型 + timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型 + sign := row["sign"].(string) // 假设 sign 是 string 类型 + status := row["status"].(int64) // 假设 status 是 INTEGER 类型 + + // 处理数据,例如打印输出 + l.TopupMsgs[chain].mu.RLock() + l.TopupMsgs[chain].Msgs[queueID] = message.TopupMsg_req{ + QueueId: queueID, + Chain: chain, + Symbol: symbol, + Address: address, + Timestamp: uint64(timestamp), + Sign: sign, + Status: int(status), + } + l.TopupMsgs[chain].mu.RUnlock() + } + log.Printf("充值历史消息load完毕") + }() + + go func() { + rows, err := l.SqliteDB.Query_(withdraw_sql, withdraw_params...) + if err != nil { + log.Fatalf("load withdraw-msg error: %v", err) + return + } + // 遍历查询结果 + for _, row := range rows { + queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 + chain := row["chain"].(string) // 假设 chain 是 string 类型 + symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 + fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型 + toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型 + amount := row["amount"].(float64) // 假设 amount 是 float64 类型 + fee := row["fee"].(float64) // 假设 fee 是 float64 类型 + timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型 + sign := row["sign"].(string) // 假设 sign 是 string 类型 + status := row["status"].(int64) // 假设 status 是 string 类型 + + // 处理数据,例如打印输出 + l.WithdrawMsgs[chain].mu.RLock() + l.WithdrawMsgs[chain].Msgs[queueID] = message.WithdrawMsg_req{ + QueueId: queueID, + Chain: chain, + Symbol: symbol, + FromAddress: fromAddr, + ToAddress: toAddr, + Amount: amount, + Fee: fee, + Timestamp: uint64(timestamp), + Sign: sign, + Status: int(status), + } + l.WithdrawMsgs[chain].mu.RUnlock() + } + log.Printf("提现历史消息load完毕") + }() + + go func() { + rows, err := l.SqliteDB.Query_(pay_sql, pay_params...) + if err != nil { + log.Fatalf("load pay-msg error: %v", err) + return + } + // 遍历查询结果 + for _, row := range rows { + queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 + chain := row["chain"].(string) // 假设 chain 是 string 类型 + symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 + fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型 + toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型 + amount := row["amount"].(float64) // 假设 amount 是 float64 类型 + fee := row["fee"].(float64) // 假设 fee 是 float64 类型 + timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型 + sign := row["sign"].(string) // 假设 sign 是 string 类型 + status := row["status"].(int64) // 假设 status 是 string 类型 + + // 处理数据,例如打印输出 + l.PayMsgs[chain].mu.RLock() + l.PayMsgs[chain].Msgs[queueID] = message.PayMsg_req{ + QueueId: queueID, + Chain: chain, + Symbol: symbol, + FromAddress: fromAddr, + ToAddress: toAddr, + Amount: amount, + Fee: fee, + Timestamp: uint64(timestamp), + Sign: sign, + Status: int(status), + } + l.PayMsgs[chain].mu.RUnlock() + } + log.Printf("支付历史消息load完毕") + }() + + go func() { + rows, err := l.SqliteDB.Query_(remove_sql, remove_params...) + if err != nil { + log.Fatalf("load remove-msg error: %v", err) + return + } + + for _, row := range rows { + queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型 + msgType := row["msg_type"].(int) + chain := row["chain"].(string) // 假设 chain 是 string 类型 + symbol := row["symbol"].(string) // 假设 symbol 是 string 类型 + address := row["address"].(string) // 假设 address 是 string 类型 + timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型 + sign := row["sign"].(string) // 假设 sign 是 string 类型 + status := row["status"].(int64) // 假设 status 是 INTEGER 类型 + + // 处理数据 + l.RemoveMsgs[chain].mu.RLock() + l.RemoveMsgs[chain].Msgs[queueID] = message.RemoveListenMsg_req{ + QueueId: queueID, + MsgType: msgType, + Chain: chain, + Symbol: symbol, + Address: address, + Timestamp: uint64(timestamp), + Sign: sign, + Stauts: int(status), + } + l.RemoveMsgs[chain].mu.RUnlock() + } + log.Printf("移除监听历史消息load完毕") + }() +} + // 充值消息 func (l *ListenServer) handleRmqTopup_req(msg message.TopupMsg_req) { // 添加到TopupMsgs @@ -16,14 +168,20 @@ func (l *ListenServer) handleRmqTopup_req(msg message.TopupMsg_req) { l.TopupMsgs[msg.Chain].Msgs[msg.QueueId] = msg l.TopupMsgs[msg.Chain].mu.RUnlock() // 写数据库 - go func() { - sql := "INSERT INTO topup_req_msg (queue_id, chain, symbol, address, timestamp, sign, status) VALUES (?,?,?,?,?,?,?)" - params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign, 1} - err := l.SqliteDB.Insert(sql, params) - if err != nil { - log.Printf("Insert Topup_req msg error: %v", err) - } - }() + sql := "INSERT INTO topup_req_msg (queue_id, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign} + err := l.SqliteDB.Insert(sql, params) + if err != nil { + log.Printf("Insert Topup_req msg error: %v", err) + go l.asyncSendMsgToRmq(message.TopupMsg_resp{ + QueueId: msg.QueueId, + Chain: msg.Chain, + Symbol: msg.Symbol, + Address: msg.Address, + Status: constant.STATUS_ERROR, + }, 3, 5*time.Second) + return + } // 传给对应的node server go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second) log.Printf("Insert Topup_req msg success: QueueId(%s)", msg.QueueId) @@ -36,14 +194,21 @@ func (l *ListenServer) handleRmqWithdraw_req(msg message.WithdrawMsg_req) { l.WithdrawMsgs[msg.Chain].Msgs[msg.QueueId] = msg l.WithdrawMsgs[msg.Chain].mu.RUnlock() // 写数据库 - go func() { - sql := "INSERT INTO withdraw_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)" - params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign} - err := l.SqliteDB.Insert(sql, params) - if err != nil { - log.Printf("Insert Withdraw_req msg error: %v", err) - } - }() + sql := "INSERT INTO withdraw_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign} + err := l.SqliteDB.Insert(sql, params) + if err != nil { + log.Printf("Insert Withdraw_req msg error: %v", err) + go l.asyncSendMsgToRmq(message.WithdrawMsg_resp{ + QueueId: msg.QueueId, + Chain: msg.Chain, + Symbol: msg.Symbol, + FromAddress: msg.FromAddress, + ToAddress: msg.ToAddress, + Status: constant.STATUS_ERROR, + }, 3, 5*time.Second) + return + } // 传给对应的node server go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second) log.Printf("Insert Withdraw_req msg success: QueueId(%s)", msg.QueueId) @@ -51,28 +216,29 @@ func (l *ListenServer) handleRmqWithdraw_req(msg message.WithdrawMsg_req) { // 支付消息 func (l *ListenServer) handleRmqPay_req(msg message.PayMsg_req) { + // 添加到WithdrawMsgs l.PayMsgs[msg.Chain].mu.RLock() l.PayMsgs[msg.Chain].Msgs[msg.QueueId] = msg l.PayMsgs[msg.Chain].mu.RUnlock() - - go func() { - sql1 := "INSERT INTO pay_req_msg (queue_id, chain, symbol, from_addr, total_amount, total_fee, timestamp, sign) VALUES (?,?,?,?,?,?,?,?)" - sql2 := "INSERT INTO transactions (queue_id, chain, symbol, from_addr, to_addr, amount, fee) VALUES " - params1 := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, fmt.Sprintf("%f", msg.TotalAmount), fmt.Sprintf("%f", msg.TotalFee), msg.Timestamp, msg.Sign} - params2 := []any{} - for to_addr, tx := range msg.Transactions { - sql2 += "(?,?,?,?,?,?,?), " - params2 = append(params2, msg.QueueId, msg.Chain, msg.Symbol, strings.ToLower(msg.FromAddress), strings.ToLower(to_addr), fmt.Sprintf("%f", tx.Amount), fmt.Sprintf("%f", tx.Fee)) - } - sql2 = sql2[:len(sql2)-2] - err := l.SqliteDB.ExecuteTransactions([]string{sql1, sql2}, [][]any{params1, params2}) - if err != nil { - log.Printf("Insert Pay_req msg error: %v", err) - } - }() - + // 写数据库 + sql := "INSERT INTO pay_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign} + err := l.SqliteDB.Insert(sql, params) + if err != nil { + log.Printf("Insert PayMsg_req msg error: %v", err) + go l.asyncSendMsgToRmq(message.PayMsg_resp{ + QueueId: msg.QueueId, + Chain: msg.Chain, + Symbol: msg.Symbol, + FromAddress: msg.FromAddress, + ToAddress: msg.ToAddress, + Status: constant.STATUS_ERROR, + }, 3, 5*time.Second) + return + } + // 传给对应的node server go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second) - log.Printf("Insert Pay_req msg success: QueueId(%s)", msg.QueueId) + log.Printf("Insert PayMsg_req msg success: QueueId(%s)", msg.QueueId) } // 移除监听消息 @@ -81,19 +247,46 @@ func (l *ListenServer) handleRmqRemove_req(msg message.RemoveListenMsg_req) { l.RemoveMsgs[msg.Chain].Msgs[msg.QueueId] = msg l.RemoveMsgs[msg.Chain].mu.RUnlock() - go func() { - sql := "INSERT INTO remove_req_msg (queue_id, msg_type, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?,?)" - params := []any{msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign} - err := l.SqliteDB.Insert(sql, params) - if err != nil { - log.Printf("Insert Remove_req msg error: %v", err) - } - }() + sql := "INSERT INTO remove_req_msg (queue_id, msg_type, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign} + err := l.SqliteDB.Insert(sql, params) + if err != nil { + log.Printf("Insert Remove_req msg error: %v", err) + go l.asyncSendMsgToRmq(message.RemoveListenMsg_resp{ + QueueId: msg.QueueId, + MsgType: msg.MsgType, + Chain: msg.Chain, + Symbol: msg.Symbol, + Address: msg.Address, + Status: constant.STATUS_ERROR, + }, 3, 5*time.Second) + } go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second) log.Printf("Insert Remove_req msg success: QueueId(%s)", msg.QueueId) } +// 更新状态响应 +func (l *ListenServer) handleUpdateReqState(msg message.UpdateReqState) { + typeMap := map[int]string{ + 0: "topup_req_msg", + 1: "withdraw_req_msg", + 2: "pay_req_msg", + } + str := "UPDATE " + typeMap[msg.MsgType] + " SET status = ? WHERE queue_id = ?" + params := []any{msg.Status, msg.QueueId} + go func() { + count, err := l.SqliteDB.Update(str, params) + if err != nil { + // 更详细的错误日志,包括 QueueId 和 Status + log.Printf("Failed to update update_req_msg for queue_id %s: %v", msg.QueueId, err) + } else if count != 1 { + // 如果更新的行数不是 1,日志中记录详细信息 + log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + } + }() +} + // 充值响应 func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) { switch msg.Status { @@ -104,7 +297,7 @@ func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) { count, err := l.SqliteDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status - log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) + log.Printf("Failed to update topup_resp_msg for queue_id %s: %v", msg.QueueId, err) } else if count != 1 { // 如果更新的行数不是 1,日志中记录详细信息 log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) @@ -152,7 +345,7 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) { count, err := l.SqliteDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status - log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) + log.Printf("Failed to update withdraw_resp_msg for queue_id %s: %v", msg.QueueId, err) } else if count != 1 { // 如果更新的行数不是 1,日志中记录详细信息 log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) @@ -177,85 +370,50 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) { } // 支付响应 -func (l *ListenServer) handleChainPay_resp(msg message.PayData) { +func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) { switch msg.Status { + // pending状态表示转账已完成且在区块链中被找到,等待后续确认+记录到数据库 case constant.STATUS_PENDING: - l.PayMsgs[msg.Chain].mu.Lock() - l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions[msg.ToAddress] = msg - l.PayMsgs[msg.Chain].mu.Unlock() - go func() { - str := "INSERT INTO pay_msg_tx (queue_id, tx_hash, to_addr, amount, fee, height, status) VALUES (?,?,?,?,?,?,?)" - params := []any{msg.QueueId, msg.TxHash, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.BlockHeight, msg.Status} + str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, tx_hash, amount, fee, height, status) VALUES (?,?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.TxHash, msg.Amount, msg.Fee, msg.BlockHeight, msg.Status} err := l.SqliteDB.Insert(str, params) if err != nil { - log.Printf("Insert pay_msg_tx msg error: %v", err) + log.Println(err) } - }() case constant.STATUS_SUCCESS, constant.STATUS_FAILED: - l.PayMsgs[msg.Chain].mu.Lock() - l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions[msg.ToAddress] = msg - l.PayMsgs[msg.Chain].mu.Unlock() + l.WithdrawMsgs[msg.Chain].mu.Lock() + delete(l.WithdrawMsgs[msg.Chain].Msgs, msg.QueueId) + l.WithdrawMsgs[msg.Chain].mu.Unlock() go func() { - str := "UPDATE pay_msg_tx SET status = ? WHERE tx_hash = ?" + str := "UPDATE pay_resp_msg SET status = ? WHERE tx_hash = ?" params := []any{msg.Status, msg.TxHash} - count, err := l.SqliteDB.Update(str, params...) + count, err := l.SqliteDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status - log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) + log.Printf("Failed to update pay_resp_msg for queue_id %s: %v", msg.QueueId, err) } else if count != 1 { // 如果更新的行数不是 1,日志中记录详细信息 log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } }() - - go func() { - need_send := true - l.PayMsgs[msg.Chain].mu.Lock() - for _, tx := range l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions { - if tx.Status != constant.STATUS_SUCCESS && tx.Status != constant.STATUS_FAILED { - need_send = false - break - } - } - l.PayMsgs[msg.Chain].mu.Unlock() - if need_send { - str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, pay_status) VALUES (?,?,?,?,?)" - params := []any{l.PayMsgs[msg.Chain].Msgs[msg.QueueId].QueueId, l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Chain, l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Symbol, l.PayMsgs[msg.Chain].Msgs[msg.QueueId].FromAddress, constant.STATUS_SUCCESS} - err := l.SqliteDB.Insert(str, params) - if err != nil { - log.Printf("Insert pay_resp_msg msg error: %v", err) - } - resp := message.PayMsg_resp{ - QueueId: msg.QueueId, - Chain: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Chain, - Symbol: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Symbol, - FromAddress: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].FromAddress, - PayStatus: constant.STATUS_SUCCESS, - Transactions: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions, - } - go l.asyncSendMsgToRmq(resp, 3, 5*time.Second) - } - }() + go l.asyncSendMsgToRmq(msg, 3, 5*time.Second) default: - l.PayMsgs[msg.Chain].mu.Lock() - l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions[msg.ToAddress] = msg - l.PayMsgs[msg.Chain].mu.Unlock() + l.WithdrawMsgs[msg.Chain].mu.Lock() + delete(l.WithdrawMsgs[msg.Chain].Msgs, msg.QueueId) + l.WithdrawMsgs[msg.Chain].mu.Unlock() go func() { - str := "UPDATE pay_msg_tx SET status = ? WHERE tx_hash = ?" - params := []any{msg.Status, msg.TxHash} - count, err := l.SqliteDB.Update(str, params...) + str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, fee, status) VALUES (?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Status} + err := l.SqliteDB.Insert(str, params) if err != nil { - // 更详细的错误日志,包括 QueueId 和 Status - log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) - } else if count != 1 { - // 如果更新的行数不是 1,日志中记录详细信息 - log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + log.Println(err) } }() + go l.asyncSendMsgToRmq(msg, 3, 5*time.Second) } } @@ -273,7 +431,7 @@ func (l *ListenServer) handleChainRemove_resp(msg message.RemoveListenMsg_resp) // 更新数据库 str := "UPDATE remove_resp_msg SET status = ? WHERE queue_id = ?" params := []any{msg.Status, msg.QueueId} - count, err := l.SqliteDB.Update(str, params...) + count, err := l.SqliteDB.Update(str, params) if err != nil { // 更详细的错误日志,包括 QueueId 和 Status log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) diff --git a/internal/logger/transaction_logger.go b/internal/logger/transaction_logger.go index 54db413..c027f68 100644 --- a/internal/logger/transaction_logger.go +++ b/internal/logger/transaction_logger.go @@ -2,10 +2,8 @@ package logger import ( "compress/gzip" - "encoding/json" "fmt" "io" - message "m2pool-payment/internal/msg" "os" "path/filepath" "sync" @@ -233,7 +231,7 @@ func LogWithdraw(fromAddress string, status string, amount float64, toAddress st } // LogPay 记录支付消息 -func LogPay(status string, fromAddress string, queueId string, transactions map[string]*message.PayData) { +func LogPay(status string, fromAddress string, queueId string) { if txLogger == nil { return } @@ -244,14 +242,14 @@ func LogPay(status string, fromAddress string, queueId string, transactions map[ fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) return } - t, err := json.Marshal(transactions) - if err != nil { - fmt.Println("Error marshalling to JSON:", err) - return - } + // t, err := json.Marshal(transactions) + // if err != nil { + // fmt.Println("Error marshalling to JSON:", err) + // return + // } timestamp := time.Now().Format("2006-01-02 15:04:05") - content := fmt.Sprintf("%s [pay]-[%s] | FromAddress: %s | QueueId: %s | Transactions: %v", - timestamp, status, fromAddress, queueId, string(t)) + content := fmt.Sprintf("%s [pay]-[%s] | FromAddress: %s | QueueId: %s", + timestamp, status, fromAddress, queueId) if err := lf.write(content); err != nil { fmt.Printf("⚠️ 写入日志失败: %v\n", err) diff --git a/internal/msg/msg.go b/internal/msg/msg.go index 3b4ddfa..9fe8728 100644 --- a/internal/msg/msg.go +++ b/internal/msg/msg.go @@ -57,40 +57,65 @@ type WithdrawMsg_resp struct { } // =============================== type2 =============================== - type PayMsg_req struct { - QueueId string `json:"queue_id"` - Chain string `json:"chain"` - Symbol string `json:"symbol"` - FromAddress string `json:"from_address"` - TotalAmount float64 `json:"total_amount"` - TotalFee float64 `json:"total_fee"` - Timestamp uint64 `json:"timestamp"` - Sign string `json:"sign"` - Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_req{}, ...} - Status int `json:"status,omitempty"` + QueueId string `json:"queue_id"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + FromAddress string `json:"from_address"` // 我们提供的地址 + ToAddress string `json:"to_address"` // 用户要提现到的地址 + Amount float64 `json:"amount"` + Fee float64 `json:"fee"` + Timestamp uint64 `json:"timestamp"` + Sign string `json:"sign"` + Status int `json:"status,omitempty"` } type PayMsg_resp struct { - QueueId string `json:"queue_id"` - Chain string `json:"chain"` - Symbol string `json:"symbol"` - FromAddress string `json:"from_address"` - PayStatus int `json:"pay_status"` // 1至少有一笔转账成功,3sign校验失败,4钱包余额不足 - Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_resp{}, ...} + QueueId string `json:"queue_id"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + FromAddress string `json:"from_address"` // 来源地址 + ToAddress string `json:"to_address"` // 目标地址 + TxHash string `json:"tx_hash,omitempty"` + Amount float64 `json:"amount,omitempty"` + Fee float64 `json:"fee,omitempty"` + BlockHeight uint64 `json:"block_height,omitempty"` // 区块高度 + Status int `json:"status"` // 0失败,1成功,3sign校验失败 } -type PayData struct { - QueueId string `json:"queue_id,omitempty"` - Chain string `json:"chain,omitempty"` - Symbol string `json:"symbol,omitempty"` - TxHash string `json:"tx_hash,omitempty"` - ToAddress string `json:"to_address"` - Amount float64 `json:"amount"` - Fee float64 `json:"fee"` - BlockHeight uint64 `json:"block_height,omitempty"` - Status int `json:"status,omitempty"` // 0失败,1成功, 2待确认 -} +// type PayMsg_req struct { +// QueueId string `json:"queue_id"` +// Chain string `json:"chain"` +// Symbol string `json:"symbol"` +// FromAddress string `json:"from_address"` +// TotalAmount float64 `json:"total_amount"` +// TotalFee float64 `json:"total_fee"` +// Timestamp uint64 `json:"timestamp"` +// Sign string `json:"sign"` +// Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_req{}, ...} +// Status int `json:"status,omitempty"` +// } + +// type PayMsg_resp struct { +// QueueId string `json:"queue_id"` +// Chain string `json:"chain"` +// Symbol string `json:"symbol"` +// FromAddress string `json:"from_address"` +// PayStatus int `json:"pay_status"` // 1至少有一笔转账成功,3sign校验失败,4钱包余额不足 +// Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_resp{}, ...} +// } + +// type PayData struct { +// QueueId string `json:"queue_id,omitempty"` +// Chain string `json:"chain,omitempty"` +// Symbol string `json:"symbol,omitempty"` +// TxHash string `json:"tx_hash,omitempty"` +// ToAddress string `json:"to_address"` +// Amount float64 `json:"amount"` +// Fee float64 `json:"fee"` +// BlockHeight uint64 `json:"block_height,omitempty"` +// Status int `json:"status,omitempty"` // 0失败,1成功, 2待确认 +// } // =============================== type3 =============================== // 接收到的删除监听地址消息 @@ -155,3 +180,9 @@ type TransferResult struct { Amount float64 Status int } + +type UpdateReqState struct { + QueueId string + MsgType int + Status int +} diff --git a/internal/queue/rabbitmq.go b/internal/queue/rabbitmq.go index 62d7c9d..247a193 100644 --- a/internal/queue/rabbitmq.go +++ b/internal/queue/rabbitmq.go @@ -190,8 +190,8 @@ func (r *RabbitMQServer) consumePay() { if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("failed to parse pay message: %w", err) } - log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, Chain=%s, Symbol=%s, TxCount=%d", - msg.QueueId, msg.FromAddress, msg.Chain, msg.Symbol, len(msg.Transactions)) + log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, To=%s, Chain=%s, Symbol=%s, Amount=%f", + msg.QueueId, msg.FromAddress, msg.ToAddress, msg.Chain, msg.Symbol, msg.Amount) if r.OnPayMsg != nil { r.OnPayMsg(msg) diff --git a/internal/server.go b/internal/server.go index b5a115b..e265761 100644 --- a/internal/server.go +++ b/internal/server.go @@ -123,10 +123,9 @@ func (s *ServerCtx) handlePayMsg() { // 验证签名 if !s.verifyMessage(msg.Timestamp, msg.Sign) { err := s.rmqServer.PublishPayResp(message.PayMsg_resp{ - QueueId: msg.QueueId, - FromAddress: msg.FromAddress, - PayStatus: constant.STATUS_VERIFY_FAILED, - Transactions: msg.Transactions, + QueueId: msg.QueueId, + FromAddress: msg.FromAddress, + Status: constant.STATUS_VERIFY_FAILED, }) if err != nil { log.Printf("❌ 发布支付失败响应失败: %v", err) @@ -179,7 +178,7 @@ func (s *ServerCtx) handleRespMsg() { return } case message.PayMsg_resp: - log.Printf("📨[提现响应]:QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, Status=%d", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.PayStatus) + log.Printf("📨[提现响应]:QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, Status=%d", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.Status) err := s.rmqServer.PublishPayResp(v) if err != nil { log.Printf("❌ 发送支付响应失败: %v", err) @@ -199,13 +198,30 @@ func (s *ServerCtx) handleRespMsg() { } } +func (s *ServerCtx) initDB() { + // msgDB_path := s.Config.MsgConfig.SqlitePath + // ethDB_path := s.Config.ETHConfig.SqlitePath + msg_sql_script_path := "../public/msg.sql" + msg_sql_byte, err := os.ReadFile(msg_sql_script_path) + if err != nil { + log.Fatalf("open msg-sql file error: %v", err) + return + } + + err = s.messageServer.SqliteDB.CreateTable(string(msg_sql_byte)) + if err != nil { + log.Fatalf("exec msg-sql error: %v", err) + return + } +} + func Start(msgKey string) { log.Println("========================================") log.Println("🚀 M2Pool Payment System Starting...") log.Println("========================================") server := NewServer(msgKey) - + server.initDB() // 启动消息处理 go server.handleTopupMsg() go server.handleWithdrawMsg() diff --git a/public/msg.sql b/public/msg.sql index da5df27..5d3d740 100644 --- a/public/msg.sql +++ b/public/msg.sql @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS topup_req_msg ( address TEXT NOT NULL, timestamp INTEGER, sign TEXT, - status INTEGER DEFAULT 1, + status INTEGER DEFAULT 0, -- 0未在监听,1正在监听 PRIMARY KEY (queue_id) ); @@ -56,8 +56,8 @@ CREATE TABLE IF NOT EXISTS pay_req_msg ( symbol TEXT NOT NULL, from_addr TEXT NOT NULL, to_addr TEXT NOT NULL, - total_amount TEXT NOT NULL, -- 改为 TEXT 类型 - total_fee TEXT NOT NULL, -- 改为 TEXT 类型 + amount TEXT NOT NULL, -- 改为 TEXT 类型 + fee TEXT NOT NULL, -- 改为 TEXT 类型 timestamp INTEGER, sign TEXT, status INTEGER DEFAULT 5, @@ -69,21 +69,26 @@ CREATE TABLE IF NOT EXISTS pay_resp_msg ( chain TEXT NOT NULL, symbol TEXT NOT NULL, from_addr TEXT NOT NULL, - pay_status INTEGER DEFAULT 5, - FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) + to_addr TEXT NOT NULL, + tx_hash TEXT DEFAULT NULL, + amount TEXT DEFAULT NULL, -- 改为 TEXT 类型 + fee TEXT DEFAULT NULL, -- 改为 TEXT 类型 + height INTEGER DEFAULT NULL, + status INTEGER DEFAULT 5, + FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) ); -- pay_msg的交易 -CREATE TABLE IF NOT EXISTS pay_msg_tx ( - queue_id TEXT NOT NULL, - tx_hash TEXT DEFAULT NULL, - to_addr TEXT NOT NULL, - amount TEXT DEFAULT NULL, -- 改为 TEXT 类型 - fee TEXT, -- 改为 TEXT 类型 - height INTEGER DEFAULT 0, - status INTEGER DEFAULT 5, - FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) -); +-- CREATE TABLE IF NOT EXISTS pay_msg_tx ( +-- queue_id TEXT NOT NULL, +-- tx_hash TEXT DEFAULT NULL, +-- to_addr TEXT NOT NULL, +-- amount TEXT DEFAULT NULL, -- 改为 TEXT 类型 +-- fee TEXT, -- 改为 TEXT 类型 +-- height INTEGER DEFAULT 0, +-- status INTEGER DEFAULT 5, +-- FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) +-- ); CREATE TABLE IF NOT EXISTS remove_req_msg( queue_id TEXT NOT NULL, diff --git a/public/msg_mysql.sql b/public/msg_mysql.sql index a890982..20a1dd5 100644 --- a/public/msg_mysql.sql +++ b/public/msg_mysql.sql @@ -52,39 +52,44 @@ CREATE TABLE IF NOT EXISTS withdraw_resp_msg ( ); CREATE TABLE IF NOT EXISTS pay_req_msg ( - queue_id VARCHAR(255) NOT NULL, + queue_id VARCHAR(255) NOT NULL, chain VARCHAR(32) NOT NULL, symbol VARCHAR(32) NOT NULL, from_addr VARCHAR(255) NOT NULL, to_addr VARCHAR(255) NOT NULL, - total_amount DECIMAL(40, 16) NOT NULL, - total_fee DECIMAL(40, 16) NOT NULL, + amount DECIMAL(30,16) NOT NULL, + fee DECIMAL(30,16) NOT NULL, timestamp BIGINT, sign VARCHAR(255), - status TINYINT DEFAULT 5, --遵循constant模块的定义 + status TINYINT DEFAULT 5, --遵循constant模块的定义 PRIMARY KEY (queue_id) ); CREATE TABLE IF NOT EXISTS pay_resp_msg ( - queue_id VARCHAR(255) NOT NULL, + queue_id VARCHAR(255) NOT NULL, chain VARCHAR(32) NOT NULL, symbol VARCHAR(32) NOT NULL, from_addr VARCHAR(255) NOT NULL, - pay_status TINYINT DEFAULT 5, --遵循constant模块的定义 - FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) + to_addr VARCHAR(255) NOT NULL, + tx_hash VARCHAR(255) DEFAULT NULL, + amount DECIMAL(30,16) DEFAULT NULL, + fee DECIMAL(30,16) DEFAULT NULL, + height BIGINT DEFAULT NULL, + status TINYINT DEFAULT 5, --遵循constant模块的定义 + FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) ); -- pay_msg的交易 -CREATE TABLE IF NOT EXISTS pay_msg_tx ( - queue_id VARCHAR(255) NOT NULL, - tx_hash VARCHAR(255) DEFAULT NULL, - to_addr VARCHAR(255) NOT NULL, - amount DECIMAL(30,16) DEFAULT NULL, - fee DECIMAL(30,16), - height BIGINT DEFAULT 0, - status TINYINT DEFAULT 5, --遵循constant模块的定义 - FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) -); +-- CREATE TABLE IF NOT EXISTS pay_msg_tx ( +-- queue_id VARCHAR(255) NOT NULL, +-- tx_hash VARCHAR(255) DEFAULT NULL, +-- to_addr VARCHAR(255) NOT NULL, +-- amount DECIMAL(30,16) DEFAULT NULL, +-- fee DECIMAL(30,16), +-- height BIGINT DEFAULT 0, +-- status TINYINT DEFAULT 5, --遵循constant模块的定义 +-- FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id) +-- ); CREATE TABLE IF NOT EXISTS remove_req_msg( queue_id VARCHAR(255) NOT NULL,