merge transactions and update

This commit is contained in:
lzx
2025-10-31 13:46:58 +08:00
parent 056bc05b75
commit 8d7da5d345
12 changed files with 1154 additions and 975 deletions

View File

@@ -6,10 +6,10 @@ import (
)
type IChainServer interface {
AddAddress(address string, msg any) error
RemoveAddress(address string) error
Listen(symbol string, ch chan any)
Transfer(symbol string, msg any) error
AddAddress(msg any) error
RemoveAddress(msg any) error
Listen(ch chan any)
Transfer(msg any) error
Stop()
}
@@ -30,40 +30,40 @@ func (b *BlockChainServer) RegisterChain(name string, chain IChainServer) {
b.chains[name] = chain
}
func (b *BlockChainServer) AddAddress(chain, address string, msg any) error {
func (b *BlockChainServer) AddAddress(chain string, msg any) error {
if srv, ok := b.chains[chain]; ok {
srv.AddAddress(address, msg)
fmt.Printf("✅ 添加监听地址: chain=%s, address=%s\n", chain, address)
srv.AddAddress(msg)
fmt.Printf("✅ 添加监听地址: chain=%s, msg=%v\n", chain, msg)
return nil
} else {
return fmt.Errorf("⚠️ 链未注册: %s\n", chain)
}
}
func (b *BlockChainServer) RemoveAddress(chain, address string) error {
func (b *BlockChainServer) RemoveAddress(chain string, msg any) error {
if srv, ok := b.chains[chain]; ok {
srv.RemoveAddress(address)
fmt.Printf("🗑️ 移除监听地址: chain=%s, address=%s\n", chain, address)
srv.RemoveAddress(msg)
fmt.Printf("🗑️ 移除监听地址: chain=%s, msg=%s\n", chain, msg)
return nil
} else {
return fmt.Errorf("⚠️ 链未注册: %s\n", chain)
}
}
func (b *BlockChainServer) Listen(chain, symbol string, ch chan any) error {
func (b *BlockChainServer) Listen(chain string, ch chan any) error {
if srv, ok := b.chains[chain]; ok {
go func() {
srv.Listen(symbol, ch)
srv.Listen(ch)
}()
return nil
}
return fmt.Errorf("链未注册: %s", chain)
}
func (b *BlockChainServer) Transfer(chain, symbol string, msg any) error {
func (b *BlockChainServer) Transfer(chain string, msg any) error {
if srv, ok := b.chains[chain]; ok {
fmt.Printf("💸 %s-%s发起转账: %+v\n", chain, symbol, msg)
return srv.Transfer(symbol, msg)
fmt.Printf("💸 %s发起转账: %+v\n", chain, msg)
return srv.Transfer(msg)
}
return fmt.Errorf("链未注册: %s", chain)
}

View File

@@ -1,213 +0,0 @@
package eth
import (
"crypto/ecdsa"
"fmt"
"log"
"m2pool-payment/internal/utils"
"math/big"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
// BatchTransferItem 批量转账单项
type BatchTransferItem struct {
ToAddress string // 接收地址
Amount float64 // 转账金额
}
// BatchTransferResult 批量转账结果
type BatchTransferResult struct {
TxHash string // 交易哈希
Success bool // 是否成功
TotalAmount float64 // 总转账金额
Count int // 转账笔数
}
// usdt_batch_transfer 批量转账ERC20-USDT
// from: 发送地址
// items: 批量转账列表
// returns: 交易哈希和错误信息
func (e *ETHNode) USDTBatchTransfer(from string, items []BatchTransferItem) (*BatchTransferResult, error) {
if len(items) == 0 {
return nil, fmt.Errorf("批量转账列表不能为空")
}
// 统一转换为小写
from = strings.ToLower(from)
// 计算总金额
var totalAmount float64
for _, item := range items {
if item.Amount <= 0 {
return nil, fmt.Errorf("转账金额必须大于0")
}
totalAmount += item.Amount
}
// 1. 校验钱包USDT余额
balance, err := e.getUSDTBalance(from)
log.Printf("🔄 批量转账 - 检测钱包=%s余额=%.2f USDT", from, balance)
if err != nil {
return nil, fmt.Errorf("获取余额失败: %w", err)
}
if balance < totalAmount {
return nil, fmt.Errorf("余额不足: 余额=%.2f USDT < 需要=%.2f USDT", balance, totalAmount)
}
// 2. 通过from地址前往数据库查找出对应加密后的私钥并解密真实的私钥
originalKey := e.decodePrivatekey(from)
if originalKey == "" {
return nil, fmt.Errorf("无法获取私钥")
}
privateKey, err := crypto.HexToECDSA(originalKey)
if err != nil {
return nil, fmt.Errorf("解析私钥失败: %w", err)
}
// 3. 获得nonce
nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(from))
if err != nil {
return nil, fmt.Errorf("获取nonce失败: %w", err)
}
// 4. 构造批量转账数据
// 使用 transfer(address[], uint256[]) 或多次transfer调用
// 这里使用多次transfer调用的方式因为标准ERC20没有批量转账方法
// 方法1: 构造多次transfer调用适合少量转账
if len(items) <= 3 {
return e.batchTransferMultipleCalls(from, privateKey, nonce, items)
}
// 方法2: 使用合约批量转账(需要部署代理合约,这里简化处理)
// 注意这里实现的是多次transaction方式
return e.batchTransferSeparateTransactions(from, privateKey, nonce, items)
}
// batchTransferMultipleCalls 使用一个交易多次调用transfer需要gas优化
func (e *ETHNode) batchTransferMultipleCalls(from string, privateKey *ecdsa.PrivateKey, nonce uint64, items []BatchTransferItem) (*BatchTransferResult, error) {
// 注意标准ERC20不支持批量transfer这里需要自定义合约
// 或者使用多次独立交易
log.Printf("⚠️ 标准ERC20不支持批量transfer改用多次独立交易")
// 回退到多次独立交易
return e.batchTransferSeparateTransactions(from, privateKey, nonce, items)
}
// batchTransferSeparateTransactions 执行多次独立的transfer交易
func (e *ETHNode) batchTransferSeparateTransactions(from string, privateKey *ecdsa.PrivateKey, nonce uint64, items []BatchTransferItem) (*BatchTransferResult, error) {
var totalAmount float64
var txHashes []string
var allSuccess bool = true
for i, item := range items {
// 构造单个transfer交易
amountBigInt := utils.Float64ToBigIntUSDT(item.Amount)
data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(strings.ToLower(item.ToAddress)), amountBigInt)
if err != nil {
log.Printf("❌ 批量转账第%d笔打包失败: %v", i+1, err)
allSuccess = false
continue
}
// 获取gas limit
gasLimit, err := e.getGasLimit()
if err != nil {
log.Printf("❌ 批量转账第%d笔获取gasLimit失败: %v", i+1, err)
allSuccess = false
continue
}
// 获取gas费用
maxFeePerGas, maxPriorityFeePerGas, err := e.getEIP1559GasFees()
var txHash string
if err != nil {
// 回退到传统gas price
gasPrice, err := e.getSuggestGasPrice()
if err != nil {
log.Printf("❌ 批量转账第%d笔获取gasPrice失败: %v", i+1, err)
allSuccess = false
continue
}
tx := types.NewTransaction(nonce+uint64(i), e.USDT.Address, big.NewInt(0), gasLimit, gasPrice, data)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey)
if err != nil {
log.Printf("❌ 批量转账第%d笔签名失败: %v", i+1, err)
allSuccess = false
continue
}
txHash = signedTx.Hash().Hex()
err = e.RpcClient.SendTransaction(e.Ctx, signedTx)
if err != nil {
log.Printf("❌ 批量转账第%d笔发送失败: %v", i+1, err)
allSuccess = false
continue
}
} else {
// 使用EIP-1559交易
ethBalance, err := e.getETHBlance(from)
if err != nil {
log.Printf("❌ 批量转账第%d笔获取ETH余额失败: %v", i+1, err)
allSuccess = false
continue
}
maxGasCost := new(big.Int).Mul(new(big.Int).SetUint64(gasLimit), maxFeePerGas)
if ethBalance.Cmp(maxGasCost) == -1 {
log.Printf("❌ 批量转账第%d笔ETH余额不足", i+1)
allSuccess = false
continue
}
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: e.NetId,
Nonce: nonce + uint64(i),
GasTipCap: maxPriorityFeePerGas,
GasFeeCap: maxFeePerGas,
Gas: gasLimit,
To: &e.USDT.Address,
Value: big.NewInt(0),
Data: data,
})
signedTx, err := types.SignTx(tx, types.NewLondonSigner(e.NetId), privateKey)
if err != nil {
log.Printf("❌ 批量转账第%d笔签名失败: %v", i+1, err)
allSuccess = false
continue
}
txHash = signedTx.Hash().Hex()
err = e.RpcClient.SendTransaction(e.Ctx, signedTx)
if err != nil {
log.Printf("❌ 批量转账第%d笔发送失败: %v", i+1, err)
allSuccess = false
continue
}
}
txHashes = append(txHashes, txHash)
totalAmount += item.Amount
log.Printf("✅ 批量转账第%d笔已提交: %s, 金额=%.2f USDT, 收款地址=%s",
i+1, txHash, item.Amount, strings.ToLower(item.ToAddress))
}
log.Printf("📊 批量转账完成: 总计%d笔, 成功%d笔, 总金额=%.2f USDT",
len(items), len(txHashes), totalAmount)
return &BatchTransferResult{
TxHash: strings.Join(txHashes, ","),
Success: allSuccess && len(txHashes) == len(items),
TotalAmount: totalAmount,
Count: len(txHashes),
}, nil
}

