Files
m2pool_payment/internal/blockchain/eth/eth.go
2025-10-21 14:25:15 +08:00

699 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package eth
import (
"context"
"fmt"
"log"
"m2pool-payment/internal/db"
message "m2pool-payment/internal/msg"
"m2pool-payment/internal/utils"
"math/big"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
)
const erc20ABI = `
[
{
"constant": true,
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"type": "function"
},
{
"constant": false,
"inputs": [
{"name": "_to", "type": "address"},
{"name": "_value", "type": "uint256"}
],
"name": "transfer",
"outputs": [{"name": "", "type": "bool"}],
"type": "function"
},
{
"anonymous": false,
"inputs": [
{"indexed": true, "name": "from", "type": "address"},
{"indexed": true, "name": "to", "type": "address"},
{"indexed": false,"name": "value","type": "uint256"}
],
"name": "Transfer",
"type": "event"
}
]
`
type ETHNode struct {
decodeKey string // 私钥解密密钥,从程序启动命令行获得
NetId *big.Int // 网络ID主网为1其他ID可通过rpc.NetworkID方法获取
Config message.ETHConfig // 配置文件
WsClient *ethclient.Client
RpcClient *ethclient.Client
Db db.MySQLPool
mu sync.Mutex
ListenAddresses sync.Map // key:"钱包地址", value:bool
UnConfirmTxs map[string]message.Tx_msg // 待交易地址池key:"交易hash", value: message.Tx
USDT *USDT // ETH相关
RmqMsgs map[string][]any // 根据地址查找出该地址涉及的消息,消息需要断言(topupreq_msg, payreq_msg, withdrawreq_msg)
Ctx context.Context
Cancel context.CancelFunc
}
type USDT struct {
Address common.Address // USDT合约地址
ABI abi.ABI // USDT ABI
TransferSig common.Hash // USDT函数签名
LogsChan chan types.Log
}
func NewETHNode(cfg message.ETHConfig, decodeKey string) (*ETHNode, error) {
// 连入ETH节点的ws
ws_client, err := ethclient.Dial(cfg.WsURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to Ethereum node: %w", err)
}
// 连入ETH节点的rpc
rpc_client, err := ethclient.Dial(cfg.RpcURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to Ethereum node rpc: %w", err)
}
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
// 获得net_id
netId, err := rpc_client.NetworkID(ctx)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to connect to get node net_id: %w", err)
}
// 构造USDT合约相关
usdt := &USDT{}
usdt.Address = common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7") // 解析合约地址
usdt.ABI = func() abi.ABI { a, _ := abi.JSON(strings.NewReader(erc20ABI)); return a }() // 解析合约ABI
usdt.TransferSig = crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) // 解析合约transfer函数签名
usdt.LogsChan = make(chan types.Log, 1000) // 初始化合约日志通道
// 初始化数据库
dbConn, err := db.NewMySQLPool(cfg.DbConfig)
if err != nil {
cancel()
return nil, fmt.Errorf("mysql connect error: %w", err)
}
return &ETHNode{
decodeKey: decodeKey,
NetId: netId,
Config: cfg,
WsClient: ws_client,
RpcClient: rpc_client,
Db: *dbConn,
ListenAddresses: sync.Map{},
UnConfirmTxs: make(map[string]message.Tx_msg),
USDT: usdt,
RmqMsgs: make(map[string][]any),
Ctx: ctx,
Cancel: cancel,
}, nil
}
// ============================ 抽象接口 ============================
func (e *ETHNode) AddAddress(address string, rmq_msg any) {
// 统一转换为小写
address = strings.ToLower(address)
log.Printf("新增钱包监听消息:%v", rmq_msg)
e.ListenAddresses.Store(address, true)
e.mu.Lock()
if len(e.RmqMsgs[address]) == 0 {
e.RmqMsgs[address] = []any{rmq_msg}
} else {
e.RmqMsgs[address] = append(e.RmqMsgs[address], rmq_msg)
}
e.mu.Unlock()
}
func (e *ETHNode) RemoveAddress(address string) {
// 统一转换为小写
address = strings.ToLower(address)
e.ListenAddresses.Delete(address)
e.mu.Lock()
delete(e.RmqMsgs, address)
e.mu.Unlock()
}
func (e *ETHNode) Listen(symbol string, ch chan any) {
// 启动新区块监听(用于触发交易确认检查)
go e.listenNewBlocks("USDT", ch)
switch symbol {
case "USDT":
// 启动 USDT Transfer 事件监听
err := e.listen_usdt(ch)
if err != nil {
log.Fatal("Listen USDT Transactions Error:", err)
}
}
}
func (e *ETHNode) Transfer(symbol string, msg any) error {
switch symbol {
case "USDT":
err := e.usdt_transfer(msg)
if err != nil {
return fmt.Errorf("%s transfer ERROR: %w", symbol, err)
}
default:
return fmt.Errorf("unsupported symbol: %s", symbol)
}
return nil
}
// ============================ rpc节点方法 ============================
func (e *ETHNode) getETHBlance(address string) (*big.Int, error) {
account := common.HexToAddress(address)
ctx := context.Background()
balance, err := e.RpcClient.BalanceAt(ctx, account, nil) // nil表示最新高度
if err != nil {
return nil, fmt.Errorf("failed to get eth balance:%w", err)
}
// fBalance := new(big.Float).SetInt(balance)
// ethValue := new(big.Float).Quo(fBalance, big.NewFloat(1e18)) // 转 ETH
// value, _ := ethValue.Float64() // 转 float64
return balance, nil
}
func (e *ETHNode) getUSDTBalance(address string) (float64, error) {
// 统一转换为小写common.HexToAddress会自动处理但为了一致性显式转换
address = strings.ToLower(address)
contractAddress := e.USDT.Address
accountAddress := common.HexToAddress(address)
data, err := e.USDT.ABI.Pack("balanceOf", accountAddress)
if err != nil {
return 0, fmt.Errorf("failed to pack balanceOf data: %w", err)
}
msg := ethereum.CallMsg{
To: &contractAddress,
Data: data,
}
// 使用 CallContract 方法查询合约余额
res, err := e.RpcClient.CallContract(e.Ctx, msg, nil)
if err != nil {
return 0, fmt.Errorf("failed to get contract balance: %w", err)
}
// 解析返回的字节为 *big.Int
outputs, err := e.USDT.ABI.Unpack("balanceOf", res)
if err != nil || len(outputs) == 0 {
return 0, fmt.Errorf("failed to unpack balanceOf result: %w", err)
}
balance, ok := outputs[0].(*big.Int)
if !ok {
return 0, fmt.Errorf("unexpected type for balanceOf result")
}
bal := utils.BigIntUSDTToFloat64(balance)
return bal, nil
}
func (e *ETHNode) getBlockHeight() (uint64, error) {
header, err := e.RpcClient.HeaderByNumber(e.Ctx, nil)
if err != nil {
return 0, fmt.Errorf("failed to get latest block header: %w", err)
}
return header.Number.Uint64(), nil
}
func (e *ETHNode) getSuggestGasPrice() (*big.Int, error) {
ctx := context.Background()
gasPrice, err := e.RpcClient.SuggestGasPrice(ctx)
if err != nil {
return nil, fmt.Errorf("get suggest-gasprice error:%v", err)
}
return gasPrice, nil
}
// ============================ 业务方法 ============================
func (e *ETHNode) listen_usdt(ch chan any) error {
fmt.Println("🔍 ETH 开始监听 USDT Transfer 事件...")
// 过滤掉非USDT数据
query := ethereum.FilterQuery{
Addresses: []common.Address{e.USDT.Address},
}
// 负责重连
for {
// 订阅日志
sub, err := e.WsClient.SubscribeFilterLogs(e.Ctx, query, e.USDT.LogsChan)
if err != nil {
fmt.Println("❌ 订阅失败, 5秒后重试:", err)
time.Sleep(5 * time.Second)
continue
}
fmt.Println("✅ 订阅成功")
// 处理事件
for {
select {
case err := <-sub.Err():
fmt.Println("⚠️ 订阅异常,准备重连:", err)
sub.Unsubscribe() // 清理旧订阅
time.Sleep(3 * time.Second)
goto reconnect // 跳出内层循环,回到外层重新订阅
case vLog := <-e.USDT.LogsChan:
e.handleUSDTEvent(vLog, ch) // 事件解析 + 分类传递链消息的通道是vLog而非ch且一次只传递一笔交易
case <-e.Ctx.Done():
fmt.Println("🛑 收到停止信号,退出监听")
sub.Unsubscribe()
return e.Ctx.Err()
}
}
reconnect:
}
}
func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) {
from := common.HexToAddress(vLog.Topics[1].Hex())
to := common.HexToAddress(vLog.Topics[2].Hex())
height := vLog.BlockNumber
fromAddr := strings.ToLower(from.Hex())
toAddr := strings.ToLower(to.Hex())
var transferEvent struct{ Value *big.Int }
if err := e.USDT.ABI.UnpackIntoInterface(&transferEvent, "Transfer", vLog.Data); err != nil {
fmt.Println("ABI 解析错误:", err)
return
}
// 先验证toAddr是否在监听列表里面
_, ok := e.ListenAddresses.Load(toAddr)
if !ok {
return
}
tx_hash := vLog.TxHash.Hex()
// tx, tx_ok := e.UnConfirmTxs[tx_hash]
// if tx_ok {
// // 【支付/提现】待确认交易中存在该交易Hash说明是我们主动发起的交易
// // 直接走确认流程,不发送待确认消息
// // log.Printf("🔍 检测到已发起的交易: TxHash=%s, Type=%d", tx_hash, tx.TxType)
// e.confirm("USDT", height, tx, ch)
// } else {
// 【充值】待确认交易中不存在该交易hash说明是外部转账
// 添加至待确认交易中,并立即发送待确认消息
// 1,先根据to查询RmqMsgs再根据存在的rmq_msg中的相关数据存入待确认交易
value, rmq_msg_ok := e.RmqMsgs[toAddr]
var tx_type int
if rmq_msg_ok {
for _, v := range value {
_, ok := v.(message.TopupMsg_req)
if ok {
tx_type = 0
}
_, ok1 := v.(message.WithdrawMsg_req)
if ok1 {
tx_type = 1
}
_, ok2 := v.(message.PayMsg_req)
if ok2 {
tx_type = 2
}
}
}
e.UnConfirmTxs[tx_hash] = message.Tx_msg{
TxType: tx_type,
Tx: message.Tx{
From: fromAddr,
To: toAddr,
Height: height,
TxHash: tx_hash,
Symbol: "USDT",
Value: utils.BigIntUSDTToFloat64(transferEvent.Value),
Status: 2, // 待确认状态
},
}
// log.Printf("📝 待确认交易新增: TxHash=%s, Height=%d, To=%s, Type=%d", tx_hash, height, toAddr, tx_type)
// 🔔 【仅充值】立即发送待确认状态的消息(支付/提现不发送待确认消息)
if tx_type == 0 && rmq_msg_ok {
for _, v := range value {
d1, ok := v.(message.TopupMsg_req)
if ok && strings.ToLower(d1.Address) == toAddr {
pendingMsg := message.TopupMsg_resp{
Address: toAddr,
Status: 2, // 待确认状态
Chain: d1.Chain,
Symbol: d1.Symbol,
Amount: utils.BigIntUSDTToFloat64(transferEvent.Value),
TxHash: tx_hash,
BlockHeight: height,
}
// log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f",
// tx_hash, toAddr, pendingMsg.Amount)
// 异步发送,避免阻塞事件处理
go func(msg message.TopupMsg_resp) {
select {
case ch <- msg:
log.Printf("✅ 待确认充值消息已发送")
default:
log.Printf("⚠️ 通道阻塞,待确认消息发送失败")
}
}(pendingMsg)
break
}
}
// }
}
}
// listenNewBlocks 监听新区块产生,触发交易确认检查
func (e *ETHNode) listenNewBlocks(symbol string, ch chan any) {
fmt.Println("🔍 开始监听新区块...")
headers := make(chan *types.Header, 10)
// 负责重连
for {
// 订阅新区块头
sub, err := e.WsClient.SubscribeNewHead(e.Ctx, headers)
if err != nil {
fmt.Println("❌ 订阅新区块失败, 5秒后重试:", err)
time.Sleep(5 * time.Second)
continue
}
fmt.Println("✅ 新区块订阅成功")
// 处理新区块
for {
select {
case err := <-sub.Err():
fmt.Println("⚠️ 新区块订阅异常,准备重连:", err)
sub.Unsubscribe()
time.Sleep(3 * time.Second)
goto reconnect
case header := <-headers:
// 每当有新区块,检查待确认交易
currentHeight := header.Number.Uint64()
// log.Printf("🆕 新区块: %d", currentHeight)
// 检查是否有待确认交易
e.mu.Lock()
hasPendingTx := len(e.UnConfirmTxs) > 0
e.mu.Unlock()
if hasPendingTx {
e.checkAndConfirmTransactions(symbol, currentHeight, ch)
}
case <-e.Ctx.Done():
fmt.Println("🛑 停止新区块监听")
sub.Unsubscribe()
return
}
}
reconnect:
}
}
// checkAndConfirmTransactions 检查并确认达到确认高度的交易
func (e *ETHNode) checkAndConfirmTransactions(symbol string, currentHeight uint64, ch chan any) {
e.mu.Lock()
needConfirmList := []message.Tx_msg{}
for _, tx := range e.UnConfirmTxs {
// 检查是否达到确认高度
if currentHeight >= tx.Tx.Height+e.Config.ConfirmHeight {
log.Printf("当前高度=%d, 交易高度=%d", currentHeight, tx.Tx.Height)
// log.Printf("✅ 交易达到确认高度: TxHash=%s, 当前高度=%d, 交易高度=%d, 需确认=%d块",
// txHash, currentHeight, tx.Tx.Height, e.Config.ConfirmHeight)
needConfirmList = append(needConfirmList, tx)
}
}
e.mu.Unlock()
// 批量触发确认(在锁外执行,避免长时间持锁)
for _, tx := range needConfirmList {
e.confirm(symbol, currentHeight, tx, ch)
}
}
func (e *ETHNode) confirm(symbol string, height uint64, tx message.Tx_msg, ch chan any) {
switch symbol {
case "USDT":
e.confirm_usdt(tx, height, ch)
}
}
func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) {
// 先从 UnConfirmTxs 中读取
e.mu.Lock()
unConfirmTx, ok := e.UnConfirmTxs[tx.Tx.TxHash]
e.mu.Unlock()
if !ok {
return
}
if height < unConfirmTx.Tx.Height {
return
}
// 超过确认高度,查询交易数据
txHash := common.HexToHash(tx.Tx.TxHash)
receipt, err := e.RpcClient.TransactionReceipt(e.Ctx, txHash)
var status int
if err != nil {
log.Println("⚠️ 查询交易收据失败 TxHash=", txHash, err)
status = 0
} else if receipt.Status == types.ReceiptStatusSuccessful {
status = 1
} else {
status = 0
}
tx.Tx.Status = status
// 通过通道发送,异步写避免阻塞
go func(tx message.Tx_msg) {
e.mu.Lock()
var result_msg any
var matchIndex = -1 // 记录匹配的索引
rmq_msg := e.RmqMsgs[tx.Tx.To]
for i, v := range rmq_msg {
// 处理充值
d1, ok := v.(message.TopupMsg_req)
if ok {
// 统一转小写比较
if strings.ToLower(d1.Address) == tx.Tx.To {
result_msg = message.TopupMsg_resp{
Address: tx.Tx.To,
Status: tx.Tx.Status,
Chain: d1.Chain,
Symbol: d1.Symbol,
Amount: tx.Tx.Value,
TxHash: tx.Tx.TxHash,
BlockHeight: tx.Tx.Height,
}
// 充值消息不删除,可能会有多笔充值到同一地址
break
}
}
// 处理提现
d2, ok1 := v.(message.WithdrawMsg_req)
if ok1 {
// 统一转小写比较
if strings.ToLower(d2.FromAddress) == tx.Tx.From &&
strings.ToLower(d2.ToAddress) == tx.Tx.To &&
d2.Amount == tx.Tx.Value {
result_msg = message.WithdrawMsg_resp{
QueueId: d2.QueueId,
Status: tx.Tx.Status,
Amount: tx.Tx.Value,
Chain: d2.Chain,
Symbol: d2.Symbol,
TxHash: tx.Tx.TxHash,
FromAddress: tx.Tx.From,
ToAddress: tx.Tx.To,
BlockHeight: tx.Tx.Height,
}
matchIndex = i // 记录索引,稍后删除
break
}
}
// 处理支付
d3, ok2 := v.(message.PayMsg_req)
if ok2 {
// 统一转小写比较
if strings.ToLower(d3.FromAddress) == tx.Tx.From &&
strings.ToLower(d3.ToAddress) == tx.Tx.To &&
d3.Amount == tx.Tx.Value {
result_msg = message.PayMsg_resp{
QueueId: d3.QueueId,
Status: tx.Tx.Status,
Amount: tx.Tx.Value,
Chain: d3.Chain,
Symbol: d3.Symbol,
OrderId: d3.OrderId,
TxHash: tx.Tx.TxHash,
FromAddress: tx.Tx.From,
ToAddress: tx.Tx.To,
BlockHeight: tx.Tx.Height,
}
matchIndex = i // 记录索引,稍后删除
break
}
}
}
// 循环结束后,统一删除匹配的消息(提现和支付需要删除)
if matchIndex >= 0 {
e.RmqMsgs[tx.Tx.To] = utils.Slice_delete(e.RmqMsgs[tx.Tx.To], matchIndex)
}
e.mu.Unlock()
select {
case ch <- result_msg:
default:
fmt.Println("⚠️ confirm通道阻塞消息丢失:", txHash)
}
}(tx)
// 删除已确认交易
e.mu.Lock()
delete(e.UnConfirmTxs, tx.Tx.TxHash)
e.mu.Unlock()
}
func (e *ETHNode) decodePrivatekey(address string) string {
// 统一转换为小写
address = strings.ToLower(address)
// 查询加密后的私钥
querySql := "SELECT `private_key` FROM eth_balance WHERE address = ? LIMIT 1;"
log.Println("查询私钥的钱包地址:", address)
var encryptedKey string
err := e.Db.QueryRow(querySql, address).Scan(&encryptedKey)
if err != nil {
log.Println("❌ 查询私钥失败:", err)
return ""
}
// 使用key解密
privateKey := encryptedKey // 实际使用时替换成具体的解密代码
// fmt.Println(privateKey)
return privateKey
}
func (e *ETHNode) usdt_transfer(msg any) error {
var user_from, final_from, to string
var amount float64
// var tx_type int
// now_height, err := e.getBlockHeight()
// if err != nil {
// return fmt.Errorf("get lastest height error: %v", err)
// }
// ---------------------------------------------------------------------------------------------
// 断言,确定本次转账是哪个类型
// 支付操作
v, ok := msg.(message.PayMsg_req)
if ok {
e.AddAddress(v.ToAddress, v) // 存入该笔msgAddAddress内部会转小写
// 统一转换为小写
user_from, final_from, to, amount = strings.ToLower(v.FromAddress), strings.ToLower(v.FromAddress), strings.ToLower(v.ToAddress), v.Amount
// tx_type = 2
}
// 提现操作
k, ok1 := msg.(message.WithdrawMsg_req)
if ok1 {
e.AddAddress(k.ToAddress, k) // 存入该笔msgAddAddress内部会转小写
// 统一转换为小写
user_from, final_from, to, amount = strings.ToLower(k.FromAddress), strings.ToLower(k.FromAddress), strings.ToLower(k.ToAddress), k.Amount
// tx_type = 1
}
// ---------------------------------------------------------------------------------------------
// 1,校验钱包余额
balance, err := e.getUSDTBalance(user_from)
log.Printf("检测Transfer钱包=%s余额=%f", user_from, balance)
if err != nil {
return fmt.Errorf("failed to get balance: %w", err)
}
// 2,钱包余额不足,调用归集钱包转账
if balance < amount {
final_from = "归集钱包"
}
// 3,通过from地址前往数据库查找出对应加密后的私钥并解密真实的私钥
originalKey := e.decodePrivatekey(final_from)
if originalKey == "" {
return fmt.Errorf("failed to query privatekey")
}
fmt.Println(originalKey)
privateKey, err := crypto.HexToECDSA(originalKey)
if err != nil {
return fmt.Errorf("failed to parse private key: %w", err)
}
// 4, 获得nonce
nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(final_from))
if err != nil {
return fmt.Errorf("failed to get nonce: %w", err)
}
// 5, 构造交易ERC20 transfer 调用)
amountBigInt := utils.Float64ToBigIntUSDT(amount)
data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(to), amountBigInt) // 打包 transfer(address,uint256) 方法调用
if err != nil {
return fmt.Errorf("failed to pack transfer data: %w", err)
}
gasPrice, err := e.getSuggestGasPrice() // 获得当前建议gasPrice
if err != nil {
return fmt.Errorf("get suggest-gasprice error:%v", err)
}
eth_balance, err := e.getETHBlance(final_from) // 获得钱包eth余额
if err != nil {
return fmt.Errorf("%w", err)
}
var gasLimit uint64 = 100000
gasLimit_b := new(big.Int).SetUint64(gasLimit)
gas := new(big.Int).Mul(gasLimit_b, gasPrice)
// 判断钱包eth是否支持本次交易gas费用
if eth_balance.Cmp(gas) == -1 {
return fmt.Errorf("address=%s balance less than gas=%v(wei)", final_from, eth_balance)
}
// 构造发送到 USDT 合约地址的交易
tx := types.NewTransaction(
nonce,
e.USDT.Address, // 发送到USDT合约地址
big.NewInt(0), // value为0ERC20转账不需要ETH
gasLimit, // GasLimit设置为100000ERC20转账需要更多gas
gasPrice, // GasPrice: 20 Gwei
data, // 附加数据transfer方法调用
)
// 6, 签名交易并获得txHash
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey)
// txHash := signedTx.Hash().Hex() // 通过签名信息解析出交易hash
if err != nil {
return fmt.Errorf("failed to sign transaction: %w", err)
}
// 7, 发送交易
err = e.RpcClient.SendTransaction(e.Ctx, signedTx)
if err != nil {
return fmt.Errorf("failed to send transaction: %w", err)
}
// // 8, 构造交易消息
// tx_msg := message.Tx_msg{
// TxType: tx_type,
// Tx: message.Tx{
// From: final_from,
// To: to,
// Height: now_height,
// TxHash: txHash,
// Symbol: "USDT",
// Value: amount,
// Status: 2,
// },
// }
// // 9, 将构造的交易消息存入待确认交易中
// e.UnConfirmTxs[txHash] = tx_msg
return nil
}
func (e *ETHNode) Stop() {
e.Cancel()
}