From 8d7da5d345b509602b738109c8963d8a8fdc7acd Mon Sep 17 00:00:00 2001 From: lzx <393768033@qq.com> Date: Fri, 31 Oct 2025 13:46:58 +0800 Subject: [PATCH] merge transactions and update --- internal/blockchain/blockchain.go | 30 +- internal/blockchain/eth/batch_transfer.go | 213 --- .../blockchain/eth/batch_transfer_example.md | 101 -- .../blockchain/eth/draft/multiSend_eth.sol | 13 + .../blockchain/eth/draft/multiSend_usdt.sol | 24 + internal/blockchain/eth/eth.go | 1371 ++++++++++------- internal/logger/transaction_logger.go | 16 +- internal/msg/msg.go | 100 +- internal/queue/rabbitmq.go | 47 +- internal/server.go | 110 +- internal/utils/utils.go | 18 + public/SQLite3.sql | 86 +- 12 files changed, 1154 insertions(+), 975 deletions(-) delete mode 100644 internal/blockchain/eth/batch_transfer.go delete mode 100644 internal/blockchain/eth/batch_transfer_example.md create mode 100644 internal/blockchain/eth/draft/multiSend_eth.sol create mode 100644 internal/blockchain/eth/draft/multiSend_usdt.sol diff --git a/internal/blockchain/blockchain.go b/internal/blockchain/blockchain.go index 7dbe6ff..a38ff64 100644 --- a/internal/blockchain/blockchain.go +++ b/internal/blockchain/blockchain.go @@ -6,10 +6,10 @@ import ( ) type IChainServer interface { - AddAddress(address string, msg any) error - RemoveAddress(address string) error - Listen(symbol string, ch chan any) - Transfer(symbol string, msg any) error + AddAddress(msg any) error + RemoveAddress(msg any) error + Listen(ch chan any) + Transfer(msg any) error Stop() } @@ -30,40 +30,40 @@ func (b *BlockChainServer) RegisterChain(name string, chain IChainServer) { b.chains[name] = chain } -func (b *BlockChainServer) AddAddress(chain, address string, msg any) error { +func (b *BlockChainServer) AddAddress(chain string, msg any) error { if srv, ok := b.chains[chain]; ok { - srv.AddAddress(address, msg) - fmt.Printf("✅ 添加监听地址: chain=%s, address=%s\n", chain, address) + srv.AddAddress(msg) + fmt.Printf("✅ 添加监听地址: chain=%s, msg=%v\n", chain, msg) return nil } else { return fmt.Errorf("⚠️ 链未注册: %s\n", chain) } } -func (b *BlockChainServer) RemoveAddress(chain, address string) error { +func (b *BlockChainServer) RemoveAddress(chain string, msg any) error { if srv, ok := b.chains[chain]; ok { - srv.RemoveAddress(address) - fmt.Printf("🗑️ 移除监听地址: chain=%s, address=%s\n", chain, address) + srv.RemoveAddress(msg) + fmt.Printf("🗑️ 移除监听地址: chain=%s, msg=%s\n", chain, msg) return nil } else { return fmt.Errorf("⚠️ 链未注册: %s\n", chain) } } -func (b *BlockChainServer) Listen(chain, symbol string, ch chan any) error { +func (b *BlockChainServer) Listen(chain string, ch chan any) error { if srv, ok := b.chains[chain]; ok { go func() { - srv.Listen(symbol, ch) + srv.Listen(ch) }() return nil } return fmt.Errorf("链未注册: %s", chain) } -func (b *BlockChainServer) Transfer(chain, symbol string, msg any) error { +func (b *BlockChainServer) Transfer(chain string, msg any) error { if srv, ok := b.chains[chain]; ok { - fmt.Printf("💸 %s-%s发起转账: %+v\n", chain, symbol, msg) - return srv.Transfer(symbol, msg) + fmt.Printf("💸 %s发起转账: %+v\n", chain, msg) + return srv.Transfer(msg) } return fmt.Errorf("链未注册: %s", chain) } diff --git a/internal/blockchain/eth/batch_transfer.go b/internal/blockchain/eth/batch_transfer.go deleted file mode 100644 index f82d5e1..0000000 --- a/internal/blockchain/eth/batch_transfer.go +++ /dev/null @@ -1,213 +0,0 @@ -package eth - -import ( - "crypto/ecdsa" - "fmt" - "log" - "m2pool-payment/internal/utils" - "math/big" - "strings" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" -) - -// BatchTransferItem 批量转账单项 -type BatchTransferItem struct { - ToAddress string // 接收地址 - Amount float64 // 转账金额 -} - -// BatchTransferResult 批量转账结果 -type BatchTransferResult struct { - TxHash string // 交易哈希 - Success bool // 是否成功 - TotalAmount float64 // 总转账金额 - Count int // 转账笔数 -} - -// usdt_batch_transfer 批量转账ERC20-USDT -// from: 发送地址 -// items: 批量转账列表 -// returns: 交易哈希和错误信息 -func (e *ETHNode) USDTBatchTransfer(from string, items []BatchTransferItem) (*BatchTransferResult, error) { - if len(items) == 0 { - return nil, fmt.Errorf("批量转账列表不能为空") - } - - // 统一转换为小写 - from = strings.ToLower(from) - - // 计算总金额 - var totalAmount float64 - for _, item := range items { - if item.Amount <= 0 { - return nil, fmt.Errorf("转账金额必须大于0") - } - totalAmount += item.Amount - } - - // 1. 校验钱包USDT余额 - balance, err := e.getUSDTBalance(from) - log.Printf("🔄 批量转账 - 检测钱包=%s,余额=%.2f USDT", from, balance) - if err != nil { - return nil, fmt.Errorf("获取余额失败: %w", err) - } - - if balance < totalAmount { - return nil, fmt.Errorf("余额不足: 余额=%.2f USDT < 需要=%.2f USDT", balance, totalAmount) - } - - // 2. 通过from地址前往数据库查找出对应加密后的私钥,并解密真实的私钥 - originalKey := e.decodePrivatekey(from) - if originalKey == "" { - return nil, fmt.Errorf("无法获取私钥") - } - - privateKey, err := crypto.HexToECDSA(originalKey) - if err != nil { - return nil, fmt.Errorf("解析私钥失败: %w", err) - } - - // 3. 获得nonce - nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(from)) - if err != nil { - return nil, fmt.Errorf("获取nonce失败: %w", err) - } - - // 4. 构造批量转账数据 - // 使用 transfer(address[], uint256[]) 或多次transfer调用 - // 这里使用多次transfer调用的方式,因为标准ERC20没有批量转账方法 - - // 方法1: 构造多次transfer调用(适合少量转账) - if len(items) <= 3 { - return e.batchTransferMultipleCalls(from, privateKey, nonce, items) - } - - // 方法2: 使用合约批量转账(需要部署代理合约,这里简化处理) - // 注意:这里实现的是多次transaction方式 - return e.batchTransferSeparateTransactions(from, privateKey, nonce, items) -} - -// batchTransferMultipleCalls 使用一个交易多次调用transfer(需要gas优化) -func (e *ETHNode) batchTransferMultipleCalls(from string, privateKey *ecdsa.PrivateKey, nonce uint64, items []BatchTransferItem) (*BatchTransferResult, error) { - // 注意:标准ERC20不支持批量transfer,这里需要自定义合约 - // 或者使用多次独立交易 - log.Printf("⚠️ 标准ERC20不支持批量transfer,改用多次独立交易") - - // 回退到多次独立交易 - return e.batchTransferSeparateTransactions(from, privateKey, nonce, items) -} - -// batchTransferSeparateTransactions 执行多次独立的transfer交易 -func (e *ETHNode) batchTransferSeparateTransactions(from string, privateKey *ecdsa.PrivateKey, nonce uint64, items []BatchTransferItem) (*BatchTransferResult, error) { - var totalAmount float64 - var txHashes []string - var allSuccess bool = true - - for i, item := range items { - // 构造单个transfer交易 - amountBigInt := utils.Float64ToBigIntUSDT(item.Amount) - data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(strings.ToLower(item.ToAddress)), amountBigInt) - if err != nil { - log.Printf("❌ 批量转账第%d笔打包失败: %v", i+1, err) - allSuccess = false - continue - } - - // 获取gas limit - gasLimit, err := e.getGasLimit() - if err != nil { - log.Printf("❌ 批量转账第%d笔获取gasLimit失败: %v", i+1, err) - allSuccess = false - continue - } - - // 获取gas费用 - maxFeePerGas, maxPriorityFeePerGas, err := e.getEIP1559GasFees() - - var txHash string - if err != nil { - // 回退到传统gas price - gasPrice, err := e.getSuggestGasPrice() - if err != nil { - log.Printf("❌ 批量转账第%d笔获取gasPrice失败: %v", i+1, err) - allSuccess = false - continue - } - - tx := types.NewTransaction(nonce+uint64(i), e.USDT.Address, big.NewInt(0), gasLimit, gasPrice, data) - signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey) - if err != nil { - log.Printf("❌ 批量转账第%d笔签名失败: %v", i+1, err) - allSuccess = false - continue - } - - txHash = signedTx.Hash().Hex() - err = e.RpcClient.SendTransaction(e.Ctx, signedTx) - if err != nil { - log.Printf("❌ 批量转账第%d笔发送失败: %v", i+1, err) - allSuccess = false - continue - } - } else { - // 使用EIP-1559交易 - ethBalance, err := e.getETHBlance(from) - if err != nil { - log.Printf("❌ 批量转账第%d笔获取ETH余额失败: %v", i+1, err) - allSuccess = false - continue - } - - maxGasCost := new(big.Int).Mul(new(big.Int).SetUint64(gasLimit), maxFeePerGas) - if ethBalance.Cmp(maxGasCost) == -1 { - log.Printf("❌ 批量转账第%d笔ETH余额不足", i+1) - allSuccess = false - continue - } - - tx := types.NewTx(&types.DynamicFeeTx{ - ChainID: e.NetId, - Nonce: nonce + uint64(i), - GasTipCap: maxPriorityFeePerGas, - GasFeeCap: maxFeePerGas, - Gas: gasLimit, - To: &e.USDT.Address, - Value: big.NewInt(0), - Data: data, - }) - - signedTx, err := types.SignTx(tx, types.NewLondonSigner(e.NetId), privateKey) - if err != nil { - log.Printf("❌ 批量转账第%d笔签名失败: %v", i+1, err) - allSuccess = false - continue - } - - txHash = signedTx.Hash().Hex() - err = e.RpcClient.SendTransaction(e.Ctx, signedTx) - if err != nil { - log.Printf("❌ 批量转账第%d笔发送失败: %v", i+1, err) - allSuccess = false - continue - } - } - - txHashes = append(txHashes, txHash) - totalAmount += item.Amount - log.Printf("✅ 批量转账第%d笔已提交: %s, 金额=%.2f USDT, 收款地址=%s", - i+1, txHash, item.Amount, strings.ToLower(item.ToAddress)) - } - - log.Printf("📊 批量转账完成: 总计%d笔, 成功%d笔, 总金额=%.2f USDT", - len(items), len(txHashes), totalAmount) - - return &BatchTransferResult{ - TxHash: strings.Join(txHashes, ","), - Success: allSuccess && len(txHashes) == len(items), - TotalAmount: totalAmount, - Count: len(txHashes), - }, nil -} diff --git a/internal/blockchain/eth/batch_transfer_example.md b/internal/blockchain/eth/batch_transfer_example.md deleted file mode 100644 index 49fc196..0000000 --- a/internal/blockchain/eth/batch_transfer_example.md +++ /dev/null @@ -1,101 +0,0 @@ -# ERC20-USDT 批量转账功能 - -## 功能说明 - -该文件 `batch_transfer.go` 提供了 ERC20-USDT 的批量转账功能,支持从同一个发送地址向多个不同的接收地址转账。 - -## 主要功能 - -### 1. 批量转账类型 - -```go -type BatchTransferItem struct { - ToAddress string // 接收地址 - Amount float64 // 转账金额 -} - -type BatchTransferResult struct { - TxHash string // 交易哈希(多个用逗号分隔) - Success bool // 是否成功 - TotalAmount float64 // 总转账金额 - Count int // 转账笔数 -} -``` - -### 2. 使用方法 - -```go -// 1. 准备批量转账列表 -items := []eth.BatchTransferItem{ - {ToAddress: "0xRecipient1", Amount: 100.0}, - {ToAddress: "0xRecipient2", Amount: 200.0}, - {ToAddress: "0xRecipient3", Amount: 50.0}, -} - -// 2. 调用批量转账 -fromAddress := "0xYourAddress" -result, err := ethNode.USDTBatchTransfer(fromAddress, items) -if err != nil { - log.Fatalf("批量转账失败: %v", err) -} - -// 3. 处理结果 -fmt.Printf("批量转账完成: %d笔, 总金额: %.2f USDT", result.Count, result.TotalAmount) -fmt.Printf("交易哈希: %s", result.TxHash) -``` - -## 工作原理 - -由于标准 ERC20 合约不支持批量转账,本实现采用以下策略: - -1. **多次独立交易**:对每笔转账创建一个独立的 ERC20 `transfer` 交易 -2. **Nonce 管理**:自动管理 nonce,确保交易按顺序广播 -3. **Gas 费用**:支持 EIP-1559 动态费用和传统 gas price -4. **错误处理**:单笔失败不影响其他交易,返回成功和失败的详细统计 - -## 注意事项 - -### 1. Gas 费用 - -- 每笔转账需要独立的 gas 费用(约 65,000 gas) -- 批量转账 10 笔需要约 650,000 gas -- 确保发送地址有足够的 ETH 作为 gas 费用 - -### 2. 余额检查 - -- 函数会自动检查 USDT 余额是否足够 -- 如果余额不足,会返回错误并终止转账 - -### 3. 部分成功 - -- 如果某些转账失败,函数会继续执行其他转账 -- 返回结果中包含成功笔数和详细交易哈希 - -### 4. 网络拥堵 - -- 在高网络拥堵时,某些交易可能被推迟 -- 建议监控所有交易状态 - -## 性能优化建议 - -如果需要更高效的批量转账,考虑: - -1. **部署批量转账代理合约**:实现一个合约方法 `batchTransfer(address[] to, uint256[] amounts)` -2. **使用多签钱包**:减少私钥管理风险 -3. **Gas 优化**:使用更低的 gas price 分批发送 - -## 示例输出 - -``` -🔄 批量转账 - 检测钱包=0x...,余额=1000.00 USDT -✅ 批量转账第1笔已提交: 0xabc123..., 金额=100.00 USDT, 收款地址=0x... -✅ 批量转账第2笔已提交: 0xdef456..., 金额=200.00 USDT, 收款地址=0x... -✅ 批量转账第3笔已提交: 0x789ghi..., 金额=50.00 USDT, 收款地址=0x... -📊 批量转账完成: 总计3笔, 成功3笔, 总金额=350.00 USDT -``` - -## 限制 - -- 标准 ERC20 不支持真正的批量转账(单笔交易) -- 需要确保发送地址有足够的 ETH 作为 gas 费用 -- 交易按顺序发送,可能在高负载时较慢 diff --git a/internal/blockchain/eth/draft/multiSend_eth.sol b/internal/blockchain/eth/draft/multiSend_eth.sol new file mode 100644 index 0000000..0838279 --- /dev/null +++ b/internal/blockchain/eth/draft/multiSend_eth.sol @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: MIT +pragma solidity ^0.8.0; + +contract MultiSend { + function multiTransfer(address[] calldata to, uint256[] calldata amounts) external payable { + uint256 length = to.length; + require(length == amounts.length, "Arrays must have the same length"); + + for (uint256 i = 0; i < length; i++) { + payable(to[i]).transfer(amounts[i]); + } + } +} \ No newline at end of file diff --git a/internal/blockchain/eth/draft/multiSend_usdt.sol b/internal/blockchain/eth/draft/multiSend_usdt.sol new file mode 100644 index 0000000..1a15dcf --- /dev/null +++ b/internal/blockchain/eth/draft/multiSend_usdt.sol @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: MIT +pragma solidity ^0.8.0; + +interface IERC20 { + function transfer(address recipient, uint256 amount) external returns (bool); +} + +contract MultiSendUSDT { + address public tokenAddress; + + constructor(address _tokenAddress) { + tokenAddress = _tokenAddress; + } + + function multiTransfer(address[] calldata to, uint256[] calldata amounts) external { + IERC20 token = IERC20(tokenAddress); + uint256 length = to.length; + require(length == amounts.length, "Arrays must have the same length"); + + for (uint256 i = 0; i < length; i++) { + token.transfer(to[i], amounts[i]); + } + } +} \ No newline at end of file diff --git a/internal/blockchain/eth/eth.go b/internal/blockchain/eth/eth.go index 82c5884..f2aae2f 100644 --- a/internal/blockchain/eth/eth.go +++ b/internal/blockchain/eth/eth.go @@ -52,27 +52,56 @@ const erc20ABI = ` ] ` +// USDT 合约地址(主网) +const USDTAddress = "0xdAC17F958D2ee523a2206206994597C13D831ec7" + +// 状态码常量 +const ( + STATUS_FAILED = 0 // 失败 + STATUS_SUCCESS = 1 // 成功 + STATUS_PENDING = 2 // 待确认 + STATUS_VERIFY_FAILED = 3 // 验证失败 +) + type ETHNode struct { - decodeKey string // 私钥解密密钥,从程序启动命令行获得 - NetId *big.Int // 网络ID,主网为1,其他ID可通过rpc.NetworkID方法获取 - Config message.ETHConfig // 配置文件 - WsClient *ethclient.Client - RpcClient *ethclient.Client - Db db.MySQLPool - mu sync.Mutex - ListenAddresses sync.Map // key:"钱包地址", value:bool - UnConfirmTxs map[string]message.Tx_msg // 待交易地址池,key:"交易hash", value: message.Tx - USDT *USDT // ETH相关 - RmqMsgs map[string][]any // 根据地址查找出该地址涉及的消息,消息需要断言(topupreq_msg, payreq_msg, withdrawreq_msg) - Ctx context.Context - Cancel context.CancelFunc + decodeKey string // 私钥解密密钥,仅针对普通钱包 + NetID *big.Int // 网络ID,主网为1,其他ID可通过rpc.NetworkID方法获取 + Config message.ETHConfig + ConfirmHeight uint64 + WsClient *ethclient.Client + RpcClient *ethclient.Client + Db db.MySQLPool + // {"address": message.TopupMsg_req{}, ...},仅针对充值 + TopupMsg map[string]*message.TopupMsg_req + // {"fromAddress": message.WithdrawMsg_req{}, ...},仅针对提现 + WithdrawMsg map[string]*message.WithdrawMsg_req + // {"fromAddress": message.PayMsg_req{}, ...}, 仅针对支付 + PayMsg map[string]*message.PayMsg_req + // {"tx_hash": message.Transaction} + UnConfirmTxs map[string]*message.Transaction + LogsChan chan *types.Header + USDT *USDT + RealData *RealData + Ctx context.Context + Cancel context.CancelFunc + mu sync.Mutex } type USDT struct { - Address common.Address // USDT合约地址 - ABI abi.ABI // USDT ABI - TransferSig common.Hash // USDT函数签名 - LogsChan chan types.Log + Address common.Address // USDT合约地址 + ListeningAddresses map[string]any // 监听的USDT转账消息 + ABI abi.ABI // USDT ABI + TransferSig common.Hash // USDT函数签名 + LogsChan chan types.Log +} + +type RealData struct { + mu sync.Mutex + Heihgt uint64 + GasLimit uint64 + GasTipCap *big.Int + GasFeeCap *big.Int + GasPrice *big.Int // 老版本转账使用的gas } func NewETHNode(cfg message.ETHConfig, decodeKey string) (*ETHNode, error) { @@ -100,6 +129,7 @@ func NewETHNode(cfg message.ETHConfig, decodeKey string) (*ETHNode, error) { usdt.ABI = func() abi.ABI { a, _ := abi.JSON(strings.NewReader(erc20ABI)); return a }() // 解析合约ABI usdt.TransferSig = crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) // 解析合约transfer函数签名 usdt.LogsChan = make(chan types.Log, 1000) // 初始化合约日志通道 + usdt.ListeningAddresses = make(map[string]any) // 初始化数据库 dbConn, err := db.NewMySQLPool(cfg.DbConfig) @@ -107,78 +137,199 @@ func NewETHNode(cfg message.ETHConfig, decodeKey string) (*ETHNode, error) { cancel() return nil, fmt.Errorf("mysql connect error: %w", err) } + // 初始化结构 + topup := make(map[string]*message.TopupMsg_req) + withdraw := make(map[string]*message.WithdrawMsg_req) + pay := make(map[string]*message.PayMsg_req) + unConfirmTxs := make(map[string]*message.Transaction) + logsChan := make(chan *types.Header, 1000) + // 启动时初始化实时数据 + realData := &RealData{} return ÐNode{ - decodeKey: decodeKey, - NetId: netId, - Config: cfg, - WsClient: ws_client, - RpcClient: rpc_client, - Db: *dbConn, - ListenAddresses: sync.Map{}, - UnConfirmTxs: make(map[string]message.Tx_msg), - USDT: usdt, - RmqMsgs: make(map[string][]any), - Ctx: ctx, - Cancel: cancel, + decodeKey: decodeKey, + NetID: netId, + Config: cfg, + ConfirmHeight: cfg.ConfirmHeight, + WsClient: ws_client, + RpcClient: rpc_client, + Db: *dbConn, + TopupMsg: topup, + WithdrawMsg: withdraw, + PayMsg: pay, + UnConfirmTxs: unConfirmTxs, + LogsChan: logsChan, + USDT: usdt, + RealData: realData, + Ctx: ctx, + Cancel: cancel, }, nil } // ============================ 抽象接口 ============================ -func (e *ETHNode) AddAddress(address string, rmq_msg any) error { - // 统一转换为小写 - address = strings.ToLower(address) - log.Printf("新增钱包监听消息:%v", rmq_msg) - e.ListenAddresses.Store(address, true) - e.mu.Lock() - e.RmqMsgs[address] = append(e.RmqMsgs[address], rmq_msg) - e.mu.Unlock() - return nil -} - -func (e *ETHNode) RemoveAddress(address string) error { - // 统一转换为小写 - address = strings.ToLower(address) - e.ListenAddresses.Delete(address) - e.mu.Lock() - delete(e.RmqMsgs, address) - e.mu.Unlock() - return nil -} - -func (e *ETHNode) Listen(symbol string, ch chan any) { - // 启动新区块监听(用于触发交易确认检查) - go e.listenNewBlocks("USDT", ch) - switch symbol { - case "USDT": - // 启动 USDT Transfer 事件监听 - err := e.listen_usdt(ch) - if err != nil { - log.Fatal("Listen USDT Transactions Error:", err) - } - } -} - -func (e *ETHNode) Transfer(symbol string, msg any) error { - switch symbol { - case "USDT": - err := e.usdt_transfer(msg) - if err != nil { - return fmt.Errorf("%s transfer ERROR: %w", symbol, err) - } +func (e *ETHNode) AddAddress(msg any) error { + switch v := msg.(type) { + case message.TopupMsg_req: + e.mu.Lock() + addr := strings.ToLower(v.Address) + e.TopupMsg[addr] = &v + e.mu.Unlock() + case message.WithdrawMsg_req: + e.mu.Lock() + addr := strings.ToLower(v.ToAddress) + e.WithdrawMsg[addr] = &v + e.mu.Unlock() + case message.PayMsg_req: + e.mu.Lock() + addr := strings.ToLower(v.FromAddress) + e.PayMsg[addr] = &v + e.mu.Unlock() default: - return fmt.Errorf("unsupported symbol: %s", symbol) + return fmt.Errorf("unsupported message type: %T", msg) + } + return nil +} + +func (e *ETHNode) RemoveAddress(msg any) error { + switch v := msg.(type) { + case message.RemoveListenMsg_req: + e.mu.Lock() + addr := strings.ToLower(v.Address) + delete(e.TopupMsg, addr) + e.mu.Unlock() + case message.WithdrawMsg_req: + e.mu.Lock() + addr := strings.ToLower(v.ToAddress) + delete(e.WithdrawMsg, addr) + e.mu.Unlock() + case message.PayMsg_req: + e.mu.Lock() + addr := strings.ToLower(v.FromAddress) + delete(e.PayMsg, addr) + e.mu.Unlock() + default: + return fmt.Errorf("unsupported message type: %T", msg) + } + return nil +} + +func (e *ETHNode) Listen(ch chan any) { + log.Println("✅ 开始监听 ETH 和 USDT 转账事件...") + go func() { + err := e.listenETHTransactions(ch) + if err != nil { + log.Fatalf("Listen ETH Transactions Error: %v", err) + } + }() + + go func() { + err := e.listenUSDTTransactions(ch) + if err != nil { + log.Fatalf("Listen USDT Transactions Error: %v", err) + } + }() +} + +// 转账 +func (e *ETHNode) Transfer(msg any) error { + switch v := msg.(type) { + case message.WithdrawMsg_req: + // 1. 校验余额 + verifyResult, err := e.verifyBalance(v.Symbol, v.FromAddress, v.Amount) + if err != nil || !verifyResult { + log.Printf("address (%s) balance verification failed: %v", v.FromAddress, err) + return err + } + + // 2. 构建未签名交易 + unSignTx, err := e.contractTx(v.Symbol, v.FromAddress, v.ToAddress, v.Amount) + if err != nil { + log.Printf("failed to create contract transaction: %v", err) + return err + } + + // 3. 签名交易 + signedTx, err := e.signTx(unSignTx, v.FromAddress) + if err != nil { + log.Printf("failed to sign transaction: %v", err) + return err + } + txHash := signedTx.Hash().Hex() + // 4. 发送交易并存入交易池 + if err := e.sendTransaction(signedTx, txHash, v.Symbol, v.FromAddress, v.ToAddress, v.Amount); err != nil { + log.Printf("failed to send transaction: %v", err) + } + // 5. 将tx_hash添加到消息中 + if wm, ok := e.WithdrawMsg[v.FromAddress]; ok { + wm.TxHash = txHash + e.WithdrawMsg[v.FromAddress] = wm + } else { + // 如果不存在原始消息,创建一条新的记录并写回 map(根据实际结构字段调整) + v.TxHash = txHash + e.WithdrawMsg[v.FromAddress] = &v + } + case message.PayMsg_req: + // 1. 校验余额 + verifyResult, err := e.verifyBalance(v.Symbol, v.FromAddress, v.TotalAmount) + if err != nil || !verifyResult { + log.Printf("address (%s) balance verification failed: %v", v.FromAddress, err) + return err + } + + // 2. 预先获取起始 nonce 避免并发冲突 + startNonce, err := e.getTransactionNonce(v.FromAddress) + if err != nil { + log.Printf("failed to get start nonce: %v", err) + return err + } + + // 3. 按顺序发送多笔交易(注意:顺序发送可以避免 nonce 冲突,但可能较慢) + i := 0 + for toAddr, tx := range v.Trasnactions { + // 构建未签名交易(使用递增的 nonce) + unSignTx, err := e.buildContractTxWithNonce(v.Symbol, v.FromAddress, tx.ToAddress, tx.Amount, startNonce+uint64(i)) + if err != nil { + log.Printf("failed to create contract transaction: %v", err) + i++ + continue + } + + // 签名交易 + signedTx, err := e.signTx(unSignTx, v.FromAddress) + if err != nil { + log.Printf("failed to sign transaction: %v", err) + i++ + continue + } + txHash := signedTx.Hash().Hex() + + // 发送交易并存入交易池 + if err := e.sendTransaction(signedTx, txHash, v.Symbol, v.FromAddress, tx.ToAddress, tx.Amount); err != nil { + log.Printf("failed to send transaction: %v", err) + i++ + continue + } + + // 将tx_hash添加到消息中 + e.mu.Lock() + tx.TxHash = txHash + e.PayMsg[v.FromAddress].Trasnactions[toAddr] = tx + e.mu.Unlock() + i++ + } } return nil } func (e *ETHNode) Stop() { - e.Cancel() + if e.Cancel != nil { + e.Cancel() + } + log.Println("🛑 停止监听...") } // ============================ rpc节点方法 ============================ - -func (e *ETHNode) getETHBlance(address string) (*big.Int, error) { +func (e *ETHNode) getETHBalance(address string) (*big.Int, error) { account := common.HexToAddress(address) ctx := context.Background() balance, err := e.RpcClient.BalanceAt(ctx, account, nil) // nil表示最新高度 @@ -192,14 +343,14 @@ func (e *ETHNode) getETHBlance(address string) (*big.Int, error) { return balance, nil } -func (e *ETHNode) getUSDTBalance(address string) (float64, error) { +func (e *ETHNode) getUSDTBalance(address string) (*big.Int, error) { // 统一转换为小写(common.HexToAddress会自动处理,但为了一致性显式转换) address = strings.ToLower(address) contractAddress := e.USDT.Address accountAddress := common.HexToAddress(address) data, err := e.USDT.ABI.Pack("balanceOf", accountAddress) if err != nil { - return 0, fmt.Errorf("failed to pack balanceOf data: %w", err) + return nil, fmt.Errorf("failed to pack balanceOf data: %w", err) } msg := ethereum.CallMsg{ To: &contractAddress, @@ -208,19 +359,28 @@ func (e *ETHNode) getUSDTBalance(address string) (float64, error) { // 使用 CallContract 方法查询合约余额 res, err := e.RpcClient.CallContract(e.Ctx, msg, nil) if err != nil { - return 0, fmt.Errorf("failed to get contract balance: %w", err) + return nil, fmt.Errorf("failed to get contract balance: %w", err) } // 解析返回的字节为 *big.Int outputs, err := e.USDT.ABI.Unpack("balanceOf", res) if err != nil || len(outputs) == 0 { - return 0, fmt.Errorf("failed to unpack balanceOf result: %w", err) + return nil, fmt.Errorf("failed to unpack balanceOf result: %w", err) } balance, ok := outputs[0].(*big.Int) if !ok { - return 0, fmt.Errorf("unexpected type for balanceOf result") + return nil, fmt.Errorf("unexpected type for balanceOf result") } - bal := utils.BigIntUSDTToFloat64(balance) - return bal, nil + // bal := utils.BigIntUSDTToFloat64(balance) + return balance, nil +} + +func (e *ETHNode) getTransactionNonce(address string) (uint64, error) { + nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(address)) + if err != nil { + log.Fatalf("failed to get nonce: %v", err) + return 0, err + } + return nonce, nil } func (e *ETHNode) getGasLimit() (uint64, error) { @@ -242,14 +402,26 @@ func (e *ETHNode) getSuggestGasPrice() (*big.Int, error) { maxGasPrice := new(big.Int).SetUint64(20000000000) // 20 Gwei if gasPrice.Cmp(maxGasPrice) > 0 { - log.Printf("⚠️ 建议gas price过高 (%v Gwei),使用上限 20 Gwei", new(big.Int).Div(gasPrice, big.NewInt(1000000000))) + log.Printf("⚠️ 建议gas price过高 (%v wei),使用上限 20 Gwei", new(big.Int).Div(gasPrice, big.NewInt(1e18))) return maxGasPrice, nil } - log.Printf("✅ 使用建议gas price: %v Gwei", new(big.Int).Div(gasPrice, big.NewInt(1000000000))) + // log.Printf("✅ 使用建议gas price: %v wei", new(big.Int).Div(gasPrice, big.NewInt(1e18))) return gasPrice, nil } +func (e *ETHNode) checkTransaction(tx_hash string) (bool, error) { + receipt, err := e.RpcClient.TransactionReceipt(e.Ctx, common.HexToHash(tx_hash)) + if err != nil { + return false, fmt.Errorf("check tx(%s) error: %v", tx_hash, err) + } + if receipt.Status == types.ReceiptStatusSuccessful { + return true, nil + } else { + return false, nil + } +} + // getEIP1559GasFees 获取EIP-1559的gas费用参数 func (e *ETHNode) getEIP1559GasFees() (*big.Int, *big.Int, error) { ctx := context.Background() @@ -274,21 +446,64 @@ func (e *ETHNode) getEIP1559GasFees() (*big.Int, *big.Int, error) { // 设置最大费用上限为30 Gwei maxFeeLimit := new(big.Int).SetUint64(30000000000) // 30 Gwei if maxFeePerGas.Cmp(maxFeeLimit) > 0 { - log.Printf("⚠️ 计算的最大费用过高 (%v Gwei),使用上限 30 Gwei", new(big.Int).Div(maxFeePerGas, big.NewInt(1000000000))) + log.Printf("⚠️ 计算的最大费用过高 (%v wei),使用上限 30 Gwei", new(big.Int).Div(maxFeePerGas, big.NewInt(1e18))) maxFeePerGas = maxFeeLimit } - log.Printf("✅ EIP-1559 Gas费用: BaseFee=%v Gwei, MaxPriorityFee=%v Gwei, MaxFee=%v Gwei", - new(big.Int).Div(baseFee, big.NewInt(1000000000)), - new(big.Int).Div(maxPriorityFeePerGas, big.NewInt(1000000000)), - new(big.Int).Div(maxFeePerGas, big.NewInt(1000000000))) + // log.Printf("✅ EIP-1559 Gas费用: BaseFee=%v wei, MaxPriorityFee=%v wei, MaxFee=%v wei", + // new(big.Int).Div(baseFee, big.NewInt(1e18)), + // new(big.Int).Div(maxPriorityFeePerGas, big.NewInt(1e18)), + // new(big.Int).Div(maxFeePerGas, big.NewInt(1e18))) return maxFeePerGas, maxPriorityFeePerGas, nil } -// ============================ 业务方法 ============================ -func (e *ETHNode) listen_usdt(ch chan any) error { - fmt.Println("🔍 ETH 开始监听 USDT Transfer 事件...") +// ============================ 业务逻辑 ============================ +// 监听ETH转账 +func (e *ETHNode) listenETHTransactions(ch chan any) error { + fmt.Println("🔍 开始ETH交易...") + + headers := make(chan *types.Header, 10) + + // 负责重连 + for { + // 订阅新区块头 + sub, err := e.WsClient.SubscribeNewHead(e.Ctx, headers) + if err != nil { + fmt.Println("❌ 订阅ETH交易失败, 5秒后重试:", err) + time.Sleep(5 * time.Second) + continue + } + fmt.Println("✅ ETH交易订阅成功") + + // 处理新区块 + for { + select { + case err := <-sub.Err(): + fmt.Println("⚠️ ETH交易订阅异常,准备重连:", err) + sub.Unsubscribe() + time.Sleep(3 * time.Second) + goto reconnect + + case header := <-headers: + // 每当有新区块,检查待确认交易 + currentHeight := header.Number.Uint64() + go e.updateRealData(currentHeight) + go e.handleETHEvent(header, ch) + go e.confirm(ch) + case <-e.Ctx.Done(): + fmt.Println("🛑 收到停止信号,退出ETH交易监听") + sub.Unsubscribe() + return e.Ctx.Err() + } + } + reconnect: + } +} + +// 监听USDT转账 +func (e *ETHNode) listenUSDTTransactions(ch chan any) error { + fmt.Println("🔍 开始USDT交易...") // 过滤掉非USDT数据 query := ethereum.FilterQuery{ Addresses: []common.Address{e.USDT.Address}, @@ -298,25 +513,25 @@ func (e *ETHNode) listen_usdt(ch chan any) error { // 订阅日志 sub, err := e.WsClient.SubscribeFilterLogs(e.Ctx, query, e.USDT.LogsChan) if err != nil { - fmt.Println("❌ 订阅失败, 5秒后重试:", err) + fmt.Println("❌ USDT交易订阅失败, 5秒后重试:", err) time.Sleep(5 * time.Second) continue } - fmt.Println("✅ 订阅成功") + fmt.Println("✅ USDT交易订阅成功") // 处理事件 for { select { case err := <-sub.Err(): - fmt.Println("⚠️ 订阅异常,准备重连:", err) + fmt.Println("⚠️ USDT交易订阅异常,准备重连:", err) sub.Unsubscribe() // 清理旧订阅 time.Sleep(3 * time.Second) goto reconnect // 跳出内层循环,回到外层重新订阅 case vLog := <-e.USDT.LogsChan: - e.handleUSDTEvent(vLog, ch) // 事件解析 + 分类,传递链消息的通道是vLog而非ch,且一次只传递一笔交易 + go e.handleUSDTEvent(vLog, ch) // 事件解析 + 分类,传递链消息的通道是vLog而非ch,且一次只传递一笔交易 case <-e.Ctx.Done(): - fmt.Println("🛑 收到停止信号,退出监听") + fmt.Println("🛑 收到停止信号,退出USDT交易监听") sub.Unsubscribe() return e.Ctx.Err() } @@ -325,6 +540,194 @@ func (e *ETHNode) listen_usdt(ch chan any) error { } } +func (e *ETHNode) updateRealData(height uint64) { + // 创建 WaitGroup + var wg sync.WaitGroup + + // 用于保存每个方法的结果 + var gasLimit uint64 + var suggestGasPrice *big.Int + var maxFeePerGas, maxPriorityFeePerGas *big.Int + var gasLimitErr, suggestGasPriceErr, eip1559GasFeesErr error + + // 启动协程并等待所有结果 + wg.Add(3) + + // 获取 Gas Limit + go func() { + defer wg.Done() + gasLimit, gasLimitErr = e.getGasLimit() + if gasLimitErr != nil { + log.Printf("Failed to get gas limit: %v", gasLimitErr) + } else { + // log.Printf("Gas Limit: %d", gasLimit) + } + }() + + // 获取建议 Gas Price + go func() { + defer wg.Done() + suggestGasPrice, suggestGasPriceErr = e.getSuggestGasPrice() + if suggestGasPriceErr != nil { + log.Printf("Failed to get suggested gas price: %v", suggestGasPriceErr) + } else { + // log.Printf("Suggested Gas Price: %v Gwei", new(big.Int).Div(suggestGasPrice, big.NewInt(1000000000))) + } + }() + + // 获取 EIP-1559 Gas Fees + go func() { + defer wg.Done() + maxFeePerGas, maxPriorityFeePerGas, eip1559GasFeesErr = e.getEIP1559GasFees() + if eip1559GasFeesErr != nil { + log.Printf("Failed to get EIP-1559 gas fees: %v", eip1559GasFeesErr) + } else { + // log.Printf("EIP-1559 Gas Fees: MaxFeePerGas: %v Gwei, MaxPriorityFeePerGas: %v Gwei", + // new(big.Int).Div(maxFeePerGas, big.NewInt(1000000000)), + // new(big.Int).Div(maxPriorityFeePerGas, big.NewInt(1000000000))) + } + }() + + // 等待所有协程完成 + wg.Wait() + + // 检查是否有任何错误 + if gasLimitErr != nil || suggestGasPriceErr != nil || eip1559GasFeesErr != nil { + log.Println("One or more methods failed. Not updating RealData.") + return + } + + // 更新 RealData + e.RealData.mu.Lock() + defer e.RealData.mu.Unlock() + e.RealData = &RealData{ + Heihgt: height, + GasLimit: gasLimit, + GasTipCap: maxPriorityFeePerGas, + GasFeeCap: maxFeePerGas, + GasPrice: suggestGasPrice, + } + // log.Println("✅ RealData updated successfully.") +} + +func (e *ETHNode) handleETHEvent(header *types.Header, ch chan any) { + height := header.Number.Uint64() + + // 获取区块中的所有交易 + block, err := e.RpcClient.BlockByHash(e.Ctx, header.Hash()) + if err != nil { + log.Printf("无法获取区块信息: %v", err) + return + } + + // 遍历区块中的每笔交易 + for _, tx := range block.Transactions() { + txHash := tx.Hash().Hex() + + // 只处理ETH转账(Value > 0) + if tx.Value().Sign() <= 0 { + continue + } + + // 使用 types.Sender 获取发送方地址 + signer := types.LatestSignerForChainID(e.NetID) + from, err := types.Sender(signer, tx) + if err != nil { + log.Println("获取发送方地址失败:", err) + continue + } + + toAddr := "" + if tx.To() != nil { + toAddr = strings.ToLower(tx.To().Hex()) + } + fromAddr := strings.ToLower(from.Hex()) + + // 获取交易金额 + amount := utils.BigIntETHToFloat64(tx.Value()) + + // 处理充值 + for k, v := range e.TopupMsg { + if k == toAddr { + // 锁定并更新未确认的交易 + e.mu.Lock() + e.UnConfirmTxs[txHash] = &message.Transaction{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: txHash, + Symbol: v.Symbol, + Value: amount, + Status: STATUS_PENDING, + } + e.TopupMsg[k].Status = STATUS_PENDING + e.mu.Unlock() + + // 创建待确认充值消息 + topup_unconfirm_msg_resp := message.TopupMsg_resp{ + QueueId: v.QueueId, + Address: v.Address, + Status: STATUS_PENDING, + Chain: v.Chain, + Amount: amount, + TxHash: txHash, + BlockHeight: height, + } + + // 异步发送消息到通道 + go func(msg message.TopupMsg_resp) { + select { + case ch <- msg: + log.Printf("✅ 待确认充值消息已发送") + default: + log.Printf("⚠️ 通道阻塞,待确认消息发送失败") + } + }(topup_unconfirm_msg_resp) + } + } + + // 处理提现 + for k, v := range e.WithdrawMsg { + if strings.EqualFold(v.FromAddress, fromAddr) && strings.EqualFold(v.ToAddress, toAddr) && v.Amount == amount { + e.mu.Lock() + e.UnConfirmTxs[txHash] = &message.Transaction{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: txHash, + Symbol: v.Symbol, + Value: amount, + Status: STATUS_PENDING, + } + e.WithdrawMsg[k].Status = STATUS_PENDING + e.mu.Unlock() + } + } + + // 处理支付 + for k, v := range e.PayMsg { + if strings.EqualFold(v.FromAddress, fromAddr) { + for i, pay := range v.Trasnactions { + if strings.EqualFold(pay.ToAddress, toAddr) && pay.Amount == amount { + e.mu.Lock() + e.UnConfirmTxs[txHash] = &message.Transaction{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: txHash, + Symbol: v.Symbol, + Value: amount, + Status: STATUS_PENDING, + } + e.PayMsg[k].Trasnactions[i].Status = STATUS_PENDING + e.mu.Unlock() + } + } + } + } + } +} + func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) { from := common.HexToAddress(vLog.Topics[1].Hex()) to := common.HexToAddress(vLog.Topics[2].Hex()) @@ -336,283 +739,87 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) { fmt.Println("ABI 解析错误:", err) return } - // 先验证toAddr是否在监听列表里面 - _, ok := e.ListenAddresses.Load(toAddr) - if !ok { - return - } tx_hash := vLog.TxHash.Hex() - // tx, tx_ok := e.UnConfirmTxs[tx_hash] - // if tx_ok { - // // 【支付/提现】待确认交易中存在该交易Hash(说明是我们主动发起的交易) - // // 直接走确认流程,不发送待确认消息 - // // log.Printf("🔍 检测到已发起的交易: TxHash=%s, Type=%d", tx_hash, tx.TxType) - // e.confirm("USDT", height, tx, ch) - // } else { - // 【充值】待确认交易中不存在该交易hash(说明是外部转账) - // 添加至待确认交易中,并立即发送待确认消息 - // 1,先根据to查询RmqMsgs,再根据存在的rmq_msg中的相关数据,存入待确认交易 - value, rmq_msg_ok := e.RmqMsgs[toAddr] - var tx_type int - if rmq_msg_ok { - for _, v := range value { - _, ok := v.(message.TopupMsg_req) - if ok { - tx_type = 0 + value_float := utils.BigIntUSDTToFloat64(transferEvent.Value) + var status int = 2 + // 分别验证3组消息 + // 充值 + for k, v := range e.TopupMsg { + if k == toAddr { + + e.mu.Lock() + e.UnConfirmTxs[tx_hash] = &message.Transaction{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: tx_hash, + Symbol: v.Symbol, + Value: value_float, + Status: status, } - _, ok1 := v.(message.WithdrawMsg_req) - if ok1 { - tx_type = 1 - } - _, ok2 := v.(message.PayMsg_req) - if ok2 { - tx_type = 2 + e.mu.Unlock() + topup_unconfirm_msg_resp := message.TopupMsg_resp{ + QueueId: v.QueueId, + Address: v.Address, + Status: status, + Symbol: v.Symbol, + Chain: v.Chain, + Amount: value_float, + TxHash: tx_hash, + BlockHeight: height, } + // 异步发送 + go func(msg message.TopupMsg_resp) { + select { + case ch <- msg: + log.Printf("✅ 待确认充值消息已发送") + default: + log.Printf("⚠️ 通道阻塞,待确认消息发送失败") + } + }(topup_unconfirm_msg_resp) } } - e.UnConfirmTxs[tx_hash] = message.Tx_msg{ - TxType: tx_type, - Tx: message.Tx{ - From: fromAddr, - To: toAddr, - Height: height, - TxHash: tx_hash, - Symbol: "USDT", - Value: utils.BigIntUSDTToFloat64(transferEvent.Value), - Status: 2, // 待确认状态 - }, + // 提现 + for k, v := range e.WithdrawMsg { + if k == fromAddr && v.ToAddress == toAddr && v.Amount == value_float { + e.mu.Lock() + e.UnConfirmTxs[tx_hash] = &message.Transaction{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: tx_hash, + Symbol: v.Symbol, + Value: value_float, + Status: status, + } + e.WithdrawMsg[k].Status = STATUS_PENDING + e.mu.Unlock() + } } - // log.Printf("📝 待确认交易新增: TxHash=%s, Height=%d, To=%s, Type=%d", tx_hash, height, toAddr, tx_type) - - // 🔔 【仅充值】立即发送待确认状态的消息(支付/提现不发送待确认消息) - if tx_type == 0 && rmq_msg_ok { - for _, v := range value { - d1, ok := v.(message.TopupMsg_req) - if ok && strings.ToLower(d1.Address) == toAddr { - pendingMsg := message.TopupMsg_resp{ - Address: toAddr, - Status: 2, // 待确认状态 - Chain: d1.Chain, - Symbol: d1.Symbol, - Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), - TxHash: tx_hash, - BlockHeight: height, - } - // log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f", - // tx_hash, toAddr, pendingMsg.Amount) - - // 异步发送,避免阻塞事件处理 - go func(msg message.TopupMsg_resp) { - select { - case ch <- msg: - log.Printf("✅ 待确认充值消息已发送") - default: - log.Printf("⚠️ 通道阻塞,待确认消息发送失败") + // 充值 + for k, v := range e.PayMsg { + if k == fromAddr { + for i, pay := range v.Trasnactions { + if pay.ToAddress == toAddr && pay.Amount == value_float { + e.mu.Lock() + e.UnConfirmTxs[tx_hash] = &message.Transaction{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: tx_hash, + Symbol: v.Symbol, + Value: value_float, + Status: status, } - }(pendingMsg) - break - } - } - // } - } -} - -// listenNewBlocks 监听新区块产生,触发交易确认检查 -func (e *ETHNode) listenNewBlocks(symbol string, ch chan any) { - fmt.Println("🔍 开始监听新区块...") - - headers := make(chan *types.Header, 10) - - // 负责重连 - for { - // 订阅新区块头 - sub, err := e.WsClient.SubscribeNewHead(e.Ctx, headers) - if err != nil { - fmt.Println("❌ 订阅新区块失败, 5秒后重试:", err) - time.Sleep(5 * time.Second) - continue - } - fmt.Println("✅ 新区块订阅成功") - - // 处理新区块 - for { - select { - case err := <-sub.Err(): - fmt.Println("⚠️ 新区块订阅异常,准备重连:", err) - sub.Unsubscribe() - time.Sleep(3 * time.Second) - goto reconnect - - case header := <-headers: - // 每当有新区块,检查待确认交易 - currentHeight := header.Number.Uint64() - // log.Printf("🆕 新区块: %d", currentHeight) - - // 检查是否有待确认交易 - e.mu.Lock() - hasPendingTx := len(e.UnConfirmTxs) > 0 - e.mu.Unlock() - - if hasPendingTx { - e.checkAndConfirmTransactions(symbol, currentHeight, ch) + e.PayMsg[k].Trasnactions[i].Status = STATUS_PENDING + e.mu.Unlock() } - case <-e.Ctx.Done(): - fmt.Println("🛑 停止新区块监听") - sub.Unsubscribe() - return } } - reconnect: } } -// checkAndConfirmTransactions 检查并确认达到确认高度的交易 -func (e *ETHNode) checkAndConfirmTransactions(symbol string, currentHeight uint64, ch chan any) { - e.mu.Lock() - needConfirmList := []message.Tx_msg{} - for _, tx := range e.UnConfirmTxs { - // 检查是否达到确认高度 - if currentHeight >= tx.Tx.Height+e.Config.ConfirmHeight { - log.Printf("当前高度=%d, 交易高度=%d", currentHeight, tx.Tx.Height) - // log.Printf("✅ 交易达到确认高度: TxHash=%s, 当前高度=%d, 交易高度=%d, 需确认=%d块", - // txHash, currentHeight, tx.Tx.Height, e.Config.ConfirmHeight) - needConfirmList = append(needConfirmList, tx) - } - } - e.mu.Unlock() - - // 批量触发确认(在锁外执行,避免长时间持锁) - for _, tx := range needConfirmList { - e.confirm(symbol, currentHeight, tx, ch) - } -} - -func (e *ETHNode) confirm(symbol string, height uint64, tx message.Tx_msg, ch chan any) { - switch symbol { - case "USDT": - e.confirm_usdt(tx, height, ch) - } -} - -func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) { - // 先从 UnConfirmTxs 中读取 - e.mu.Lock() - unConfirmTx, ok := e.UnConfirmTxs[tx.Tx.TxHash] - e.mu.Unlock() - - if !ok { - return - } - - if height < unConfirmTx.Tx.Height { - return - } - - // 超过确认高度,查询交易数据 - txHash := common.HexToHash(tx.Tx.TxHash) - receipt, err := e.RpcClient.TransactionReceipt(e.Ctx, txHash) - var status int - if err != nil { - log.Println("⚠️ 查询交易收据失败 TxHash=", txHash, err) - status = 0 - } else if receipt.Status == types.ReceiptStatusSuccessful { - status = 1 - } else { - status = 0 - } - - tx.Tx.Status = status - - // 通过通道发送,异步写避免阻塞 - go func(tx message.Tx_msg) { - e.mu.Lock() - var result_msg any - var matchIndex = -1 // 记录匹配的索引 - rmq_msg := e.RmqMsgs[tx.Tx.To] - for i, v := range rmq_msg { - // 处理充值 - d1, ok := v.(message.TopupMsg_req) - if ok { - // 统一转小写比较 - if strings.ToLower(d1.Address) == tx.Tx.To { - result_msg = message.TopupMsg_resp{ - Address: tx.Tx.To, - Status: tx.Tx.Status, - Chain: d1.Chain, - Symbol: d1.Symbol, - Amount: tx.Tx.Value, - TxHash: tx.Tx.TxHash, - BlockHeight: tx.Tx.Height, - } - // 充值消息不删除,可能会有多笔充值到同一地址 - break - } - } - // 处理提现 - d2, ok1 := v.(message.WithdrawMsg_req) - if ok1 { - // 统一转小写比较 - if strings.ToLower(d2.FromAddress) == tx.Tx.From && - strings.ToLower(d2.ToAddress) == tx.Tx.To && - d2.Amount == tx.Tx.Value { - result_msg = message.WithdrawMsg_resp{ - QueueId: d2.QueueId, - Status: tx.Tx.Status, - Amount: tx.Tx.Value, - Chain: d2.Chain, - Symbol: d2.Symbol, - TxHash: tx.Tx.TxHash, - FromAddress: tx.Tx.From, - ToAddress: tx.Tx.To, - BlockHeight: tx.Tx.Height, - } - matchIndex = i // 记录索引,稍后删除 - break - } - } - // 处理支付 - d3, ok2 := v.(message.PayMsg_req) - if ok2 { - // 统一转小写比较 - if strings.ToLower(d3.FromAddress) == tx.Tx.From && - strings.ToLower(d3.ToAddress) == tx.Tx.To && - d3.Amount == tx.Tx.Value { - result_msg = message.PayMsg_resp{ - QueueId: d3.QueueId, - Status: tx.Tx.Status, - Amount: tx.Tx.Value, - Chain: d3.Chain, - Symbol: d3.Symbol, - OrderId: d3.OrderId, - TxHash: tx.Tx.TxHash, - FromAddress: tx.Tx.From, - ToAddress: tx.Tx.To, - BlockHeight: tx.Tx.Height, - } - matchIndex = i // 记录索引,稍后删除 - break - } - } - } - // 循环结束后,统一删除匹配的消息(提现和支付需要删除) - if matchIndex >= 0 { - e.RmqMsgs[tx.Tx.To] = utils.Slice_delete(e.RmqMsgs[tx.Tx.To], matchIndex) - } - e.mu.Unlock() - select { - case ch <- result_msg: - default: - fmt.Println("⚠️ confirm通道阻塞,消息丢失:", txHash) - } - }(tx) - - // 删除已确认交易 - e.mu.Lock() - delete(e.UnConfirmTxs, tx.Tx.TxHash) - e.mu.Unlock() -} - func (e *ETHNode) decodePrivatekey(address string) string { // 统一转换为小写 address = strings.ToLower(address) @@ -630,196 +837,306 @@ func (e *ETHNode) decodePrivatekey(address string) string { return privateKey } -func (e *ETHNode) usdt_transfer(msg any) error { - var user_from, final_from, to string - var amount float64 - // var tx_type int - // now_height, err := e.getBlockHeight() - // if err != nil { - // return fmt.Errorf("get lastest height error: %v", err) - // } - // --------------------------------------------------------------------------------------------- - // 断言,确定本次转账是哪个类型 - // 支付操作 - v, ok := msg.(message.PayMsg_req) - if ok { - e.AddAddress(v.ToAddress, v) // 存入该笔msg(AddAddress内部会转小写) - // 统一转换为小写 - user_from, final_from, to, amount = strings.ToLower(v.FromAddress), strings.ToLower(v.FromAddress), strings.ToLower(v.ToAddress), v.Amount - // tx_type = 2 - } - // 提现操作 - k, ok1 := msg.(message.WithdrawMsg_req) - if ok1 { - e.AddAddress(k.ToAddress, k) // 存入该笔msg(AddAddress内部会转小写) - // 统一转换为小写 - user_from, final_from, to, amount = strings.ToLower(k.FromAddress), strings.ToLower(k.FromAddress), strings.ToLower(k.ToAddress), k.Amount - // tx_type = 1 - } - // --------------------------------------------------------------------------------------------- - // 1,校验钱包余额 - balance, err := e.getUSDTBalance(user_from) - log.Printf("检测Transfer钱包=%s,余额=%f", user_from, balance) - if err != nil { - return fmt.Errorf("failed to get balance: %w", err) - } - // 2,钱包余额不足,调用归集钱包转账 - if balance < amount { - final_from = "归集钱包" - } - // 3,通过from地址前往数据库查找出对应加密后的私钥,并解密真实的私钥 - originalKey := e.decodePrivatekey(final_from) - if originalKey == "" { - return fmt.Errorf("failed to query privatekey") - } - privateKey, err := crypto.HexToECDSA(originalKey) - if err != nil { - return fmt.Errorf("failed to parse private key: %w", err) - } - // 4, 获得nonce - nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(final_from)) - if err != nil { - return fmt.Errorf("failed to get nonce: %w", err) - } - // 5, 构造交易(ERC20 transfer 调用) - amountBigInt := utils.Float64ToBigIntUSDT(amount) - data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(to), amountBigInt) // 打包 transfer(address,uint256) 方法调用 - if err != nil { - return fmt.Errorf("failed to pack transfer data: %w", err) - } - gasLimit, err := e.getGasLimit() // 获得gasLimit - if err != nil { - return fmt.Errorf("get gas limit error:%v", err) +// 验证余额 +func (e *ETHNode) verifyBalance(symbol, from string, amount float64) (bool, error) { + var amount_b *big.Int + e.mu.Lock() + maxGas := e.RealData.GasFeeCap + if maxGas == nil { + return false, fmt.Errorf("chain data not initialized, maxGas = nil") } + e.mu.Unlock() - // 获取EIP-1559 gas费用参数 - maxFeePerGas, maxPriorityFeePerGas, err := e.getEIP1559GasFees() - if err != nil { - log.Printf("⚠️ 获取EIP-1559费用失败,回退到传统gas price: %v", err) - // 回退到传统gas price - gasPrice, err := e.getSuggestGasPrice() + switch symbol { + case "ETH": + amount_b = utils.Float64ToBigIntETH(amount) + totalAmount := new(big.Int).Add(amount_b, maxGas) + ethBalance, err := e.getETHBalance(from) if err != nil { - return fmt.Errorf("get suggest-gasprice error:%v", err) + return false, fmt.Errorf("get (%s) eth-balance error: %v", from, err) } - - eth_balance, err := e.getETHBlance(final_from) + if ethBalance.Cmp(totalAmount) == -1 { + return false, nil // 余额不足 + } + return true, nil + case "USDT": + amount_b = utils.Float64ToBigIntUSDT(amount) + usdtBalance, err := e.getUSDTBalance(from) if err != nil { - return fmt.Errorf("%w", err) + return false, fmt.Errorf("get (%s) usdt-balance error: %v", from, err) } - - gasLimit_b := new(big.Int).SetUint64(gasLimit) - gas := new(big.Int).Mul(gasLimit_b, gasPrice) - - // 计算gas费用(以ETH为单位) - gasInETH := new(big.Float).SetInt(gas) - gasInETH.Quo(gasInETH, new(big.Float).SetInt64(1000000000000000000)) - - log.Printf("💰 传统Gas费用预估: Limit=%d, Price=%v Gwei, 总费用=%.6f ETH", - gasLimit, - new(big.Int).Div(gasPrice, big.NewInt(1000000000)), - gasInETH) - - // 判断钱包eth是否支持本次交易gas费用 - if eth_balance.Cmp(gas) == -1 { - ethBalanceInETH := new(big.Float).SetInt(eth_balance) - ethBalanceInETH.Quo(ethBalanceInETH, new(big.Float).SetInt64(1000000000000000000)) - return fmt.Errorf("❌ 地址 %s ETH余额不足: %.6f ETH < %.6f ETH (gas费用)", - final_from, ethBalanceInETH, gasInETH) + if usdtBalance.Cmp(amount_b) == -1 { + return false, nil // 余额不足 } - - // 构造传统交易 - tx := types.NewTransaction( - nonce, - e.USDT.Address, - big.NewInt(0), - gasLimit, - gasPrice, - data, - ) - - // 签名并发送传统交易 - signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey) - if err != nil { - return fmt.Errorf("failed to sign transaction: %w", err) - } - - txHash := signedTx.Hash().Hex() - err = e.RpcClient.SendTransaction(e.Ctx, signedTx) - if err != nil { - return fmt.Errorf("failed to send transaction: %w", err) - } - - log.Printf("✅ 传统交易已提交至mempool:%s,金额:%.2f USDT, 手续费:%.6f ETH", txHash, amount, gasInETH) - return nil + return true, nil + default: + return false, fmt.Errorf("ETH NetWork error symbol: %s", symbol) } +} - // 使用EIP-1559交易 - eth_balance, err := e.getETHBlance(final_from) +// 构建交易 +func (e *ETHNode) contractTx(symbol, from, to string, amount float64) (*types.Transaction, error) { + nonce, err := e.getTransactionNonce(from) if err != nil { - return fmt.Errorf("%w", err) + return nil, fmt.Errorf("failed to get nonce: %v", err) } + return e.buildContractTxWithNonce(symbol, from, to, amount, nonce) +} - // 计算最大可能的gas费用 - maxGasCost := new(big.Int).Mul(new(big.Int).SetUint64(gasLimit), maxFeePerGas) - - // 计算gas费用(以ETH为单位) - maxGasCostInETH := new(big.Float).SetInt(maxGasCost) - maxGasCostInETH.Quo(maxGasCostInETH, new(big.Float).SetInt64(1000000000000000000)) - - log.Printf("💰 EIP-1559 Gas费用预估: Limit=%d, MaxFee=%v Gwei, MaxPriorityFee=%v Gwei, 最大费用=%.6f ETH", - gasLimit, - new(big.Int).Div(maxFeePerGas, big.NewInt(1000000000)), - new(big.Int).Div(maxPriorityFeePerGas, big.NewInt(1000000000)), - maxGasCostInETH) - - // 判断钱包eth是否支持本次交易gas费用 - if eth_balance.Cmp(maxGasCost) == -1 { - ethBalanceInETH := new(big.Float).SetInt(eth_balance) - ethBalanceInETH.Quo(ethBalanceInETH, new(big.Float).SetInt64(1000000000000000000)) - return fmt.Errorf("❌ 地址 %s ETH余额不足: %.6f ETH < %.6f ETH (最大gas费用)", - final_from, ethBalanceInETH, maxGasCostInETH) +// 构建交易(指定 nonce) +func (e *ETHNode) buildContractTxWithNonce(symbol, from, to string, amount float64, nonce uint64) (*types.Transaction, error) { + e.mu.Lock() + maxFeePerGas, maxPriorityFeePerGas, gasLimit := e.RealData.GasFeeCap, e.RealData.GasTipCap, e.RealData.GasLimit + if maxFeePerGas == nil || maxPriorityFeePerGas == nil || gasLimit == 0 { + e.mu.Unlock() + return nil, fmt.Errorf("chain data not initialized!") } + netID := e.NetID + e.mu.Unlock() - // 构造EIP-1559交易 - tx := types.NewTx(&types.DynamicFeeTx{ - ChainID: e.NetId, + addr := common.HexToAddress(to) + eip1559Tx := &types.DynamicFeeTx{ + ChainID: netID, Nonce: nonce, GasTipCap: maxPriorityFeePerGas, GasFeeCap: maxFeePerGas, Gas: gasLimit, - To: &e.USDT.Address, - Value: big.NewInt(0), - Data: data, - }) - // 6, 签名EIP-1559交易并获得txHash - signedTx, err := types.SignTx(tx, types.NewLondonSigner(e.NetId), privateKey) - if err != nil { - return fmt.Errorf("failed to sign EIP-1559 transaction: %w", err) + To: &addr, + Data: []byte{}, } - txHash := signedTx.Hash().Hex() - - // 7, 发送EIP-1559交易 - err = e.RpcClient.SendTransaction(e.Ctx, signedTx) - if err != nil { - return fmt.Errorf("failed to send EIP-1559 transaction: %w", err) + switch symbol { + case "ETH": + eip1559Tx.Value = utils.Float64ToBigIntETH(amount) + case "USDT": + eip1559Tx.Value = big.NewInt(0) + data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(to), utils.Float64ToBigIntUSDT(amount)) + if err != nil { + return nil, fmt.Errorf("failed to pack transfer data: %v", err) + } + eip1559Tx.Data = data + default: + return nil, fmt.Errorf("ETH NetWork error symbol: %s", symbol) } - log.Printf("✅ EIP-1559交易已提交至mempool:%s,金额:%.2f USDT, 最大手续费:%.6f ETH", txHash, amount, maxGasCostInETH) - // // 8, 构造交易消息 - // tx_msg := message.Tx_msg{ - // TxType: tx_type, - // Tx: message.Tx{ - // From: final_from, - // To: to, - // Height: now_height, - // TxHash: txHash, - // Symbol: "USDT", - // Value: amount, - // Status: 2, - // }, + return types.NewTx(eip1559Tx), nil +} + +// 签名交易 +func (e *ETHNode) signTx(tx *types.Transaction, from string) (*types.Transaction, error) { + originalKey := e.decodePrivatekey(from) + if originalKey == "" { + return nil, fmt.Errorf("failed to query private key for address: %s", from) + } + + privateKey, err := crypto.HexToECDSA(originalKey) + if err != nil { + return nil, fmt.Errorf("failed to parse private key: %v", err) + } + + signer := types.LatestSignerForChainID(e.NetID) + return types.SignTx(tx, signer, privateKey) +} + +// 发送交易并存入待确认交易池 +func (e *ETHNode) sendTransaction(tx *types.Transaction, txHash, symbol, from, to string, amount float64) error { + // 发送交易 + // err := e.RpcClient.SendTransaction(e.Ctx, tx) + // if err != nil { + // return fmt.Errorf("failed to send transaction: %v", err) // } - // // 9, 将构造的交易消息存入待确认交易中 - // e.UnConfirmTxs[txHash] = tx_msg + + // 获取当前区块高度 + e.mu.Lock() + height := e.RealData.Heihgt + e.mu.Unlock() + + // 将交易存入待交易池 + e.mu.Lock() + e.UnConfirmTxs[txHash] = &message.Transaction{ + Symbol: symbol, + From: from, + To: to, + Height: height, + TxHash: txHash, + Value: amount, + Status: STATUS_PENDING, + } + e.mu.Unlock() return nil } + +// 确认信息并返回 +func (e *ETHNode) confirm(ch chan any) { + e.mu.Lock() + height := e.RealData.Heihgt + e.mu.Unlock() + var needSendMsg []any + var toDeleteTxs []string // 用于记录需要从 UnConfirmTxs 中删除的交易 + + for k, v := range e.UnConfirmTxs { + if v.Height+e.ConfirmHeight >= height { + tx_result, err := e.checkTransaction(k) + if err != nil { + log.Printf("❌ check tx(%s) error: %v", k, err) + continue + } + + e.mu.Lock() + if tx_result { + // 交易成功 + // 判断是否在充值消息中 + for toAddr, topup_msg := range e.TopupMsg { + if strings.EqualFold(toAddr, v.To) { + msg := message.TopupMsg_resp{ + QueueId: topup_msg.QueueId, + Address: v.To, + Status: STATUS_SUCCESS, + Chain: topup_msg.Chain, + Symbol: topup_msg.Symbol, + Amount: v.Value, + TxHash: v.TxHash, + BlockHeight: height, + } + needSendMsg = append(needSendMsg, msg) + break // 充值成功后不删除,可能还有后续充值 + } + } + + // 判断是否在提现消息中 + for key, withdraw_msg := range e.WithdrawMsg { + if v.TxHash == withdraw_msg.TxHash { + msg := message.WithdrawMsg_resp{ + QueueId: withdraw_msg.QueueId, + Chain: withdraw_msg.Chain, + Symbol: withdraw_msg.Symbol, + Status: STATUS_SUCCESS, + Amount: v.Value, + TxHash: v.TxHash, + FromAddress: v.From, + ToAddress: v.To, + BlockHeight: v.Height, + } + needSendMsg = append(needSendMsg, msg) + // 删除提现消息 + e.RemoveAddress(withdraw_msg) + delete(e.WithdrawMsg, key) + break + } + } + + // 判断是否在支付消息中 + for _, pay_msg := range e.PayMsg { + for payKey, pay := range pay_msg.Trasnactions { + if v.TxHash == pay.TxHash { + pay_msg.Trasnactions[payKey].Status = STATUS_SUCCESS + } + } + } + } else { + // 交易失败 + // 判断是否在充值消息中 + for toAddr, topup_msg := range e.TopupMsg { + if strings.EqualFold(toAddr, v.To) { + msg := message.TopupMsg_resp{ + QueueId: topup_msg.QueueId, + Address: v.To, + Status: STATUS_FAILED, + Chain: topup_msg.Chain, + Symbol: topup_msg.Symbol, + Amount: v.Value, + TxHash: v.TxHash, + BlockHeight: height, + } + needSendMsg = append(needSendMsg, msg) + // 充值失败,删除该监听地址 + delete(e.TopupMsg, toAddr) + break + } + } + + // 判断是否在提现消息中 + for key, withdraw_msg := range e.WithdrawMsg { + if v.TxHash == withdraw_msg.TxHash { + msg := message.WithdrawMsg_resp{ + QueueId: withdraw_msg.QueueId, + Chain: withdraw_msg.Chain, + Symbol: withdraw_msg.Symbol, + Status: STATUS_FAILED, + Amount: v.Value, + TxHash: v.TxHash, + FromAddress: v.From, + ToAddress: v.To, + BlockHeight: v.Height, + } + needSendMsg = append(needSendMsg, msg) + // 删除提现消息 + e.RemoveAddress(withdraw_msg) + delete(e.WithdrawMsg, key) + break + } + } + + // 判断是否在支付消息中 + for _, pay_msg := range e.PayMsg { + for payKey, pay := range pay_msg.Trasnactions { + if v.TxHash == pay.TxHash { + pay_msg.Trasnactions[payKey].Status = STATUS_FAILED + } + } + } + } + e.mu.Unlock() + + // 标记为删除 + toDeleteTxs = append(toDeleteTxs, k) + } + } + + // 删除已确认的交易 + if len(toDeleteTxs) > 0 { + e.mu.Lock() + for _, txHash := range toDeleteTxs { + delete(e.UnConfirmTxs, txHash) + } + e.mu.Unlock() + } + + // 检查支付消息是否需要发送完整响应 + e.mu.Lock() + var payMsgsToDelete []string + for key, pay_msg := range e.PayMsg { + allConfirmed := true + for _, tx := range pay_msg.Trasnactions { + if tx.Status == STATUS_PENDING { + allConfirmed = false + break + } + } + if allConfirmed { + // 所有交易都已确认,发送完整响应 + needSendMsg = append(needSendMsg, pay_msg) + payMsgsToDelete = append(payMsgsToDelete, key) + } + } + // 删除已全部确认的支付消息 + for _, key := range payMsgsToDelete { + e.RemoveAddress(e.PayMsg[key]) + delete(e.PayMsg, key) + } + e.mu.Unlock() + + // 异步发送所有响应消息 + if len(needSendMsg) != 0 { + for _, data := range needSendMsg { + go func(msg any) { + select { + case ch <- msg: + log.Printf("✅ confirm message sent: %+v", msg) + default: + log.Printf("⚠️ 通道阻塞,待确认消息发送失败") + } + }(data) + } + } +} diff --git a/internal/logger/transaction_logger.go b/internal/logger/transaction_logger.go index 885d6fc..0b12217 100644 --- a/internal/logger/transaction_logger.go +++ b/internal/logger/transaction_logger.go @@ -2,8 +2,10 @@ package logger import ( "compress/gzip" + "encoding/json" "fmt" "io" + message "m2pool-payment/internal/msg" "os" "path/filepath" "sync" @@ -231,21 +233,25 @@ func LogWithdraw(toAddress string, status string, amount float64, fromAddress st } // LogPay 记录支付消息 -func LogPay(toAddress string, status string, amount float64, fromAddress string, txHash string, blockHeight uint64, orderId string, queueId string) { +func LogPay(status string, fromAddress string, queueId string, transactions map[string]*message.PayData_resp) { if txLogger == nil { return } // 使用 toAddress 作为文件名 - lf, err := txLogger.getOrCreateLogFile(toAddress) + lf, err := txLogger.getOrCreateLogFile(fromAddress) if err != nil { fmt.Printf("⚠️ 获取日志文件失败: %v\n", err) return } - + t, err := json.Marshal(transactions) + if err != nil { + fmt.Println("Error marshalling to JSON:", err) + return + } timestamp := time.Now().Format("2006-01-02 15:04:05") - content := fmt.Sprintf("%s [pay]-[%s] | 金额: %.6f | FromAddress: %s | ToAddress: %s | 交易哈希: %s | 区块高度: %d | OrderId: %s | QueueId: %s", - timestamp, status, amount, fromAddress, toAddress, txHash, blockHeight, orderId, queueId) + content := fmt.Sprintf("%s [pay]-[%s] | FromAddress: %s | QueueId: %s | Transactions: %v", + timestamp, status, fromAddress, queueId, string(t)) if err := lf.write(content); err != nil { fmt.Printf("⚠️ 写入日志失败: %v\n", err) diff --git a/internal/msg/msg.go b/internal/msg/msg.go index c70a373..cbee20d 100644 --- a/internal/msg/msg.go +++ b/internal/msg/msg.go @@ -60,17 +60,20 @@ type DbConfig struct { // =============================== type0 =============================== // 接收的充值消息 type TopupMsg_req struct { + QueueId string `json:"queue_id"` Chain string `json:"chain"` // 链名称 Symbol string `json:"symbol"` // 币种 Address string `json:"address"` Timestamp uint64 `json:"timestamp"` Sign string `json:"sign"` + Status int `json:"status,omitempty"` } // 返回充值结果消息 type TopupMsg_resp struct { + QueueId string `json:"queue_id"` Address string `json:"address"` - Status int `json:"status"` + Status int `json:"status"` // 0失败,1成功,2待定,3sign校验失败 Chain string `json:"chain"` // 链名称 Symbol string `json:"symbol"` // 币种 Amount float64 `json:"amount"` @@ -87,8 +90,10 @@ type WithdrawMsg_req struct { Amount float64 `json:"amount"` Chain string `json:"chain"` // 链名称 Symbol string `json:"symbol"` // 币种 + TxHash string `json:"tx_hash,omitempty"` Timestamp uint64 `json:"timestamp"` Sign string `json:"sign"` + Status int `json:"status,omitempty"` } // 返回提现结果消息 @@ -96,7 +101,7 @@ type WithdrawMsg_resp struct { QueueId string `json:"queue_id"` Chain string `json:"chain"` // 链名称 Symbol string `json:"symbol"` // 币种 - Status int `json:"status"` + Status int `json:"status"` // 0失败,1成功,3sign校验失败 Amount float64 `json:"amount"` TxHash string `json:"tx_hash"` FromAddress string `json:"from_address"` // 来源地址 @@ -105,36 +110,74 @@ type WithdrawMsg_resp struct { } // =============================== type2 =============================== -// 接收到的支付消息 + type PayMsg_req struct { - QueueId string `json:"queue_id"` - FromAddress string `json:"from_address"` // 我们提供的地址 - ToAddress string `json:"to_address"` // 卖家地址 - Amount float64 `json:"amount"` - Chain string `json:"chain"` // 链名称 - Symbol string `json:"symbol"` // 币种 - OrderId string `json:"order_id"` // 订单号 - Timestamp uint64 `json:"timestamp"` - Sign string `json:"sign"` + QueueId string `json:"queue_id"` + FromAddress string `json:"from_address"` + Chain string `json:"chain"` + Symbol string `json:"symbol"` + TotalAmount float64 `json:"total_amount"` + Timestamp uint64 `json:"timestamp"` + Sign string `json:"sign"` + Trasnactions map[string]*PayData_req `json:"transactions"` // {"to_address": PayData_req{}, ...} +} +type PayData_req struct { + OrderId string `json:"order_id"` + ToAddress string `json:"to_address"` + TxHash string `json:"tx_hash,omitempty"` + Amount float64 `json:"amount"` + Status int `json:"status,omitempty"` } -// 返回支付结果消息 type PayMsg_resp struct { - QueueId string `json:"queue_id"` - Status int `json:"status"` - Amount float64 `json:"amount"` - Chain string `json:"chain"` // 链名称 - Symbol string `json:"symbol"` // 币种 - OrderId string `json:"order_id"` // 订单号 - TxHash string `json:"tx_hash"` - FromAddress string `json:"from_address"` // 来源地址 - ToAddress string `json:"to_address"` // 目标地址 - BlockHeight uint64 `json:"block_height"` // 区块高度 + QueueId string `json:"queue_id"` + FromAddress string `json:"from_address"` + PayStatus int `json:"pay_status"` // 1至少有一笔转账成功,3sign校验失败,4钱包余额不足 + Transactions map[string]*PayData_resp `json:"transactions"` // {"to_address": PayData_resp{}, ...} } +type PayData_resp struct { + OrderId string `json:"order_id"` + FromAddress string `json:"from_address"` + ToAddress string `json:"to_address"` + Chain string `json:"chain"` + Symbol string `json:"symbol"` + Amount float64 `json:"amount"` + TxHash string `json:"tx_hash,omitempty"` + BlockHeight uint64 `json:"block_height,omitempty"` + Status int `json:"status"` // 0失败,1成功 +} + +// 接收到的支付消息 +// type PayMsg_req struct { +// QueueId string `json:"queue_id"` +// FromAddress string `json:"from_address"` // 我们提供的地址 +// ToAddress string `json:"to_address"` // 卖家地址 +// Amount float64 `json:"amount"` +// Chain string `json:"chain"` // 链名称 +// Symbol string `json:"symbol"` // 币种 +// OrderId string `json:"order_id"` // 订单号 +// Timestamp uint64 `json:"timestamp"` +// Sign string `json:"sign"` +// Transactions map[string]Transaction `json:"tx"` +// } +// 返回支付结果消息 +// type PayMsg_resp struct { +// QueueId string `json:"queue_id"` +// Status int `json:"status"` +// Amount float64 `json:"amount"` +// Chain string `json:"chain"` // 链名称 +// Symbol string `json:"symbol"` // 币种 +// OrderId string `json:"order_id"` // 订单号 +// TxHash string `json:"tx_hash"` +// FromAddress string `json:"from_address"` // 来源地址 +// ToAddress string `json:"to_address"` // 目标地址 +// BlockHeight uint64 `json:"block_height"` // 区块高度 +// } // =============================== type3 =============================== // 接收到的删除监听地址消息 type RemoveListenMsg_req struct { + QueueId string `json:"queue_id"` MsgType int `json:"msg_type"` Chain string `json:"chain"` Symbol string `json:"symbol"` @@ -145,6 +188,7 @@ type RemoveListenMsg_req struct { // 返回收到的删除监听地址消息 type RemoveListenMsg_resp struct { + QueueId string `json:"queue_id"` MsgType int `json:"msg_type"` Chain string `json:"chain"` Symbol string `json:"symbol"` @@ -167,3 +211,13 @@ type Tx struct { Value float64 `json:"value"` // 数量,单位是币 Status int `json:"status"` // 交易状态,1成功,0失败, 2待确认 } + +type Transaction struct { + From string `json:"from"` // 充值/提现/支付的来源地址 + To string `json:"to"` // 充值/提现/支付的目标地址 + Height uint64 `json:"height"` // 区块高度 + TxHash string `json:"tx_hash"` // 交易哈希 + Symbol string `json:"symbol"` // 币种 + Value float64 `json:"value"` // 数量,单位是币 + Status int `json:"status"` // 交易状态,1成功,0失败, 2待确认 +} diff --git a/internal/queue/rabbitmq.go b/internal/queue/rabbitmq.go index 4614bbc..bcbd6a9 100644 --- a/internal/queue/rabbitmq.go +++ b/internal/queue/rabbitmq.go @@ -22,9 +22,10 @@ type RabbitMQServer struct { cancel context.CancelFunc // 消息处理回调函数 - OnTopupMsg func(message.TopupMsg_req) // 充值请求回调 - OnWithdrawMsg func(message.WithdrawMsg_req) // 提现请求回调 - OnPayMsg func(message.PayMsg_req) // 支付请求回调 + OnTopupMsg func(message.TopupMsg_req) // 充值请求回调 + OnWithdrawMsg func(message.WithdrawMsg_req) // 提现请求回调 + OnPayMsg func(message.PayMsg_req) // 支付请求回调 + OnRemoveMsg func(message.RemoveListenMsg_req) // 删除充值监听回调 } // NewRabbitMQServer 创建 RabbitMQ 服务 @@ -68,9 +69,11 @@ func (r *RabbitMQServer) setupQueuesAndExchanges() error { r.config.PayConfig, r.config.TopUpConfig, r.config.WithdrawConfig, + r.config.RemoveConfig, r.config.PayRespConfig, r.config.TopUpRespConfig, r.config.WithdrawRespConfig, + r.config.RemoveRespConfig, } for _, cfg := range configs { @@ -131,6 +134,8 @@ func (r *RabbitMQServer) Start() error { go r.consumeWithdraw() // 启动支付消息监听 go r.consumePay() + // 启动删除充值监听 + go r.consumeRemove() // log.Println("🚀 RabbitMQ 服务启动成功,开始监听消息...") return nil @@ -188,8 +193,8 @@ func (r *RabbitMQServer) consumePay() { if err := json.Unmarshal(body, &msg); err != nil { return fmt.Errorf("failed to parse pay message: %w", err) } - log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, OrderId=%s, From=%s, To=%s, Amount=%.2f %s", - msg.QueueId, msg.OrderId, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Symbol) + log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, Chain=%s, Symbol=%s, TxCount=%d", + msg.QueueId, msg.FromAddress, msg.Chain, msg.Symbol, len(msg.Trasnactions)) if r.OnPayMsg != nil { r.OnPayMsg(msg) @@ -199,6 +204,26 @@ func (r *RabbitMQServer) consumePay() { ) } +// consumeRemove 消费删除充值监听消息 +func (r *RabbitMQServer) consumeRemove() { + r.consumeQueue( + r.config.RemoveConfig.QueueName, + "remove", + func(body []byte) error { + var msg message.RemoveListenMsg_req + if err := json.Unmarshal(body, &msg); err != nil { + return fmt.Errorf("failed to parse remove message: %w", err) + } + log.Printf("📥 [RMQ] 收到删除充值监听: Chain=%s, Symbol=%s, Address=%s", msg.Chain, msg.Symbol, msg.Address) + + if r.OnRemoveMsg != nil { + r.OnRemoveMsg(msg) + } + return nil + }, + ) +} + // consumeQueue 通用队列消费方法 func (r *RabbitMQServer) consumeQueue(queueName, msgType string, handler func([]byte) error) { for { @@ -268,8 +293,16 @@ func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error { return r.publishMessage( r.config.PayRespConfig, resp, - fmt.Sprintf("支付响应: QueueId=%s, OrderId=%s, Status=%d, TxHash=%s", - resp.QueueId, resp.OrderId, resp.Status, resp.TxHash), + "支付响应", + ) +} + +// PublishRemoveResp 发布删除充值监听响应 +func (r *RabbitMQServer) PublishRemoveResp(resp message.RemoveListenMsg_resp) error { + return r.publishMessage( + r.config.RemoveRespConfig, + resp, + fmt.Sprintf("删除充值监听响应: Address=%s, Status=%d", resp.Address, resp.Status), ) } diff --git a/internal/server.go b/internal/server.go index 8375464..63e5bb9 100644 --- a/internal/server.go +++ b/internal/server.go @@ -6,7 +6,7 @@ import ( "fmt" "log" "m2pool-payment/internal/blockchain" - "m2pool-payment/internal/blockchain/eth" + eth "m2pool-payment/internal/blockchain/eth" "m2pool-payment/internal/crypto" "m2pool-payment/internal/db" "m2pool-payment/internal/logger" @@ -19,7 +19,7 @@ import ( "time" ) -const MSG_KEY string = "9f3c7a12" +// const MSG_KEY string = "9f3c7a12" // 状态码常量 const ( @@ -41,7 +41,7 @@ var s_ctx ServerCtx // verifyMessage 验证消息签名 func verifyMessage(timestamp uint64, sign string) bool { - hash_byte := crypto.Sha256Hash(fmt.Sprintf("%x", timestamp) + MSG_KEY) + hash_byte := crypto.Sha256Hash(fmt.Sprintf("%x", timestamp) + s_ctx.msgKey) hash := hex.EncodeToString(hash_byte) return hash == sign } @@ -109,7 +109,7 @@ func loadTopupReqMsg() error { if err := rows.Scan(&topupReq_msg.Chain, &topupReq_msg.Symbol, &topupReq_msg.Timestamp, &topupReq_msg.Address); err != nil { return err } - s_ctx.blockChainServer.AddAddress(topupReq_msg.Chain, topupReq_msg.Address, topupReq_msg) + s_ctx.blockChainServer.AddAddress(topupReq_msg.Chain, topupReq_msg) } if !hasData { @@ -141,7 +141,7 @@ func loadWithdrawReqMsg() error { if err := rows.Scan(&withdrawReq_msg.QueueId, &withdrawReq_msg.Chain, &withdrawReq_msg.Symbol, &withdrawReq_msg.Timestamp, &withdrawReq_msg.FromAddress, &withdrawReq_msg.ToAddress, &withdrawReq_msg.Amount); err != nil { return err } - s_ctx.blockChainServer.AddAddress(withdrawReq_msg.Chain, withdrawReq_msg.ToAddress, withdrawReq_msg) + s_ctx.blockChainServer.AddAddress(withdrawReq_msg.Chain, withdrawReq_msg) } if !hasData { @@ -157,7 +157,7 @@ func loadWithdrawReqMsg() error { } func loadPayReqMsg() error { - sql := `SELECT queueId, chain, symbol, timestamp, from_addr, to_addr, amount, orderId FROM msg_pay_req;` + sql := `SELECT queueId, chain, symbol, timestamp, from_addr, total_amount FROM msg_pay_req;` rows, err := s_ctx.sqlitedb.DB.Query(sql) if err != nil { return fmt.Errorf("query history pay-msg error: %w", err) @@ -168,10 +168,10 @@ func loadPayReqMsg() error { hasData := false for rows.Next() { hasData = true - if err := rows.Scan(&payReq_msg.QueueId, &payReq_msg.Chain, &payReq_msg.Symbol, &payReq_msg.Timestamp, &payReq_msg.FromAddress, &payReq_msg.ToAddress, &payReq_msg.Amount, &payReq_msg.OrderId); err != nil { + if err := rows.Scan(&payReq_msg.QueueId, &payReq_msg.Chain, &payReq_msg.Symbol, &payReq_msg.Timestamp, &payReq_msg.FromAddress, &payReq_msg.TotalAmount); err != nil { return err } - s_ctx.blockChainServer.AddAddress(payReq_msg.Chain, payReq_msg.ToAddress, payReq_msg) + s_ctx.blockChainServer.AddAddress(payReq_msg.Chain, payReq_msg) } if !hasData { @@ -235,7 +235,7 @@ func handleTopupMsg() { // 添加监听地址 // go func() { - err := s_ctx.blockChainServer.AddAddress(msg.Chain, msg.Address, msg) + err := s_ctx.blockChainServer.AddAddress(msg.Chain, msg) if err != nil { log.Printf("❌ 添加监听地址失败: %v", err) // 发送失败响应 @@ -285,7 +285,7 @@ func handleWithdrawMsg() { } // 执行转账 - err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg) + err := s_ctx.blockChainServer.Transfer(msg.Chain, msg) if err != nil { log.Printf("❌ 提现转账失败: %v", err) // 发送失败响应 @@ -300,7 +300,7 @@ func handleWithdrawMsg() { return // 转账失败时直接返回,不进入链上确认流程 } // go func() { - err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg.ToAddress, msg) + err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg) if err != nil { log.Printf("❌ 添加监听地址失败: %v", err) // 发送失败响应 @@ -328,18 +328,14 @@ func handleWithdrawMsg() { func handlePayMsg() { s_ctx.rmqServer.OnPayMsg = func(msg message.PayMsg_req) { msg.FromAddress = strings.ToLower(msg.FromAddress) - msg.ToAddress = strings.ToLower(msg.ToAddress) + // msg.ToAddress = strings.ToLower(msg.ToAddress) // 验证签名 if !verifyMessage(msg.Timestamp, msg.Sign) { err := s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ - QueueId: msg.QueueId, - Status: STATUS_VERIFY_FAILED, - Amount: msg.Amount, - Chain: msg.Chain, - Symbol: msg.Symbol, - OrderId: msg.OrderId, - TxHash: "", + QueueId: msg.QueueId, + FromAddress: msg.FromAddress, + PayStatus: STATUS_VERIFY_FAILED, }) if err != nil { log.Printf("❌ 发布支付失败响应失败: %v", err) @@ -348,41 +344,33 @@ func handlePayMsg() { } // 执行转账 - err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg) + err := s_ctx.blockChainServer.Transfer(msg.Chain, msg) if err != nil { log.Printf("❌ 支付转账失败: %v", err) // 发送失败响应 s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ - QueueId: msg.QueueId, - Status: STATUS_FAILED, - Amount: msg.Amount, - Chain: msg.Chain, - Symbol: msg.Symbol, - OrderId: msg.OrderId, - TxHash: "", + QueueId: msg.QueueId, + FromAddress: msg.FromAddress, + PayStatus: STATUS_FAILED, }) return // 转账失败时直接返回,不进入链上确认流程 } // go func() { - err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg.ToAddress, msg) + err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg) if err != nil { log.Printf("❌ 添加监听地址失败: %v", err) // 发送失败响应 s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ - QueueId: msg.QueueId, - Status: STATUS_FAILED, - Amount: msg.Amount, - Chain: msg.Chain, - Symbol: msg.Symbol, - OrderId: msg.OrderId, - TxHash: "", + QueueId: msg.QueueId, + FromAddress: msg.FromAddress, + PayStatus: STATUS_FAILED, }) return } // }() // 将新增数据写入sqlite - insert_sql := `INSERT OR REPLACE INTO msg_pay_req (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?)` - data := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Timestamp, msg.FromAddress, msg.ToAddress, msg.Amount, msg.OrderId} + insert_sql := `INSERT OR REPLACE INTO msg_pay_req (queueId, chain, symbol, timestamp, from_addr, total_amount) VALUES (?, ?, ?, ?, ?, ?)` + data := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Timestamp, msg.FromAddress, msg.TotalAmount} err = s_ctx.sqlitedb.Insert(insert_sql, data) if err != nil { log.Printf("❌ 插入 pay_req 失败: %v, data: %+v", err, data) @@ -481,37 +469,37 @@ func handleChainEvent(chainEventCh chan any) { case message.PayMsg_resp: // 支付确认 - log.Printf("✅ [链上] 支付确认: QueueId=%s, OrderId=%s, Amount=%.2f, TxHash=%s, Status=%d", - msg.QueueId, msg.OrderId, msg.Amount, msg.TxHash, msg.Status) + log.Printf("✅ [链上] 支付确认: QueueId=%s, FromAddress=%s, Status=%d", + msg.QueueId, msg.FromAddress, msg.PayStatus) // 记录交易日志 - logger.LogPay(msg.ToAddress, "确认", msg.Amount, msg.FromAddress, msg.TxHash, msg.BlockHeight, msg.OrderId, msg.QueueId) + logger.LogPay("全部交易确认", msg.FromAddress, msg.QueueId, msg.Transactions) err := s_ctx.rmqServer.PublishPayResp(msg) if err != nil { log.Printf("❌ 发送支付响应失败: %v", err) return } - go func() { - // 插入响应数据 - sql := `INSERT INTO msg_pay_resp (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, height, txHash, status, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - data := []any{msg.QueueId, msg.Chain, msg.Symbol, time.Now().Unix(), msg.FromAddress, msg.ToAddress, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status, msg.OrderId} - err := s_ctx.sqlitedb.Insert(sql, data) - if err != nil { - log.Printf("❌ 插入 pay_resp 失败: %v", err) - return - } + // go func() { + // // 插入响应数据 + // sql := `INSERT INTO msg_pay_resp (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, height, txHash, status, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + // data := []any{msg.QueueId, msg.Chain, msg.Symbol, time.Now().Unix(), msg.FromAddress, msg.ToAddress, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status, msg.OrderId} + // err := s_ctx.sqlitedb.Insert(sql, data) + // if err != nil { + // log.Printf("❌ 插入 pay_resp 失败: %v", err) + // return + // } - // 删除对应数据 - del_sql := `DELETE FROM msg_pay_req WHERE queueId = ?;` - count, err := s_ctx.sqlitedb.Delete(del_sql, msg.QueueId) - if err != nil { - log.Printf("❌ 清理 pay_req 失败: %v, queueId=%s", err, msg.QueueId) - } else if count == 0 { - log.Printf("⚠️ 未找到要删除的 pay_req 记录: queueId=%s", msg.QueueId) - } else { - log.Printf("✅ 清理 pay_req 成功: 删除了 %d 条记录, queueId=%s", count, msg.QueueId) - } - }() + // // 删除对应数据 + // del_sql := `DELETE FROM msg_pay_req WHERE queueId = ?;` + // count, err := s_ctx.sqlitedb.Delete(del_sql, msg.QueueId) + // if err != nil { + // log.Printf("❌ 清理 pay_req 失败: %v, queueId=%s", err, msg.QueueId) + // } else if count == 0 { + // log.Printf("⚠️ 未找到要删除的 pay_req 记录: queueId=%s", msg.QueueId) + // } else { + // log.Printf("✅ 清理 pay_req 成功: 删除了 %d 条记录, queueId=%s", count, msg.QueueId) + // } + // }() default: log.Printf("⚠️ 未知消息类型: %T", event) @@ -547,7 +535,7 @@ func Start(msgKey string) { // ================== 启动链上事件监听通道 ================== chainEventCh := make(chan any, 1000) // 增加缓冲区,避免高并发丢消息 - go s_ctx.blockChainServer.Listen("ETH", "USDT", chainEventCh) + go s_ctx.blockChainServer.Listen("ETH", chainEventCh) // ================== 启动 RabbitMQ 监听 ================== initRmqListen() diff --git a/internal/utils/utils.go b/internal/utils/utils.go index b605257..8dd087f 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -6,6 +6,14 @@ import ( "math/big" ) +func BigIntETHToFloat64(value *big.Int) float64 { + f := new(big.Float).SetInt(value) + scale := new(big.Float).SetFloat64(1e18) // USDT 精度 6 位 + f.Quo(f, scale) + result, _ := f.Float64() + return result +} + func BigIntUSDTToFloat64(value *big.Int) float64 { f := new(big.Float).SetInt(value) scale := new(big.Float).SetFloat64(1e6) // USDT 精度 6 位 @@ -26,6 +34,16 @@ func Float64ToBigIntUSDT(amount float64) *big.Int { return bigAmount } +const ETHDecimals = 18 + +func Float64ToBigIntETH(amount float64) *big.Int { + // 乘上精度系数 + scale := math.Pow10(ETHDecimals) + bigAmount := new(big.Int) + bigAmount.SetInt64(int64(amount * scale)) + return bigAmount +} + func Slice_delete(arr []any, index int) []any { if index < 0 || index >= len(arr) { // 处理越界 diff --git a/public/SQLite3.sql b/public/SQLite3.sql index 9cff7b2..a90fe79 100644 --- a/public/SQLite3.sql +++ b/public/SQLite3.sql @@ -17,16 +17,35 @@ CREATE TABLE IF NOT EXISTS msg_withdraw_req ( PRIMARY KEY(queueId) ); -CREATE TABLE IF NOT EXISTS msg_pay_req ( - queueId TEXT, - chain TEXT, - symbol TEXT, - timestamp INTEGER, - from_addr TEXT, - to_addr TEXT, - amount NUMERIC, - orderId TEXT, - PRIMARY KEY(queueId) +-- CREATE TABLE IF NOT EXISTS msg_pay_req ( +-- queueId TEXT, +-- chain TEXT, +-- symbol TEXT, +-- timestamp INTEGER, +-- from_addr TEXT, +-- to_addr TEXT, +-- amount NUMERIC, +-- orderId TEXT, +-- PRIMARY KEY(queueId) +-- ); + +CREATE TABLE msg_pay_req ( + queueId TEXT PRIMARY KEY, + from_addr TEXT NOT NULL, + chain TEXT NOT NULL, + symbol TEXT NOT NULL, + total_amount REAL NOT NULL, + timestamp INTEGER NOT NULL, + sign TEXT NOT NULL +); + +CREATE TABLE msg_pay_req_transactions ( + queueId TEXT NOT NULL, + to_addr TEXT NOT NULL, + order_id TEXT NOT NULL, + tx_hash TEXT, + amount REAL NOT NULL, + FOREIGN KEY (queueId) REFERENCES msg_pay_req(queueId) ); CREATE TABLE IF NOT EXISTS msg_topup_resp ( @@ -55,17 +74,38 @@ CREATE TABLE IF NOT EXISTS msg_withdraw_resp ( status INTEGER ); -CREATE TABLE IF NOT EXISTS msg_pay_resp ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - queueId TEXT, - chain TEXT, - symbol TEXT, - timestamp INTEGER, - from_addr TEXT, - to_addr TEXT, - amount NUMERIC, - height INTEGER, - txHash TEXT, - orderId TEXT, - status INTEGER +-- CREATE TABLE IF NOT EXISTS msg_pay_resp ( +-- id INTEGER PRIMARY KEY AUTOINCREMENT, +-- queueId TEXT, +-- chain TEXT, +-- symbol TEXT, +-- timestamp INTEGER, +-- from_addr TEXT, +-- to_addr TEXT, +-- amount NUMERIC, +-- height INTEGER, +-- txHash TEXT, +-- orderId TEXT, +-- status INTEGER +-- ); + +CREATE TABLE msg_pay_resp ( + queueId TEXT PRIMARY KEY, + from_addr TEXT NOT NULL, + pay_status INTEGER NOT NULL, -- 1: At least one success, 3: Sign verification failed, 4: Insufficient balance + transactions JSON -- 存储交易的JSON格式 +); + +CREATE TABLE msg_pay_resp_transactions ( + queueId TEXT NOT NULL, + order_id TEXT NOT NULL, + from_addr TEXT NOT NULL, + to_addr TEXT NOT NULL, + chain TEXT NOT NULL, + symbol TEXT NOT NULL, + amount REAL NOT NULL, + tx_hash TEXT, + height INTEGER, + status INTEGER NOT NULL, -- 0: Failed, 1: Success + FOREIGN KEY (queueId) REFERENCES msg_pay_resp(queueId) ); \ No newline at end of file