package eth import ( "context" "fmt" "log" "m2pool-payment/internal/db" message "m2pool-payment/internal/msg" "m2pool-payment/internal/utils" "math/big" "strings" "sync" "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" ) const erc20ABI = ` [ { "constant": true, "inputs": [{"name": "_owner", "type": "address"}], "name": "balanceOf", "outputs": [{"name": "balance", "type": "uint256"}], "type": "function" }, { "constant": false, "inputs": [ {"name": "_to", "type": "address"}, {"name": "_value", "type": "uint256"} ], "name": "transfer", "outputs": [{"name": "", "type": "bool"}], "type": "function" }, { "anonymous": false, "inputs": [ {"indexed": true, "name": "from", "type": "address"}, {"indexed": true, "name": "to", "type": "address"}, {"indexed": false,"name": "value","type": "uint256"} ], "name": "Transfer", "type": "event" } ] ` type ETHNode struct { decodeKey string // 私钥解密密钥,从程序启动命令行获得 NetId *big.Int // 网络ID,主网为1,其他ID可通过rpc.NetworkID方法获取 Config message.ETHConfig // 配置文件 WsClient *ethclient.Client RpcClient *ethclient.Client Db db.MySQLPool mu sync.Mutex ListenAddresses sync.Map // key:"钱包地址", value:bool UnConfirmTxs map[string]message.Tx_msg // 待交易地址池,key:"交易hash", value: message.Tx USDT *USDT // ETH相关 RmqMsgs map[string][]any // 根据地址查找出该地址涉及的消息,消息需要断言(topupreq_msg, payreq_msg, withdrawreq_msg) Ctx context.Context Cancel context.CancelFunc } type USDT struct { Address common.Address // USDT合约地址 ABI abi.ABI // USDT ABI TransferSig common.Hash // USDT函数签名 LogsChan chan types.Log } func NewETHNode(cfg message.ETHConfig, decodeKey string) (*ETHNode, error) { // 连入ETH节点的ws ws_client, err := ethclient.Dial(cfg.WsURL) if err != nil { return nil, fmt.Errorf("failed to connect to Ethereum node: %w", err) } // 连入ETH节点的rpc rpc_client, err := ethclient.Dial(cfg.RpcURL) if err != nil { return nil, fmt.Errorf("failed to connect to Ethereum node rpc: %w", err) } // 创建可取消的 context ctx, cancel := context.WithCancel(context.Background()) // 获得net_id netId, err := rpc_client.NetworkID(ctx) if err != nil { cancel() return nil, fmt.Errorf("failed to connect to get node net_id: %w", err) } // 构造USDT合约相关 usdt := &USDT{} usdt.Address = common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7") // 解析合约地址 usdt.ABI = func() abi.ABI { a, _ := abi.JSON(strings.NewReader(erc20ABI)); return a }() // 解析合约ABI usdt.TransferSig = crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) // 解析合约transfer函数签名 usdt.LogsChan = make(chan types.Log, 1000) // 初始化合约日志通道 // 初始化数据库 dbConn, err := db.NewMySQLPool(cfg.DbConfig) if err != nil { cancel() return nil, fmt.Errorf("mysql connect error: %w", err) } return ÐNode{ decodeKey: decodeKey, NetId: netId, Config: cfg, WsClient: ws_client, RpcClient: rpc_client, Db: *dbConn, ListenAddresses: sync.Map{}, UnConfirmTxs: make(map[string]message.Tx_msg), USDT: usdt, RmqMsgs: make(map[string][]any), Ctx: ctx, Cancel: cancel, }, nil } // ============================ 抽象接口 ============================ func (e *ETHNode) AddAddress(address string, rmq_msg any) { // 统一转换为小写 address = strings.ToLower(address) log.Printf("新增钱包监听消息:%v", rmq_msg) e.ListenAddresses.Store(address, true) e.mu.Lock() if len(e.RmqMsgs[address]) == 0 { e.RmqMsgs[address] = []any{rmq_msg} } else { e.RmqMsgs[address] = append(e.RmqMsgs[address], rmq_msg) } e.mu.Unlock() } func (e *ETHNode) RemoveAddress(address string) { // 统一转换为小写 address = strings.ToLower(address) e.ListenAddresses.Delete(address) e.mu.Lock() delete(e.RmqMsgs, address) e.mu.Unlock() } func (e *ETHNode) Listen(symbol string, ch chan any) { // 启动新区块监听(用于触发交易确认检查) go e.listenNewBlocks("USDT", ch) switch symbol { case "USDT": // 启动 USDT Transfer 事件监听 err := e.listen_usdt(ch) if err != nil { log.Fatal("Listen USDT Transactions Error:", err) } } } func (e *ETHNode) Transfer(symbol string, msg any) error { switch symbol { case "USDT": err := e.usdt_transfer(msg) if err != nil { return fmt.Errorf("%s transfer ERROR: %w", symbol, err) } default: return fmt.Errorf("unsupported symbol: %s", symbol) } return nil } // ============================ rpc节点方法 ============================ func (e *ETHNode) getETHBlance(address string) (*big.Int, error) { account := common.HexToAddress(address) ctx := context.Background() balance, err := e.RpcClient.BalanceAt(ctx, account, nil) // nil表示最新高度 if err != nil { return nil, fmt.Errorf("failed to get eth balance:%w", err) } // fBalance := new(big.Float).SetInt(balance) // ethValue := new(big.Float).Quo(fBalance, big.NewFloat(1e18)) // 转 ETH // value, _ := ethValue.Float64() // 转 float64 return balance, nil } func (e *ETHNode) getUSDTBalance(address string) (float64, error) { // 统一转换为小写(common.HexToAddress会自动处理,但为了一致性显式转换) address = strings.ToLower(address) contractAddress := e.USDT.Address accountAddress := common.HexToAddress(address) data, err := e.USDT.ABI.Pack("balanceOf", accountAddress) if err != nil { return 0, fmt.Errorf("failed to pack balanceOf data: %w", err) } msg := ethereum.CallMsg{ To: &contractAddress, Data: data, } // 使用 CallContract 方法查询合约余额 res, err := e.RpcClient.CallContract(e.Ctx, msg, nil) if err != nil { return 0, fmt.Errorf("failed to get contract balance: %w", err) } // 解析返回的字节为 *big.Int outputs, err := e.USDT.ABI.Unpack("balanceOf", res) if err != nil || len(outputs) == 0 { return 0, fmt.Errorf("failed to unpack balanceOf result: %w", err) } balance, ok := outputs[0].(*big.Int) if !ok { return 0, fmt.Errorf("unexpected type for balanceOf result") } bal := utils.BigIntUSDTToFloat64(balance) return bal, nil } func (e *ETHNode) getBlockHeight() (uint64, error) { header, err := e.RpcClient.HeaderByNumber(e.Ctx, nil) if err != nil { return 0, fmt.Errorf("failed to get latest block header: %w", err) } return header.Number.Uint64(), nil } func (e *ETHNode) getSuggestGasPrice() (*big.Int, error) { ctx := context.Background() gasPrice, err := e.RpcClient.SuggestGasPrice(ctx) if err != nil { return nil, fmt.Errorf("get suggest-gasprice error:%v", err) } return gasPrice, nil } // ============================ 业务方法 ============================ func (e *ETHNode) listen_usdt(ch chan any) error { fmt.Println("🔍 ETH 开始监听 USDT Transfer 事件...") // 过滤掉非USDT数据 query := ethereum.FilterQuery{ Addresses: []common.Address{e.USDT.Address}, } // 负责重连 for { // 订阅日志 sub, err := e.WsClient.SubscribeFilterLogs(e.Ctx, query, e.USDT.LogsChan) if err != nil { fmt.Println("❌ 订阅失败, 5秒后重试:", err) time.Sleep(5 * time.Second) continue } fmt.Println("✅ 订阅成功") // 处理事件 for { select { case err := <-sub.Err(): fmt.Println("⚠️ 订阅异常,准备重连:", err) sub.Unsubscribe() // 清理旧订阅 time.Sleep(3 * time.Second) goto reconnect // 跳出内层循环,回到外层重新订阅 case vLog := <-e.USDT.LogsChan: e.handleUSDTEvent(vLog, ch) // 事件解析 + 分类,传递链消息的通道是vLog而非ch,且一次只传递一笔交易 case <-e.Ctx.Done(): fmt.Println("🛑 收到停止信号,退出监听") sub.Unsubscribe() return e.Ctx.Err() } } reconnect: } } func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) { from := common.HexToAddress(vLog.Topics[1].Hex()) to := common.HexToAddress(vLog.Topics[2].Hex()) height := vLog.BlockNumber fromAddr := strings.ToLower(from.Hex()) toAddr := strings.ToLower(to.Hex()) var transferEvent struct{ Value *big.Int } if err := e.USDT.ABI.UnpackIntoInterface(&transferEvent, "Transfer", vLog.Data); err != nil { fmt.Println("ABI 解析错误:", err) return } // 先验证toAddr是否在监听列表里面 _, ok := e.ListenAddresses.Load(toAddr) if !ok { return } tx_hash := vLog.TxHash.Hex() // tx, tx_ok := e.UnConfirmTxs[tx_hash] // if tx_ok { // // 【支付/提现】待确认交易中存在该交易Hash(说明是我们主动发起的交易) // // 直接走确认流程,不发送待确认消息 // // log.Printf("🔍 检测到已发起的交易: TxHash=%s, Type=%d", tx_hash, tx.TxType) // e.confirm("USDT", height, tx, ch) // } else { // 【充值】待确认交易中不存在该交易hash(说明是外部转账) // 添加至待确认交易中,并立即发送待确认消息 // 1,先根据to查询RmqMsgs,再根据存在的rmq_msg中的相关数据,存入待确认交易 value, rmq_msg_ok := e.RmqMsgs[toAddr] var tx_type int if rmq_msg_ok { for _, v := range value { _, ok := v.(message.TopupMsg_req) if ok { tx_type = 0 } _, ok1 := v.(message.WithdrawMsg_req) if ok1 { tx_type = 1 } _, ok2 := v.(message.PayMsg_req) if ok2 { tx_type = 2 } } } e.UnConfirmTxs[tx_hash] = message.Tx_msg{ TxType: tx_type, Tx: message.Tx{ From: fromAddr, To: toAddr, Height: height, TxHash: tx_hash, Symbol: "USDT", Value: utils.BigIntUSDTToFloat64(transferEvent.Value), Status: 2, // 待确认状态 }, } // log.Printf("📝 待确认交易新增: TxHash=%s, Height=%d, To=%s, Type=%d", tx_hash, height, toAddr, tx_type) // 🔔 【仅充值】立即发送待确认状态的消息(支付/提现不发送待确认消息) if tx_type == 0 && rmq_msg_ok { for _, v := range value { d1, ok := v.(message.TopupMsg_req) if ok && strings.ToLower(d1.Address) == toAddr { pendingMsg := message.TopupMsg_resp{ Address: toAddr, Status: 2, // 待确认状态 Chain: d1.Chain, Symbol: d1.Symbol, Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), TxHash: tx_hash, } // log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f", // tx_hash, toAddr, pendingMsg.Amount) // 异步发送,避免阻塞事件处理 go func(msg message.TopupMsg_resp) { select { case ch <- msg: log.Printf("✅ 待确认充值消息已发送") default: log.Printf("⚠️ 通道阻塞,待确认消息发送失败") } }(pendingMsg) break } } // } } } // listenNewBlocks 监听新区块产生,触发交易确认检查 func (e *ETHNode) listenNewBlocks(symbol string, ch chan any) { fmt.Println("🔍 开始监听新区块...") headers := make(chan *types.Header, 10) // 负责重连 for { // 订阅新区块头 sub, err := e.WsClient.SubscribeNewHead(e.Ctx, headers) if err != nil { fmt.Println("❌ 订阅新区块失败, 5秒后重试:", err) time.Sleep(5 * time.Second) continue } fmt.Println("✅ 新区块订阅成功") // 处理新区块 for { select { case err := <-sub.Err(): fmt.Println("⚠️ 新区块订阅异常,准备重连:", err) sub.Unsubscribe() time.Sleep(3 * time.Second) goto reconnect case header := <-headers: // 每当有新区块,检查待确认交易 currentHeight := header.Number.Uint64() // log.Printf("🆕 新区块: %d", currentHeight) // 检查是否有待确认交易 e.mu.Lock() hasPendingTx := len(e.UnConfirmTxs) > 0 e.mu.Unlock() if hasPendingTx { e.checkAndConfirmTransactions(symbol, currentHeight, ch) } case <-e.Ctx.Done(): fmt.Println("🛑 停止新区块监听") sub.Unsubscribe() return } } reconnect: } } // checkAndConfirmTransactions 检查并确认达到确认高度的交易 func (e *ETHNode) checkAndConfirmTransactions(symbol string, currentHeight uint64, ch chan any) { e.mu.Lock() needConfirmList := []message.Tx_msg{} for _, tx := range e.UnConfirmTxs { // 检查是否达到确认高度 if currentHeight >= tx.Tx.Height+e.Config.ConfirmHeight { log.Printf("当前高度=%d, 交易高度=%d", currentHeight, tx.Tx.Height) // log.Printf("✅ 交易达到确认高度: TxHash=%s, 当前高度=%d, 交易高度=%d, 需确认=%d块", // txHash, currentHeight, tx.Tx.Height, e.Config.ConfirmHeight) needConfirmList = append(needConfirmList, tx) } } e.mu.Unlock() // 批量触发确认(在锁外执行,避免长时间持锁) for _, tx := range needConfirmList { e.confirm(symbol, currentHeight, tx, ch) } } func (e *ETHNode) confirm(symbol string, height uint64, tx message.Tx_msg, ch chan any) { switch symbol { case "USDT": e.confirm_usdt(tx, height, ch) } } func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) { // 先从 UnConfirmTxs 中读取 e.mu.Lock() unConfirmTx, ok := e.UnConfirmTxs[tx.Tx.TxHash] e.mu.Unlock() if !ok { return } if height < unConfirmTx.Tx.Height { return } // 超过确认高度,查询交易数据 txHash := common.HexToHash(tx.Tx.TxHash) receipt, err := e.RpcClient.TransactionReceipt(e.Ctx, txHash) var status int if err != nil { log.Println("⚠️ 查询交易收据失败 TxHash=", txHash, err) status = 0 } else if receipt.Status == types.ReceiptStatusSuccessful { status = 1 } else { status = 0 } tx.Tx.Status = status // 通过通道发送,异步写避免阻塞 go func(tx message.Tx_msg) { e.mu.Lock() var result_msg any var matchIndex = -1 // 记录匹配的索引 rmq_msg := e.RmqMsgs[tx.Tx.To] for i, v := range rmq_msg { // 处理充值 d1, ok := v.(message.TopupMsg_req) if ok { // 统一转小写比较 if strings.ToLower(d1.Address) == tx.Tx.To { result_msg = message.TopupMsg_resp{ Address: tx.Tx.To, Status: tx.Tx.Status, Chain: d1.Chain, Symbol: d1.Symbol, Amount: tx.Tx.Value, TxHash: tx.Tx.TxHash, } // 充值消息不删除,可能会有多笔充值到同一地址 break } } // 处理提现 d2, ok1 := v.(message.WithdrawMsg_req) if ok1 { // 统一转小写比较 if strings.ToLower(d2.FromAddress) == tx.Tx.From && strings.ToLower(d2.ToAddress) == tx.Tx.To && d2.Amount == tx.Tx.Value { result_msg = message.WithdrawMsg_resp{ QueueId: d2.QueueId, Status: tx.Tx.Status, Amount: tx.Tx.Value, Chain: d2.Chain, Symbol: d2.Symbol, TxHash: tx.Tx.TxHash, } matchIndex = i // 记录索引,稍后删除 break } } // 处理支付 d3, ok2 := v.(message.PayMsg_req) if ok2 { // 统一转小写比较 if strings.ToLower(d3.FromAddress) == tx.Tx.From && strings.ToLower(d3.ToAddress) == tx.Tx.To && d3.Amount == tx.Tx.Value { result_msg = message.PayMsg_resp{ QueueId: d3.QueueId, Status: tx.Tx.Status, Amount: tx.Tx.Value, Chain: d3.Chain, Symbol: d3.Symbol, OrderId: d3.OrderId, TxHash: tx.Tx.TxHash, } matchIndex = i // 记录索引,稍后删除 break } } } // 循环结束后,统一删除匹配的消息(提现和支付需要删除) if matchIndex >= 0 { e.RmqMsgs[tx.Tx.To] = utils.Slice_delete(e.RmqMsgs[tx.Tx.To], matchIndex) } e.mu.Unlock() select { case ch <- result_msg: default: fmt.Println("⚠️ confirm通道阻塞,消息丢失:", txHash) } }(tx) // 删除已确认交易 e.mu.Lock() delete(e.UnConfirmTxs, tx.Tx.TxHash) e.mu.Unlock() } func (e *ETHNode) decodePrivatekey(address string) string { // 统一转换为小写 address = strings.ToLower(address) // 查询加密后的私钥 querySql := "SELECT `private_key` FROM eth_balance WHERE address = ? LIMIT 1;" log.Println("查询私钥的钱包地址:", address) var encryptedKey string err := e.Db.QueryRow(querySql, address).Scan(&encryptedKey) if err != nil { log.Println("❌ 查询私钥失败:", err) return "" } // 使用key解密 privateKey := encryptedKey // 实际使用时替换成具体的解密代码 // fmt.Println(privateKey) return privateKey } func (e *ETHNode) usdt_transfer(msg any) error { var user_from, final_from, to string var amount float64 // var tx_type int // now_height, err := e.getBlockHeight() // if err != nil { // return fmt.Errorf("get lastest height error: %v", err) // } // --------------------------------------------------------------------------------------------- // 断言,确定本次转账是哪个类型 // 支付操作 v, ok := msg.(message.PayMsg_req) if ok { e.AddAddress(v.ToAddress, v) // 存入该笔msg(AddAddress内部会转小写) // 统一转换为小写 user_from, final_from, to, amount = strings.ToLower(v.FromAddress), strings.ToLower(v.FromAddress), strings.ToLower(v.ToAddress), v.Amount // tx_type = 2 } // 提现操作 k, ok1 := msg.(message.WithdrawMsg_req) if ok1 { e.AddAddress(k.ToAddress, k) // 存入该笔msg(AddAddress内部会转小写) // 统一转换为小写 user_from, final_from, to, amount = strings.ToLower(k.FromAddress), strings.ToLower(k.FromAddress), strings.ToLower(k.ToAddress), k.Amount // tx_type = 1 } // --------------------------------------------------------------------------------------------- // 1,校验钱包余额 balance, err := e.getUSDTBalance(user_from) log.Printf("检测Transfer钱包=%s,余额=%f", user_from, balance) if err != nil { return fmt.Errorf("failed to get balance: %w", err) } // 2,钱包余额不足,调用归集钱包转账 if balance < amount { final_from = "归集钱包" } // 3,通过from地址前往数据库查找出对应加密后的私钥,并解密真实的私钥 originalKey := e.decodePrivatekey(final_from) if originalKey == "" { return fmt.Errorf("failed to query privatekey") } fmt.Println(originalKey) privateKey, err := crypto.HexToECDSA(originalKey) if err != nil { return fmt.Errorf("failed to parse private key: %w", err) } // 4, 获得nonce nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(final_from)) if err != nil { return fmt.Errorf("failed to get nonce: %w", err) } // 5, 构造交易(ERC20 transfer 调用) amountBigInt := utils.Float64ToBigIntUSDT(amount) data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(to), amountBigInt) // 打包 transfer(address,uint256) 方法调用 if err != nil { return fmt.Errorf("failed to pack transfer data: %w", err) } gasPrice, err := e.getSuggestGasPrice() // 获得当前建议gasPrice if err != nil { return fmt.Errorf("get suggest-gasprice error:%v", err) } eth_balance, err := e.getETHBlance(final_from) // 获得钱包eth余额 if err != nil { return fmt.Errorf("%w", err) } var gasLimit uint64 = 100000 gasLimit_b := new(big.Int).SetUint64(gasLimit) gas := new(big.Int).Mul(gasLimit_b, gasPrice) // 判断钱包eth是否支持本次交易gas费用 if eth_balance.Cmp(gas) == -1 { return fmt.Errorf("address=%s balance less than gas=%v(wei)", final_from, eth_balance) } // 构造发送到 USDT 合约地址的交易 tx := types.NewTransaction( nonce, e.USDT.Address, // 发送到USDT合约地址 big.NewInt(0), // value为0(ERC20转账不需要ETH) gasLimit, // GasLimit设置为100000(ERC20转账需要更多gas) gasPrice, // GasPrice: 20 Gwei data, // 附加数据:transfer方法调用 ) // 6, 签名交易并获得txHash signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey) // txHash := signedTx.Hash().Hex() // 通过签名信息解析出交易hash if err != nil { return fmt.Errorf("failed to sign transaction: %w", err) } // 7, 发送交易 err = e.RpcClient.SendTransaction(e.Ctx, signedTx) if err != nil { return fmt.Errorf("failed to send transaction: %w", err) } // // 8, 构造交易消息 // tx_msg := message.Tx_msg{ // TxType: tx_type, // Tx: message.Tx{ // From: final_from, // To: to, // Height: now_height, // TxHash: txHash, // Symbol: "USDT", // Value: amount, // Status: 2, // }, // } // // 9, 将构造的交易消息存入待确认交易中 // e.UnConfirmTxs[txHash] = tx_msg return nil } func (e *ETHNode) Stop() { e.Cancel() }