View File

@@ -1,101 +0,0 @@
# ERC20-USDT 批量转账功能
## 功能说明
该文件 `batch_transfer.go` 提供了 ERC20-USDT 的批量转账功能,支持从同一个发送地址向多个不同的接收地址转账。
## 主要功能
### 1. 批量转账类型
```go
type BatchTransferItem struct {
ToAddress string // 接收地址
Amount float64 // 转账金额
}
type BatchTransferResult struct {
TxHash string // 交易哈希(多个用逗号分隔)
Success bool // 是否成功
TotalAmount float64 // 总转账金额
Count int // 转账笔数
}
```
### 2. 使用方法
```go
// 1. 准备批量转账列表
items := []eth.BatchTransferItem{
{ToAddress: "0xRecipient1", Amount: 100.0},
{ToAddress: "0xRecipient2", Amount: 200.0},
{ToAddress: "0xRecipient3", Amount: 50.0},
}
// 2. 调用批量转账
fromAddress := "0xYourAddress"
result, err := ethNode.USDTBatchTransfer(fromAddress, items)
if err != nil {
log.Fatalf("批量转账失败: %v", err)
}
// 3. 处理结果
fmt.Printf("批量转账完成: %d笔, 总金额: %.2f USDT", result.Count, result.TotalAmount)
fmt.Printf("交易哈希: %s", result.TxHash)
```
## 工作原理
由于标准 ERC20 合约不支持批量转账,本实现采用以下策略:
1. **多次独立交易**:对每笔转账创建一个独立的 ERC20 `transfer` 交易
2. **Nonce 管理**:自动管理 nonce确保交易按顺序广播
3. **Gas 费用**:支持 EIP-1559 动态费用和传统 gas price
4. **错误处理**:单笔失败不影响其他交易,返回成功和失败的详细统计
## 注意事项
### 1. Gas 费用
- 每笔转账需要独立的 gas 费用(约 65,000 gas
- 批量转账 10 笔需要约 650,000 gas
- 确保发送地址有足够的 ETH 作为 gas 费用
### 2. 余额检查
- 函数会自动检查 USDT 余额是否足够
- 如果余额不足,会返回错误并终止转账
### 3. 部分成功
- 如果某些转账失败,函数会继续执行其他转账
- 返回结果中包含成功笔数和详细交易哈希
### 4. 网络拥堵
- 在高网络拥堵时,某些交易可能被推迟
- 建议监控所有交易状态
## 性能优化建议
如果需要更高效的批量转账,考虑:
1. **部署批量转账代理合约**:实现一个合约方法 `batchTransfer(address[] to, uint256[] amounts)`
2. **使用多签钱包**:减少私钥管理风险
3. **Gas 优化**:使用更低的 gas price 分批发送
## 示例输出
```
🔄 批量转账 - 检测钱包=0x...,余额=1000.00 USDT
✅ 批量转账第1笔已提交: 0xabc123..., 金额=100.00 USDT, 收款地址=0x...
✅ 批量转账第2笔已提交: 0xdef456..., 金额=200.00 USDT, 收款地址=0x...
✅ 批量转账第3笔已提交: 0x789ghi..., 金额=50.00 USDT, 收款地址=0x...
📊 批量转账完成: 总计3笔, 成功3笔, 总金额=350.00 USDT
```
## 限制
- 标准 ERC20 不支持真正的批量转账(单笔交易)
- 需要确保发送地址有足够的 ETH 作为 gas 费用
- 交易按顺序发送,可能在高负载时较慢

View File

@@ -0,0 +1,13 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract MultiSend {
function multiTransfer(address[] calldata to, uint256[] calldata amounts) external payable {
uint256 length = to.length;
require(length == amounts.length, "Arrays must have the same length");
for (uint256 i = 0; i < length; i++) {
payable(to[i]).transfer(amounts[i]);
}
}
}

View File

@@ -0,0 +1,24 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
interface IERC20 {
function transfer(address recipient, uint256 amount) external returns (bool);
}
contract MultiSendUSDT {
address public tokenAddress;
constructor(address _tokenAddress) {
tokenAddress = _tokenAddress;
}
function multiTransfer(address[] calldata to, uint256[] calldata amounts) external {
IERC20 token = IERC20(tokenAddress);
uint256 length = to.length;
require(length == amounts.length, "Arrays must have the same length");
for (uint256 i = 0; i < length; i++) {
token.transfer(to[i], amounts[i]);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -2,8 +2,10 @@ package logger
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
message "m2pool-payment/internal/msg"
"os"
"path/filepath"
"sync"
@@ -231,21 +233,25 @@ func LogWithdraw(toAddress string, status string, amount float64, fromAddress st
}
// LogPay 记录支付消息
func LogPay(toAddress string, status string, amount float64, fromAddress string, txHash string, blockHeight uint64, orderId string, queueId string) {
func LogPay(status string, fromAddress string, queueId string, transactions map[string]*message.PayData_resp) {
if txLogger == nil {
return
}
// 使用 toAddress 作为文件名
lf, err := txLogger.getOrCreateLogFile(toAddress)
lf, err := txLogger.getOrCreateLogFile(fromAddress)
if err != nil {
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t, err := json.Marshal(transactions)
if err != nil {
fmt.Println("Error marshalling to JSON:", err)
return
}
timestamp := time.Now().Format("2006-01-02 15:04:05")
content := fmt.Sprintf("%s [pay]-[%s] | 金额: %.6f | FromAddress: %s | ToAddress: %s | 交易哈希: %s | 区块高度: %d | OrderId: %s | QueueId: %s",
timestamp, status, amount, fromAddress, toAddress, txHash, blockHeight, orderId, queueId)
content := fmt.Sprintf("%s [pay]-[%s] | FromAddress: %s | QueueId: %s | Transactions: %v",
timestamp, status, fromAddress, queueId, string(t))
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)

View File

@@ -60,17 +60,20 @@ type DbConfig struct {
// =============================== type0 ===============================
// 接收的充值消息
type TopupMsg_req struct {
QueueId string `json:"queue_id"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
Address string `json:"address"`
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
Status int `json:"status,omitempty"`
}
// 返回充值结果消息
type TopupMsg_resp struct {
QueueId string `json:"queue_id"`
Address string `json:"address"`
Status int `json:"status"`
Status int `json:"status"` // 0失败1成功2待定3sign校验失败
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
Amount float64 `json:"amount"`
@@ -87,8 +90,10 @@ type WithdrawMsg_req struct {
Amount float64 `json:"amount"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
TxHash string `json:"tx_hash,omitempty"`
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
Status int `json:"status,omitempty"`
}
// 返回提现结果消息
@@ -96,7 +101,7 @@ type WithdrawMsg_resp struct {
QueueId string `json:"queue_id"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
Status int `json:"status"`
Status int `json:"status"` // 0失败1成功3sign校验失败
Amount float64 `json:"amount"`
TxHash string `json:"tx_hash"`
FromAddress string `json:"from_address"` // 来源地址
@@ -105,36 +110,74 @@ type WithdrawMsg_resp struct {
}
// =============================== type2 ===============================
// 接收到的支付消息
type PayMsg_req struct {
QueueId string `json:"queue_id"`
FromAddress string `json:"from_address"` // 我们提供的地址
ToAddress string `json:"to_address"` // 卖家地址
Amount float64 `json:"amount"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
OrderId string `json:"order_id"` // 订单号
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
QueueId string `json:"queue_id"`
FromAddress string `json:"from_address"`
Chain string `json:"chain"`
Symbol string `json:"symbol"`
TotalAmount float64 `json:"total_amount"`
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
Trasnactions map[string]*PayData_req `json:"transactions"` // {"to_address": PayData_req{}, ...}
}
type PayData_req struct {
OrderId string `json:"order_id"`
ToAddress string `json:"to_address"`
TxHash string `json:"tx_hash,omitempty"`
Amount float64 `json:"amount"`
Status int `json:"status,omitempty"`
}
// 返回支付结果消息
type PayMsg_resp struct {
QueueId string `json:"queue_id"`
Status int `json:"status"`
Amount float64 `json:"amount"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
OrderId string `json:"order_id"` // 订单号
TxHash string `json:"tx_hash"`
FromAddress string `json:"from_address"` // 来源地址
ToAddress string `json:"to_address"` // 目标地址
BlockHeight uint64 `json:"block_height"` // 区块高度
QueueId string `json:"queue_id"`
FromAddress string `json:"from_address"`
PayStatus int `json:"pay_status"` // 1至少有一笔转账成功3sign校验失败4钱包余额不足
Transactions map[string]*PayData_resp `json:"transactions"` // {"to_address": PayData_resp{}, ...}
}
type PayData_resp struct {
OrderId string `json:"order_id"`
FromAddress string `json:"from_address"`
ToAddress string `json:"to_address"`
Chain string `json:"chain"`
Symbol string `json:"symbol"`
Amount float64 `json:"amount"`
TxHash string `json:"tx_hash,omitempty"`
BlockHeight uint64 `json:"block_height,omitempty"`
Status int `json:"status"` // 0失败1成功
}
// 接收到的支付消息
// type PayMsg_req struct {
// QueueId string `json:"queue_id"`
// FromAddress string `json:"from_address"` // 我们提供的地址
// ToAddress string `json:"to_address"` // 卖家地址
// Amount float64 `json:"amount"`
// Chain string `json:"chain"` // 链名称
// Symbol string `json:"symbol"` // 币种
// OrderId string `json:"order_id"` // 订单号
// Timestamp uint64 `json:"timestamp"`
// Sign string `json:"sign"`
// Transactions map[string]Transaction `json:"tx"`
// }
// 返回支付结果消息
// type PayMsg_resp struct {
// QueueId string `json:"queue_id"`
// Status int `json:"status"`
// Amount float64 `json:"amount"`
// Chain string `json:"chain"` // 链名称
// Symbol string `json:"symbol"` // 币种
// OrderId string `json:"order_id"` // 订单号
// TxHash string `json:"tx_hash"`
// FromAddress string `json:"from_address"` // 来源地址
// ToAddress string `json:"to_address"` // 目标地址
// BlockHeight uint64 `json:"block_height"` // 区块高度
// }
// =============================== type3 ===============================
// 接收到的删除监听地址消息
type RemoveListenMsg_req struct {
QueueId string `json:"queue_id"`
MsgType int `json:"msg_type"`
Chain string `json:"chain"`
Symbol string `json:"symbol"`
@@ -145,6 +188,7 @@ type RemoveListenMsg_req struct {
// 返回收到的删除监听地址消息
type RemoveListenMsg_resp struct {
QueueId string `json:"queue_id"`
MsgType int `json:"msg_type"`
Chain string `json:"chain"`
Symbol string `json:"symbol"`
@@ -167,3 +211,13 @@ type Tx struct {
Value float64 `json:"value"` // 数量,单位是币
Status int `json:"status"` // 交易状态1成功0失败, 2待确认
}
type Transaction struct {
From string `json:"from"` // 充值/提现/支付的来源地址
To string `json:"to"` // 充值/提现/支付的目标地址
Height uint64 `json:"height"` // 区块高度
TxHash string `json:"tx_hash"` // 交易哈希
Symbol string `json:"symbol"` // 币种
Value float64 `json:"value"` // 数量,单位是币
Status int `json:"status"` // 交易状态1成功0失败, 2待确认
}

View File

@@ -22,9 +22,10 @@ type RabbitMQServer struct {
cancel context.CancelFunc
// 消息处理回调函数
OnTopupMsg func(message.TopupMsg_req) // 充值请求回调
OnWithdrawMsg func(message.WithdrawMsg_req) // 提现请求回调
OnPayMsg func(message.PayMsg_req) // 支付请求回调
OnTopupMsg func(message.TopupMsg_req) // 充值请求回调
OnWithdrawMsg func(message.WithdrawMsg_req) // 提现请求回调
OnPayMsg func(message.PayMsg_req) // 支付请求回调
OnRemoveMsg func(message.RemoveListenMsg_req) // 删除充值监听回调
}
// NewRabbitMQServer 创建 RabbitMQ 服务
@@ -68,9 +69,11 @@ func (r *RabbitMQServer) setupQueuesAndExchanges() error {
r.config.PayConfig,
r.config.TopUpConfig,
r.config.WithdrawConfig,
r.config.RemoveConfig,
r.config.PayRespConfig,
r.config.TopUpRespConfig,
r.config.WithdrawRespConfig,
r.config.RemoveRespConfig,
}
for _, cfg := range configs {
@@ -131,6 +134,8 @@ func (r *RabbitMQServer) Start() error {
go r.consumeWithdraw()
// 启动支付消息监听
go r.consumePay()
// 启动删除充值监听
go r.consumeRemove()
// log.Println("🚀 RabbitMQ 服务启动成功,开始监听消息...")
return nil
@@ -188,8 +193,8 @@ func (r *RabbitMQServer) consumePay() {
if err := json.Unmarshal(body, &msg); err != nil {
return fmt.Errorf("failed to parse pay message: %w", err)
}
log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, OrderId=%s, From=%s, To=%s, Amount=%.2f %s",
msg.QueueId, msg.OrderId, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Symbol)
log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, Chain=%s, Symbol=%s, TxCount=%d",
msg.QueueId, msg.FromAddress, msg.Chain, msg.Symbol, len(msg.Trasnactions))
if r.OnPayMsg != nil {
r.OnPayMsg(msg)
@@ -199,6 +204,26 @@ func (r *RabbitMQServer) consumePay() {
)
}
// consumeRemove 消费删除充值监听消息
func (r *RabbitMQServer) consumeRemove() {
r.consumeQueue(
r.config.RemoveConfig.QueueName,
"remove",
func(body []byte) error {
var msg message.RemoveListenMsg_req
if err := json.Unmarshal(body, &msg); err != nil {
return fmt.Errorf("failed to parse remove message: %w", err)
}
log.Printf("📥 [RMQ] 收到删除充值监听: Chain=%s, Symbol=%s, Address=%s", msg.Chain, msg.Symbol, msg.Address)
if r.OnRemoveMsg != nil {
r.OnRemoveMsg(msg)
}
return nil
},
)
}
// consumeQueue 通用队列消费方法
func (r *RabbitMQServer) consumeQueue(queueName, msgType string, handler func([]byte) error) {
for {
@@ -268,8 +293,16 @@ func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error {
return r.publishMessage(
r.config.PayRespConfig,
resp,
fmt.Sprintf("支付响应: QueueId=%s, OrderId=%s, Status=%d, TxHash=%s",
resp.QueueId, resp.OrderId, resp.Status, resp.TxHash),
"支付响应",
)
}
// PublishRemoveResp 发布删除充值监听响应
func (r *RabbitMQServer) PublishRemoveResp(resp message.RemoveListenMsg_resp) error {
return r.publishMessage(
r.config.RemoveRespConfig,
resp,
fmt.Sprintf("删除充值监听响应: Address=%s, Status=%d", resp.Address, resp.Status),
)
}

View File

@@ -6,7 +6,7 @@ import (
"fmt"
"log"
"m2pool-payment/internal/blockchain"
"m2pool-payment/internal/blockchain/eth"
eth "m2pool-payment/internal/blockchain/eth"
"m2pool-payment/internal/crypto"
"m2pool-payment/internal/db"
"m2pool-payment/internal/logger"
@@ -19,7 +19,7 @@ import (
"time"
)
const MSG_KEY string = "9f3c7a12"
// const MSG_KEY string = "9f3c7a12"
// 状态码常量
const (
@@ -41,7 +41,7 @@ var s_ctx ServerCtx
// verifyMessage 验证消息签名
func verifyMessage(timestamp uint64, sign string) bool {
hash_byte := crypto.Sha256Hash(fmt.Sprintf("%x", timestamp) + MSG_KEY)
hash_byte := crypto.Sha256Hash(fmt.Sprintf("%x", timestamp) + s_ctx.msgKey)
hash := hex.EncodeToString(hash_byte)
return hash == sign
}
@@ -109,7 +109,7 @@ func loadTopupReqMsg() error {
if err := rows.Scan(&topupReq_msg.Chain, &topupReq_msg.Symbol, &topupReq_msg.Timestamp, &topupReq_msg.Address); err != nil {
return err
}
s_ctx.blockChainServer.AddAddress(topupReq_msg.Chain, topupReq_msg.Address, topupReq_msg)
s_ctx.blockChainServer.AddAddress(topupReq_msg.Chain, topupReq_msg)
}
if !hasData {
@@ -141,7 +141,7 @@ func loadWithdrawReqMsg() error {
if err := rows.Scan(&withdrawReq_msg.QueueId, &withdrawReq_msg.Chain, &withdrawReq_msg.Symbol, &withdrawReq_msg.Timestamp, &withdrawReq_msg.FromAddress, &withdrawReq_msg.ToAddress, &withdrawReq_msg.Amount); err != nil {
return err
}
s_ctx.blockChainServer.AddAddress(withdrawReq_msg.Chain, withdrawReq_msg.ToAddress, withdrawReq_msg)
s_ctx.blockChainServer.AddAddress(withdrawReq_msg.Chain, withdrawReq_msg)
}
if !hasData {
@@ -157,7 +157,7 @@ func loadWithdrawReqMsg() error {
}
func loadPayReqMsg() error {
sql := `SELECT queueId, chain, symbol, timestamp, from_addr, to_addr, amount, orderId FROM msg_pay_req;`
sql := `SELECT queueId, chain, symbol, timestamp, from_addr, total_amount FROM msg_pay_req;`
rows, err := s_ctx.sqlitedb.DB.Query(sql)
if err != nil {
return fmt.Errorf("query history pay-msg error: %w", err)
@@ -168,10 +168,10 @@ func loadPayReqMsg() error {
hasData := false
for rows.Next() {
hasData = true
if err := rows.Scan(&payReq_msg.QueueId, &payReq_msg.Chain, &payReq_msg.Symbol, &payReq_msg.Timestamp, &payReq_msg.FromAddress, &payReq_msg.ToAddress, &payReq_msg.Amount, &payReq_msg.OrderId); err != nil {
if err := rows.Scan(&payReq_msg.QueueId, &payReq_msg.Chain, &payReq_msg.Symbol, &payReq_msg.Timestamp, &payReq_msg.FromAddress, &payReq_msg.TotalAmount); err != nil {
return err
}
s_ctx.blockChainServer.AddAddress(payReq_msg.Chain, payReq_msg.ToAddress, payReq_msg)
s_ctx.blockChainServer.AddAddress(payReq_msg.Chain, payReq_msg)
}
if !hasData {
@@ -235,7 +235,7 @@ func handleTopupMsg() {
// 添加监听地址
// go func() {
err := s_ctx.blockChainServer.AddAddress(msg.Chain, msg.Address, msg)
err := s_ctx.blockChainServer.AddAddress(msg.Chain, msg)
if err != nil {
log.Printf("❌ 添加监听地址失败: %v", err)
// 发送失败响应
@@ -285,7 +285,7 @@ func handleWithdrawMsg() {
}
// 执行转账
err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg)
err := s_ctx.blockChainServer.Transfer(msg.Chain, msg)
if err != nil {
log.Printf("❌ 提现转账失败: %v", err)
// 发送失败响应
@@ -300,7 +300,7 @@ func handleWithdrawMsg() {
return // 转账失败时直接返回,不进入链上确认流程
}
// go func() {
err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg.ToAddress, msg)
err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg)
if err != nil {
log.Printf("❌ 添加监听地址失败: %v", err)
// 发送失败响应
@@ -328,18 +328,14 @@ func handleWithdrawMsg() {
func handlePayMsg() {
s_ctx.rmqServer.OnPayMsg = func(msg message.PayMsg_req) {
msg.FromAddress = strings.ToLower(msg.FromAddress)
msg.ToAddress = strings.ToLower(msg.ToAddress)
// msg.ToAddress = strings.ToLower(msg.ToAddress)
// 验证签名
if !verifyMessage(msg.Timestamp, msg.Sign) {
err := s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{
QueueId: msg.QueueId,
Status: STATUS_VERIFY_FAILED,
Amount: msg.Amount,
Chain: msg.Chain,
Symbol: msg.Symbol,
OrderId: msg.OrderId,
TxHash: "",
QueueId: msg.QueueId,
FromAddress: msg.FromAddress,
PayStatus: STATUS_VERIFY_FAILED,
})
if err != nil {
log.Printf("❌ 发布支付失败响应失败: %v", err)
@@ -348,41 +344,33 @@ func handlePayMsg() {
}
// 执行转账
err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg)
err := s_ctx.blockChainServer.Transfer(msg.Chain, msg)
if err != nil {
log.Printf("❌ 支付转账失败: %v", err)
// 发送失败响应
s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{
QueueId: msg.QueueId,
Status: STATUS_FAILED,
Amount: msg.Amount,
Chain: msg.Chain,
Symbol: msg.Symbol,
OrderId: msg.OrderId,
TxHash: "",
QueueId: msg.QueueId,
FromAddress: msg.FromAddress,
PayStatus: STATUS_FAILED,
})
return // 转账失败时直接返回,不进入链上确认流程
}
// go func() {
err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg.ToAddress, msg)
err = s_ctx.blockChainServer.AddAddress(msg.Chain, msg)
if err != nil {
log.Printf("❌ 添加监听地址失败: %v", err)
// 发送失败响应
s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{
QueueId: msg.QueueId,
Status: STATUS_FAILED,
Amount: msg.Amount,
Chain: msg.Chain,
Symbol: msg.Symbol,
OrderId: msg.OrderId,
TxHash: "",
QueueId: msg.QueueId,
FromAddress: msg.FromAddress,
PayStatus: STATUS_FAILED,
})
return
}
// }()
// 将新增数据写入sqlite
insert_sql := `INSERT OR REPLACE INTO msg_pay_req (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
data := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Timestamp, msg.FromAddress, msg.ToAddress, msg.Amount, msg.OrderId}
insert_sql := `INSERT OR REPLACE INTO msg_pay_req (queueId, chain, symbol, timestamp, from_addr, total_amount) VALUES (?, ?, ?, ?, ?, ?)`
data := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Timestamp, msg.FromAddress, msg.TotalAmount}
err = s_ctx.sqlitedb.Insert(insert_sql, data)
if err != nil {
log.Printf("❌ 插入 pay_req 失败: %v, data: %+v", err, data)
@@ -481,37 +469,37 @@ func handleChainEvent(chainEventCh chan any) {
case message.PayMsg_resp:
// 支付确认
log.Printf("✅ [链上] 支付确认: QueueId=%s, OrderId=%s, Amount=%.2f, TxHash=%s, Status=%d",
msg.QueueId, msg.OrderId, msg.Amount, msg.TxHash, msg.Status)
log.Printf("✅ [链上] 支付确认: QueueId=%s, FromAddress=%s, Status=%d",
msg.QueueId, msg.FromAddress, msg.PayStatus)
// 记录交易日志
logger.LogPay(msg.ToAddress, "确认", msg.Amount, msg.FromAddress, msg.TxHash, msg.BlockHeight, msg.OrderId, msg.QueueId)
logger.LogPay("全部交易确认", msg.FromAddress, msg.QueueId, msg.Transactions)
err := s_ctx.rmqServer.PublishPayResp(msg)
if err != nil {
log.Printf("❌ 发送支付响应失败: %v", err)
return
}
go func() {
// 插入响应数据
sql := `INSERT INTO msg_pay_resp (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, height, txHash, status, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
data := []any{msg.QueueId, msg.Chain, msg.Symbol, time.Now().Unix(), msg.FromAddress, msg.ToAddress, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status, msg.OrderId}
err := s_ctx.sqlitedb.Insert(sql, data)
if err != nil {
log.Printf("❌ 插入 pay_resp 失败: %v", err)
return
}
// go func() {
// // 插入响应数据
// sql := `INSERT INTO msg_pay_resp (queueId, chain, symbol, timestamp, from_addr, to_addr, amount, height, txHash, status, orderId) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
// data := []any{msg.QueueId, msg.Chain, msg.Symbol, time.Now().Unix(), msg.FromAddress, msg.ToAddress, msg.Amount, msg.BlockHeight, msg.TxHash, msg.Status, msg.OrderId}
// err := s_ctx.sqlitedb.Insert(sql, data)
// if err != nil {
// log.Printf("❌ 插入 pay_resp 失败: %v", err)
// return
// }
// 删除对应数据
del_sql := `DELETE FROM msg_pay_req WHERE queueId = ?;`
count, err := s_ctx.sqlitedb.Delete(del_sql, msg.QueueId)
if err != nil {
log.Printf("❌ 清理 pay_req 失败: %v, queueId=%s", err, msg.QueueId)
} else if count == 0 {
log.Printf("⚠️ 未找到要删除的 pay_req 记录: queueId=%s", msg.QueueId)
} else {
log.Printf("✅ 清理 pay_req 成功: 删除了 %d 条记录, queueId=%s", count, msg.QueueId)
}
}()
// // 删除对应数据
// del_sql := `DELETE FROM msg_pay_req WHERE queueId = ?;`
// count, err := s_ctx.sqlitedb.Delete(del_sql, msg.QueueId)
// if err != nil {
// log.Printf("❌ 清理 pay_req 失败: %v, queueId=%s", err, msg.QueueId)
// } else if count == 0 {
// log.Printf("⚠️ 未找到要删除的 pay_req 记录: queueId=%s", msg.QueueId)
// } else {
// log.Printf("✅ 清理 pay_req 成功: 删除了 %d 条记录, queueId=%s", count, msg.QueueId)
// }
// }()
default:
log.Printf("⚠️ 未知消息类型: %T", event)
@@ -547,7 +535,7 @@ func Start(msgKey string) {
// ================== 启动链上事件监听通道 ==================
chainEventCh := make(chan any, 1000) // 增加缓冲区,避免高并发丢消息
go s_ctx.blockChainServer.Listen("ETH", "USDT", chainEventCh)
go s_ctx.blockChainServer.Listen("ETH", chainEventCh)
// ================== 启动 RabbitMQ 监听 ==================
initRmqListen()

View File

@@ -6,6 +6,14 @@ import (
"math/big"
)
func BigIntETHToFloat64(value *big.Int) float64 {
f := new(big.Float).SetInt(value)
scale := new(big.Float).SetFloat64(1e18) // USDT 精度 6 位
f.Quo(f, scale)
result, _ := f.Float64()
return result
}
func BigIntUSDTToFloat64(value *big.Int) float64 {
f := new(big.Float).SetInt(value)
scale := new(big.Float).SetFloat64(1e6) // USDT 精度 6 位
@@ -26,6 +34,16 @@ func Float64ToBigIntUSDT(amount float64) *big.Int {
return bigAmount
}
const ETHDecimals = 18
func Float64ToBigIntETH(amount float64) *big.Int {
// 乘上精度系数
scale := math.Pow10(ETHDecimals)
bigAmount := new(big.Int)
bigAmount.SetInt64(int64(amount * scale))
return bigAmount
}
func Slice_delete(arr []any, index int) []any {
if index < 0 || index >= len(arr) {
// 处理越界

View File

@@ -17,16 +17,35 @@ CREATE TABLE IF NOT EXISTS msg_withdraw_req (
PRIMARY KEY(queueId)
);
CREATE TABLE IF NOT EXISTS msg_pay_req (
queueId TEXT,
chain TEXT,
symbol TEXT,
timestamp INTEGER,
from_addr TEXT,
to_addr TEXT,
amount NUMERIC,
orderId TEXT,
PRIMARY KEY(queueId)
-- CREATE TABLE IF NOT EXISTS msg_pay_req (
-- queueId TEXT,
-- chain TEXT,
-- symbol TEXT,
-- timestamp INTEGER,
-- from_addr TEXT,
-- to_addr TEXT,
-- amount NUMERIC,
-- orderId TEXT,
-- PRIMARY KEY(queueId)
-- );
CREATE TABLE msg_pay_req (
queueId TEXT PRIMARY KEY,
from_addr TEXT NOT NULL,
chain TEXT NOT NULL,
symbol TEXT NOT NULL,
total_amount REAL NOT NULL,
timestamp INTEGER NOT NULL,
sign TEXT NOT NULL
);
CREATE TABLE msg_pay_req_transactions (
queueId TEXT NOT NULL,
to_addr TEXT NOT NULL,
order_id TEXT NOT NULL,
tx_hash TEXT,
amount REAL NOT NULL,
FOREIGN KEY (queueId) REFERENCES msg_pay_req(queueId)
);
CREATE TABLE IF NOT EXISTS msg_topup_resp (
@@ -55,17 +74,38 @@ CREATE TABLE IF NOT EXISTS msg_withdraw_resp (
status INTEGER
);
CREATE TABLE IF NOT EXISTS msg_pay_resp (
id INTEGER PRIMARY KEY AUTOINCREMENT,
queueId TEXT,
chain TEXT,
symbol TEXT,
timestamp INTEGER,
from_addr TEXT,
to_addr TEXT,
amount NUMERIC,
height INTEGER,
txHash TEXT,
orderId TEXT,
status INTEGER
-- CREATE TABLE IF NOT EXISTS msg_pay_resp (
-- id INTEGER PRIMARY KEY AUTOINCREMENT,
-- queueId TEXT,
-- chain TEXT,
-- symbol TEXT,
-- timestamp INTEGER,
-- from_addr TEXT,
-- to_addr TEXT,
-- amount NUMERIC,
-- height INTEGER,
-- txHash TEXT,
-- orderId TEXT,
-- status INTEGER
-- );
CREATE TABLE msg_pay_resp (
queueId TEXT PRIMARY KEY,
from_addr TEXT NOT NULL,
pay_status INTEGER NOT NULL, -- 1: At least one success, 3: Sign verification failed, 4: Insufficient balance
transactions JSON -- 存储交易的JSON格式
);
CREATE TABLE msg_pay_resp_transactions (
queueId TEXT NOT NULL,
order_id TEXT NOT NULL,
from_addr TEXT NOT NULL,
to_addr TEXT NOT NULL,
chain TEXT NOT NULL,
symbol TEXT NOT NULL,
amount REAL NOT NULL,
tx_hash TEXT,
height INTEGER,
status INTEGER NOT NULL, -- 0: Failed, 1: Success
FOREIGN KEY (queueId) REFERENCES msg_pay_resp(queueId)
);