modify some msg-struct and table-struct

This commit is contained in:
lzx
2025-11-14 17:43:25 +08:00
parent 245c9c94cb
commit ac22db02f3
11 changed files with 605 additions and 276 deletions

View File

@@ -144,7 +144,11 @@ func NewETHNode(cfg message.Config, decodeKey string, l *listen.ListenServer) (*
Ctx: ctx,
Cancel: cancel,
}
// 初始化表
err = ethnode.initTables()
if err != nil {
return nil, err
}
// 更新网络公共数据和加载钱包
height, err := ethnode.getHeight()
if err != nil {
@@ -161,6 +165,7 @@ func NewETHNode(cfg message.Config, decodeKey string, l *listen.ListenServer) (*
return nil, fmt.Errorf("load unconfirmtxs error: %v", err)
}
log.Println("✅ ETH节点已启动")
// ethnode.handleHistoryETHEvent(23795552)
return ethnode, nil
}

View File

@@ -9,6 +9,7 @@ import (
message "m2pool-payment/internal/msg"
"m2pool-payment/internal/utils"
"math/big"
"os"
"strings"
"sync"
"time"
@@ -31,6 +32,21 @@ func init_USDT() *USDT {
return usdt
}
func (e *ETHNode) initTables() error {
sql_script_path := "../public/eth.sql"
sql_byte, err := os.ReadFile(sql_script_path)
if err != nil {
return fmt.Errorf("open msg-sql file error: %v", err)
}
err = e.SqliteDB.CreateTable(string(sql_byte))
if err != nil {
return fmt.Errorf("exec eth-sql error: %v", err)
}
return nil
}
func (e *ETHNode) updateNetInfo(height uint64) {
// 创建 WaitGroup
var wg sync.WaitGroup
@@ -116,11 +132,11 @@ func (e *ETHNode) getSuggestGasPrice() (*big.Int, error) {
}
// 设置gas price上限避免在网络拥堵时费用过高
// 这里设置为20 Gwei (20 * 10^9 wei)
maxGasPrice := new(big.Int).SetUint64(20000000000) // 20 Gwei
// 这里设置为20 Gwei (2 * 10^9 wei)
maxGasPrice := new(big.Int).SetUint64(2000000000) // 2 Gwei
if gasPrice.Cmp(maxGasPrice) > 0 {
log.Printf("⚠️ 建议gas price过高 (%v wei),使用上限 20 Gwei", new(big.Int).Div(gasPrice, big.NewInt(1e18)))
log.Printf("⚠️ 建议gas price过高 (%v wei),使用上限 2 Gwei", new(big.Int).Div(gasPrice, big.NewInt(1e18)))
return maxGasPrice, nil
}
@@ -144,15 +160,15 @@ func (e *ETHNode) getEIP1559GasFees() (*big.Int, *big.Int, error) {
}
// 设置优先级费用tip这里设置为2 Gwei
maxPriorityFeePerGas := new(big.Int).SetUint64(2000000000) // 2 Gwei
maxPriorityFeePerGas := new(big.Int).SetUint64(2000000) // 0.2 Gwei
// 计算最大费用 = 基础费用 + 优先级费用
maxFeePerGas := new(big.Int).Add(baseFee, maxPriorityFeePerGas)
// 设置最大费用上限为30 Gwei
maxFeeLimit := new(big.Int).SetUint64(30000000000) // 30 Gwei
// 设置最大费用上限为2 Gwei
maxFeeLimit := new(big.Int).SetUint64(2000000000) // 2 Gwei
if maxFeePerGas.Cmp(maxFeeLimit) > 0 {
log.Printf("⚠️ 计算的最大费用过高 (%v wei),使用上限 30 Gwei", new(big.Int).Div(maxFeePerGas, big.NewInt(1e18)))
log.Printf("⚠️ 计算的最大费用过高 (%v wei),使用上限 2 Gwei", new(big.Int).Div(maxFeePerGas, big.NewInt(1e18)))
maxFeePerGas = maxFeeLimit
}
@@ -387,10 +403,11 @@ func (e *ETHNode) loadWallets() error {
addresses = append(addresses, addr)
wallets[addr] = wallet
}
log.Printf("ETH钱包加载成功%v", addresses)
if err := rows.Err(); err != nil {
return fmt.Errorf("error occurred while iterating rows: %v", err)
}
if len(addresses) > 0 {
pks, err := e.getAddressesPks(addresses)
if err != nil {
@@ -404,6 +421,10 @@ func (e *ETHNode) loadWallets() error {
e.mu.Unlock()
}
// for addr, balance := range e.Wallets {
// log.Printf("钱包:%s, wallets: %v", addr, balance)
// }
return nil
}
@@ -634,8 +655,50 @@ func (e *ETHNode) listenETHTransactions() error {
}
}
func (e *ETHNode) handleHistoryETHEvent(height uint64) {
block, err := e.RpcClient.BlockByNumber(e.Ctx, big.NewInt(int64(height)))
if err != nil {
log.Println(err)
return
}
for _, tx := range block.Transactions() {
txHash := tx.Hash().Hex()
// 只处理ETH转账Value > 0
if tx.Value().Sign() <= 0 {
continue
}
// 使用 types.Sender 获取发送方地址
signer := types.LatestSignerForChainID(e.NetID)
from, err := types.Sender(signer, tx)
if err != nil {
log.Println("获取发送方地址失败:", err)
continue
}
toAddr := ""
if tx.To() != nil {
toAddr = strings.ToLower(tx.To().Hex())
}
fromAddr := strings.ToLower(from.Hex())
// 获取交易金额
amount := utils.BigIntETHToFloat64(tx.Value())
if _, ok := e.Wallets[toAddr]; ok {
msg, err := e.MessageServer.FindTopupMsgWithToaddress("ETH", toAddr)
if err != nil {
log.Printf("❌ 未查找到ETH充值消息中有address(%s)信息", toAddr)
continue
}
log.Printf("当前交易txHash:%s, fromAddr: %s, toAddr: %s, amount: %fETH", txHash, fromAddr, toAddr, amount)
log.Println(msg)
}
}
}
func (e *ETHNode) handleETHEvent(header *types.Header) {
height := header.Number.Uint64()
log.Printf("当前区块高度:%d", height)
// 获取区块中的所有交易
block, err := e.RpcClient.BlockByHash(e.Ctx, header.Hash())
if err != nil {
@@ -666,6 +729,7 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
amount := utils.BigIntETHToFloat64(tx.Value())
// toAddr和监听钱包一致表示(充值)
if _, ok := e.Wallets[toAddr]; ok {
log.Printf("监听地址: %s交易信息%v", toAddr, tx)
msg, err := e.MessageServer.FindTopupMsgWithToaddress("ETH", toAddr)
if err != nil {
log.Printf("❌ 未查找到ETH充值消息中有address(%s)信息", toAddr)
@@ -719,7 +783,7 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
resp := message.WithdrawMsg_resp{
QueueId: v.QueueId,
Chain: "ETH",
Symbol: v.Symbol, // 使用消息中的Symbol应该是ETH
Symbol: "ETH", // 使用消息中的Symbol应该是ETH
FromAddress: fromAddr,
ToAddress: toAddr,
TxHash: txHash,
@@ -734,7 +798,7 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
QueueId: v.QueueId,
TxType: 1, // 提现类型
Chain: "ETH",
Symbol: v.Symbol, // 使用消息中的Symbol应该是ETH
Symbol: "ETH", // 使用消息中的Symbol应该是ETH
From: fromAddr,
To: toAddr,
TxHash: txHash,
@@ -746,30 +810,36 @@ func (e *ETHNode) handleETHEvent(header *types.Header) {
}
case message.PayMsg_req:
for to, transaction := range v.Transactions {
if v.FromAddress == fromAddr && to == toAddr && transaction.Amount == amount {
resp := transaction
resp.Chain = "ETH"
resp.Symbol = v.Symbol // 使用消息中的Symbol可能是ETH或USDT
resp.Status = constant.STATUS_PENDING
go e.asyncSendMsgToListen(resp, 3, 5*time.Second)
e.UnConfirmedTxs.mu.Lock()
e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{
QueueId: v.QueueId,
TxType: 2, // 支付类型
Chain: "ETH",
Symbol: v.Symbol, // 使用消息中的Symbol
From: fromAddr,
To: toAddr,
TxHash: txHash,
Height: height,
Amount: tx.Value(),
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
if v.FromAddress == fromAddr && v.ToAddress == toAddr && v.Amount == amount {
resp := message.PayMsg_resp{
QueueId: v.QueueId,
Chain: "ETH",
Symbol: "ETH", // 使用消息中的Symbol应该是ETH
FromAddress: fromAddr,
ToAddress: toAddr,
TxHash: txHash,
Amount: amount,
Fee: v.Fee,
BlockHeight: height,
Status: constant.STATUS_PENDING,
}
go e.asyncSendMsgToListen(resp, 3, 5*time.Second)
e.UnConfirmedTxs.mu.Lock()
e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{
QueueId: v.QueueId,
TxType: 2, // 提现类型
Chain: "ETH",
Symbol: "ETH", // 使用消息中的Symbol应该是ETH
From: fromAddr,
To: toAddr,
TxHash: txHash,
Height: height,
Amount: tx.Value(),
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
}
default:
}
@@ -897,7 +967,7 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) {
QueueId: v.QueueId,
TxType: 1, // 提现类型
Chain: "ETH",
Symbol: v.Symbol, // 使用消息中的Symbol应该是USDT
Symbol: "USDT", // 使用消息中的Symbol应该是USDT
From: fromAddr,
To: toAddr,
TxHash: txHash,
@@ -909,29 +979,34 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log) {
}
case message.PayMsg_req:
for to, transaction := range v.Transactions {
if v.FromAddress == fromAddr && to == toAddr && transaction.Amount == value_float {
resp := transaction
resp.Chain = "ETH"
resp.Symbol = v.Symbol // 使用消息中的Symbol应该是USDT
resp.Status = constant.STATUS_PENDING
go e.asyncSendMsgToListen(resp, 3, 5*time.Second)
e.UnConfirmedTxs.mu.Lock()
e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{
QueueId: v.QueueId,
TxType: 2, // 支付类型
Chain: "ETH",
Symbol: v.Symbol, // 使用消息中的Symbol应该是USDT
From: fromAddr,
To: toAddr,
TxHash: txHash,
Height: height,
Amount: transferEvent.Value,
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
if v.FromAddress == fromAddr && v.ToAddress == toAddr && v.Amount == value_float {
resp := message.PayMsg_resp{
QueueId: v.QueueId,
Chain: "ETH",
Symbol: "USDT",
FromAddress: fromAddr,
ToAddress: toAddr,
TxHash: txHash,
Amount: value_float,
Fee: v.Fee,
BlockHeight: height,
Status: constant.STATUS_PENDING,
}
go e.asyncSendMsgToListen(resp, 3, 5*time.Second)
e.UnConfirmedTxs.mu.Lock()
e.UnConfirmedTxs.Transactions[txHash] = message.Transaction{
QueueId: v.QueueId,
TxType: 1, // 提现类型
Chain: "ETH",
Symbol: "USDT", // 使用消息中的Symbol应该是USDT
From: fromAddr,
To: toAddr,
TxHash: txHash,
Height: height,
Amount: transferEvent.Value,
Status: constant.STATUS_PENDING,
}
e.UnConfirmedTxs.mu.Unlock()
}
default:
@@ -986,7 +1061,10 @@ func (e *ETHNode) confirm() {
delete(e.UnConfirmedTxs.Transactions, txHash)
continue
}
var tableMap = map[string]string{
"ETH": "ETH_balances",
"USDT": "USDT_balances",
}
switch v := msg.(type) {
case message.TopupMsg_req:
if status == constant.STATUS_SUCCESS {
@@ -1013,6 +1091,18 @@ func (e *ETHNode) confirm() {
TxHash: txHash,
BlockHeight: tx.Height,
}
go func() {
str := "UPDATE " + tableMap[tx.Symbol] + " SET success_tx_hash = success_tx_hash || ? WHERE address = ?"
params := []any{txHash + ",", v.Address}
count, err := e.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", v.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", v.QueueId, count)
}
}()
responses = append(responses, response) // 将消息提交至responses
case message.WithdrawMsg_req:
if status == constant.STATUS_SUCCESS {
@@ -1060,17 +1150,19 @@ func (e *ETHNode) confirm() {
delete(e.UnConfirmedTxs.Transactions, txHash)
continue
}
response := message.PayData{
QueueId: v.QueueId,
Chain: v.Chain,
Symbol: v.Symbol,
TxHash: txHash,
ToAddress: tx.To,
Amount: float_amount,
BlockHeight: tx.Height,
response := message.PayMsg_resp{
QueueId: tx.QueueId,
Chain: tx.Chain,
Symbol: tx.Symbol,
Status: status,
Amount: float_amount,
Fee: v.Fee,
TxHash: txHash,
FromAddress: tx.From,
ToAddress: tx.To,
BlockHeight: tx.Height,
}
responses = append(responses, response)
responses = append(responses, response) // 将消息提交至responses
default:
log.Printf("未知的消息类型: %v, 跳过此交易", v)
delete(e.UnConfirmedTxs.Transactions, txHash)
@@ -1137,6 +1229,11 @@ func (e *ETHNode) handleListen_Topup_req(msg message.TopupMsg_req) {
if err != nil {
log.Printf("Received ListenServer Topup_req msg: insert sqlite3 db error: %v", err)
}
go e.asyncSendMsgToListen(message.UpdateReqState{
QueueId: msg.QueueId,
MsgType: 0,
Status: constant.STATUS_SUCCESS,
}, 3, 5*time.Second)
}()
}
@@ -1193,6 +1290,11 @@ func (e *ETHNode) handleListen_Withdraw_req(msg message.WithdrawMsg_req) {
return
}
// 转账成功等待chain server listen监听到该笔交易
go e.asyncSendMsgToListen(message.UpdateReqState{
QueueId: msg.QueueId,
MsgType: 1,
Status: constant.STATUS_SUCCESS,
}, 3, 5*time.Second)
log.Printf("withdraw - transfer success: QueueId(%s)", msg.QueueId)
} else { // 校验失败
// 提现转账账户余额不足
@@ -1209,8 +1311,8 @@ func (e *ETHNode) handleListen_Pay_req(msg message.PayMsg_req) {
e.NetInfo.mu.Unlock()
var target_amount_eth, target_amount_usdt *big.Int
// 将 msg.Amount 和 msg.Fee 转换为 big.Int只调用一次
amountBigInt := utils.Float64ToBigInt(msg.Symbol, msg.TotalAmount)
feeBigInt := utils.Float64ToBigInt(msg.Symbol, msg.TotalFee)
amountBigInt := utils.Float64ToBigInt(msg.Symbol, msg.Amount)
feeBigInt := utils.Float64ToBigInt(msg.Symbol, msg.Fee)
switch msg.Symbol {
case "ETH":
// 计算目标金额
@@ -1223,41 +1325,46 @@ func (e *ETHNode) handleListen_Pay_req(msg message.PayMsg_req) {
default:
return
}
// 构建相应通用数据Status根据后续情况变化
result_msg := message.PayMsg_resp{
QueueId: msg.QueueId,
Chain: msg.Chain,
Symbol: msg.Symbol,
FromAddress: msg.FromAddress,
Transactions: msg.Transactions,
QueueId: msg.QueueId,
Chain: msg.Chain,
Symbol: msg.Symbol,
Amount: msg.Amount,
Fee: msg.Fee,
FromAddress: msg.FromAddress,
ToAddress: msg.ToAddress,
}
check_result, err := e.checkBalance(msg.Symbol, msg.FromAddress, target_amount_eth, target_amount_usdt)
// 余额校验错误,绕过转账,返回错误响应
if err != nil {
log.Printf("check balance error: %v", err)
result_msg.PayStatus = constant.STATUS_ERROR
for to, tx := range result_msg.Transactions {
tx.Status = constant.STATUS_ERROR
result_msg.Transactions[to] = tx
}
result_msg.Status = constant.STATUS_ERROR
go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second)
return
}
// 校验成功
if check_result {
for _, tx := range result_msg.Transactions {
err := e.Transfer(msg.FromAddress, tx.ToAddress, msg.Symbol, tx.Amount, tx.Fee)
if err != nil {
log.Println(err)
tx.Status = constant.STATUS_ERROR
} else {
tx.Status = constant.STATUS_PENDING
}
// 开始转账
err := e.Transfer(msg.FromAddress, msg.ToAddress, msg.Symbol, msg.Amount, msg.Fee)
// 转账失败,返回转账失败结果
if err != nil {
log.Printf("withdraw - transfer error: %v", err)
result_msg.Status = constant.STATUS_ERROR
// 提现转账错误
go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second)
return
}
// 此时所有待转账的数据均已进行转账处理
// 转账成功等待chain server listen监听到该笔交易
log.Printf("pay - transfer success: QueueId(%s)", msg.QueueId)
} else {
go e.asyncSendMsgToListen(message.UpdateReqState{
QueueId: msg.QueueId,
MsgType: 1,
Status: constant.STATUS_SUCCESS,
}, 3, 5*time.Second)
log.Printf("withdraw - transfer success: QueueId(%s)", msg.QueueId)
} else { // 校验失败
// 提现转账账户余额不足
result_msg.PayStatus = constant.STATUS_BALANCE_NOT_ENOUGH
result_msg.Status = constant.STATUS_BALANCE_NOT_ENOUGH
go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second)
return
}
@@ -1276,7 +1383,7 @@ func (e *ETHNode) handleListen_Remove_req(msg message.RemoveListenMsg_req) {
}
str := "UPDATE ETH_wallets SET status = ? WHERE address = ?"
params := []any{0, msg.Address}
count, err := e.SqliteDB.Update(str, params...)
count, err := e.SqliteDB.Update(str, params)
if err != nil || count != 1 {
log.Printf("Remove address(%s) error: count(%d)", msg.Address, count)
// result_msg.Status = constant.STATUS_FAILED

View File

@@ -74,7 +74,7 @@ func (s *SQLite) Delete(sqlStr string, args ...any) (int64, error) {
}
// 更新数据
func (s *SQLite) Update(sqlStr string, args ...any) (int64, error) {
func (s *SQLite) Update(sqlStr string, args []any) (int64, error) {
res, err := s.DB.Exec(sqlStr, args...)
if err != nil {
return 0, fmt.Errorf("update error: %v", err)

View File

@@ -85,9 +85,7 @@ func NewListenServer(cfg message.Config) *ListenServer {
ch[net] = make(chan any, 1000)
chin[net] = make(chan any, 1000)
}
log.Println("✅ 消息监听处理已启动")
return &ListenServer{
var l = &ListenServer{
Config: cfg,
// MysqlDB: dbConn,
SqliteDB: sqlite,
@@ -100,6 +98,9 @@ func NewListenServer(cfg message.Config) *ListenServer {
ChFromRmqServer: rmq_ch_in,
ChToRmqServer: rmq_ch_out,
}
l.loadMsg()
log.Println("✅ 消息监听处理已启动")
return l
}
// rmq -> listen -> chain server
@@ -141,12 +142,15 @@ func (l *ListenServer) NetMsgIn() {
for _, ch := range l.ChFromChainServer {
for msg := range ch {
switch v := msg.(type) {
// 接收到区块链节点返回的更新req状态消息
case message.UpdateReqState:
go l.handleUpdateReqState(v)
// 接收到区块链节点服务返回的resp消息除了修改相关状态和数据库还需通过rmq_out通道返回出去
case message.TopupMsg_resp:
go l.handleChainTopup_resp(v)
case message.WithdrawMsg_resp:
go l.handleChainWithdraw_resp(v)
case message.PayData:
case message.PayMsg_resp:
go l.handleChainPay_resp(v)
case message.RemoveListenMsg_resp:
go l.handleChainRemove_resp(v)

View File

@@ -5,10 +5,162 @@ import (
"log"
"m2pool-payment/internal/constant"
message "m2pool-payment/internal/msg"
"strings"
"time"
)
// 加载所有消息
func (l *ListenServer) loadMsg() {
topup_sql := "SELECT queue_id, chain, symbol, address, timestamp, sign, status FROM topup_req_msg WHERE status = ?" // 1正在监听
withdraw_sql := "SELECT queue_id, chain, symbol, from_addr, to_addr, amount, fee, timestamp, sign, status FROM withdraw_req_msg WHERE status = ? OR status = ?" // 2待确认5待支付
pay_sql := "SELECT queue_id, chain, symbol, from_addr, to_addr, amount, fee, timestamp, sign, status FROM withdraw_req_msg WHERE status = ? OR status = ?" // 2待确认5待支付
remove_sql := "SELECT queue_id, msg_type, chain, symbol, address, timestamp, sign, status FROM remove_req_msg WHERE status = ?" // 2待执行
topup_params := []any{1}
withdraw_params := []any{2, 5}
pay_params := []any{2, 5}
remove_params := []any{2}
go func() {
rows, err := l.SqliteDB.Query_(topup_sql, topup_params...)
if err != nil {
log.Fatalf("load topup-msg error: %v", err)
return
}
// 遍历查询结果
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
address := row["address"].(string) // 假设 address 是 string 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 INTEGER 类型
// 处理数据,例如打印输出
l.TopupMsgs[chain].mu.RLock()
l.TopupMsgs[chain].Msgs[queueID] = message.TopupMsg_req{
QueueId: queueID,
Chain: chain,
Symbol: symbol,
Address: address,
Timestamp: uint64(timestamp),
Sign: sign,
Status: int(status),
}
l.TopupMsgs[chain].mu.RUnlock()
}
log.Printf("充值历史消息load完毕")
}()
go func() {
rows, err := l.SqliteDB.Query_(withdraw_sql, withdraw_params...)
if err != nil {
log.Fatalf("load withdraw-msg error: %v", err)
return
}
// 遍历查询结果
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型
toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型
amount := row["amount"].(float64) // 假设 amount 是 float64 类型
fee := row["fee"].(float64) // 假设 fee 是 float64 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 string 类型
// 处理数据,例如打印输出
l.WithdrawMsgs[chain].mu.RLock()
l.WithdrawMsgs[chain].Msgs[queueID] = message.WithdrawMsg_req{
QueueId: queueID,
Chain: chain,
Symbol: symbol,
FromAddress: fromAddr,
ToAddress: toAddr,
Amount: amount,
Fee: fee,
Timestamp: uint64(timestamp),
Sign: sign,
Status: int(status),
}
l.WithdrawMsgs[chain].mu.RUnlock()
}
log.Printf("提现历史消息load完毕")
}()
go func() {
rows, err := l.SqliteDB.Query_(pay_sql, pay_params...)
if err != nil {
log.Fatalf("load pay-msg error: %v", err)
return
}
// 遍历查询结果
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
fromAddr := row["from_addr"].(string) // 假设 from_addr 是 string 类型
toAddr := row["to_addr"].(string) // 假设 to_addr 是 string 类型
amount := row["amount"].(float64) // 假设 amount 是 float64 类型
fee := row["fee"].(float64) // 假设 fee 是 float64 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 string 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 string 类型
// 处理数据,例如打印输出
l.PayMsgs[chain].mu.RLock()
l.PayMsgs[chain].Msgs[queueID] = message.PayMsg_req{
QueueId: queueID,
Chain: chain,
Symbol: symbol,
FromAddress: fromAddr,
ToAddress: toAddr,
Amount: amount,
Fee: fee,
Timestamp: uint64(timestamp),
Sign: sign,
Status: int(status),
}
l.PayMsgs[chain].mu.RUnlock()
}
log.Printf("支付历史消息load完毕")
}()
go func() {
rows, err := l.SqliteDB.Query_(remove_sql, remove_params...)
if err != nil {
log.Fatalf("load remove-msg error: %v", err)
return
}
for _, row := range rows {
queueID := row["queue_id"].(string) // 假设 queue_id 是 string 类型
msgType := row["msg_type"].(int)
chain := row["chain"].(string) // 假设 chain 是 string 类型
symbol := row["symbol"].(string) // 假设 symbol 是 string 类型
address := row["address"].(string) // 假设 address 是 string 类型
timestamp := row["timestamp"].(int64) // 假设 timestamp 是 INTEGER 类型
sign := row["sign"].(string) // 假设 sign 是 string 类型
status := row["status"].(int64) // 假设 status 是 INTEGER 类型
// 处理数据
l.RemoveMsgs[chain].mu.RLock()
l.RemoveMsgs[chain].Msgs[queueID] = message.RemoveListenMsg_req{
QueueId: queueID,
MsgType: msgType,
Chain: chain,
Symbol: symbol,
Address: address,
Timestamp: uint64(timestamp),
Sign: sign,
Stauts: int(status),
}
l.RemoveMsgs[chain].mu.RUnlock()
}
log.Printf("移除监听历史消息load完毕")
}()
}
// 充值消息
func (l *ListenServer) handleRmqTopup_req(msg message.TopupMsg_req) {
// 添加到TopupMsgs
@@ -16,14 +168,20 @@ func (l *ListenServer) handleRmqTopup_req(msg message.TopupMsg_req) {
l.TopupMsgs[msg.Chain].Msgs[msg.QueueId] = msg
l.TopupMsgs[msg.Chain].mu.RUnlock()
// 写数据库
go func() {
sql := "INSERT INTO topup_req_msg (queue_id, chain, symbol, address, timestamp, sign, status) VALUES (?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign, 1}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert Topup_req msg error: %v", err)
}
}()
sql := "INSERT INTO topup_req_msg (queue_id, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert Topup_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.TopupMsg_resp{
QueueId: msg.QueueId,
Chain: msg.Chain,
Symbol: msg.Symbol,
Address: msg.Address,
Status: constant.STATUS_ERROR,
}, 3, 5*time.Second)
return
}
// 传给对应的node server
go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second)
log.Printf("Insert Topup_req msg success: QueueId(%s)", msg.QueueId)
@@ -36,14 +194,21 @@ func (l *ListenServer) handleRmqWithdraw_req(msg message.WithdrawMsg_req) {
l.WithdrawMsgs[msg.Chain].Msgs[msg.QueueId] = msg
l.WithdrawMsgs[msg.Chain].mu.RUnlock()
// 写数据库
go func() {
sql := "INSERT INTO withdraw_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert Withdraw_req msg error: %v", err)
}
}()
sql := "INSERT INTO withdraw_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert Withdraw_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.WithdrawMsg_resp{
QueueId: msg.QueueId,
Chain: msg.Chain,
Symbol: msg.Symbol,
FromAddress: msg.FromAddress,
ToAddress: msg.ToAddress,
Status: constant.STATUS_ERROR,
}, 3, 5*time.Second)
return
}
// 传给对应的node server
go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second)
log.Printf("Insert Withdraw_req msg success: QueueId(%s)", msg.QueueId)
@@ -51,28 +216,29 @@ func (l *ListenServer) handleRmqWithdraw_req(msg message.WithdrawMsg_req) {
// 支付消息
func (l *ListenServer) handleRmqPay_req(msg message.PayMsg_req) {
// 添加到WithdrawMsgs
l.PayMsgs[msg.Chain].mu.RLock()
l.PayMsgs[msg.Chain].Msgs[msg.QueueId] = msg
l.PayMsgs[msg.Chain].mu.RUnlock()
go func() {
sql1 := "INSERT INTO pay_req_msg (queue_id, chain, symbol, from_addr, total_amount, total_fee, timestamp, sign) VALUES (?,?,?,?,?,?,?,?)"
sql2 := "INSERT INTO transactions (queue_id, chain, symbol, from_addr, to_addr, amount, fee) VALUES "
params1 := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, fmt.Sprintf("%f", msg.TotalAmount), fmt.Sprintf("%f", msg.TotalFee), msg.Timestamp, msg.Sign}
params2 := []any{}
for to_addr, tx := range msg.Transactions {
sql2 += "(?,?,?,?,?,?,?), "
params2 = append(params2, msg.QueueId, msg.Chain, msg.Symbol, strings.ToLower(msg.FromAddress), strings.ToLower(to_addr), fmt.Sprintf("%f", tx.Amount), fmt.Sprintf("%f", tx.Fee))
}
sql2 = sql2[:len(sql2)-2]
err := l.SqliteDB.ExecuteTransactions([]string{sql1, sql2}, [][]any{params1, params2})
if err != nil {
log.Printf("Insert Pay_req msg error: %v", err)
}
}()
// 写数据库
sql := "INSERT INTO pay_req_msg (queue_id, from_addr, to_addr, amount, fee, chain, symbol, timestamp, sign) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Chain, msg.Symbol, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert PayMsg_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.PayMsg_resp{
QueueId: msg.QueueId,
Chain: msg.Chain,
Symbol: msg.Symbol,
FromAddress: msg.FromAddress,
ToAddress: msg.ToAddress,
Status: constant.STATUS_ERROR,
}, 3, 5*time.Second)
return
}
// 传给对应的node server
go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second)
log.Printf("Insert Pay_req msg success: QueueId(%s)", msg.QueueId)
log.Printf("Insert PayMsg_req msg success: QueueId(%s)", msg.QueueId)
}
// 移除监听消息
@@ -81,19 +247,46 @@ func (l *ListenServer) handleRmqRemove_req(msg message.RemoveListenMsg_req) {
l.RemoveMsgs[msg.Chain].Msgs[msg.QueueId] = msg
l.RemoveMsgs[msg.Chain].mu.RUnlock()
go func() {
sql := "INSERT INTO remove_req_msg (queue_id, msg_type, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert Remove_req msg error: %v", err)
}
}()
sql := "INSERT INTO remove_req_msg (queue_id, msg_type, chain, symbol, address, timestamp, sign) VALUES (?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.MsgType, msg.Chain, msg.Symbol, msg.Address, msg.Timestamp, msg.Sign}
err := l.SqliteDB.Insert(sql, params)
if err != nil {
log.Printf("Insert Remove_req msg error: %v", err)
go l.asyncSendMsgToRmq(message.RemoveListenMsg_resp{
QueueId: msg.QueueId,
MsgType: msg.MsgType,
Chain: msg.Chain,
Symbol: msg.Symbol,
Address: msg.Address,
Status: constant.STATUS_ERROR,
}, 3, 5*time.Second)
}
go l.asyncSendMsgToChain(msg, msg.Chain, 3, 5*time.Second)
log.Printf("Insert Remove_req msg success: QueueId(%s)", msg.QueueId)
}
// 更新状态响应
func (l *ListenServer) handleUpdateReqState(msg message.UpdateReqState) {
typeMap := map[int]string{
0: "topup_req_msg",
1: "withdraw_req_msg",
2: "pay_req_msg",
}
str := "UPDATE " + typeMap[msg.MsgType] + " SET status = ? WHERE queue_id = ?"
params := []any{msg.Status, msg.QueueId}
go func() {
count, err := l.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update update_req_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
}()
}
// 充值响应
func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) {
switch msg.Status {
@@ -104,7 +297,7 @@ func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) {
count, err := l.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
log.Printf("Failed to update topup_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
@@ -152,7 +345,7 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) {
count, err := l.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
log.Printf("Failed to update withdraw_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
@@ -177,85 +370,50 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) {
}
// 支付响应
func (l *ListenServer) handleChainPay_resp(msg message.PayData) {
func (l *ListenServer) handleChainPay_resp(msg message.PayMsg_resp) {
switch msg.Status {
// pending状态表示转账已完成且在区块链中被找到等待后续确认+记录到数据库
case constant.STATUS_PENDING:
l.PayMsgs[msg.Chain].mu.Lock()
l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions[msg.ToAddress] = msg
l.PayMsgs[msg.Chain].mu.Unlock()
go func() {
str := "INSERT INTO pay_msg_tx (queue_id, tx_hash, to_addr, amount, fee, height, status) VALUES (?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.TxHash, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.BlockHeight, msg.Status}
str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, tx_hash, amount, fee, height, status) VALUES (?,?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, msg.TxHash, msg.Amount, msg.Fee, msg.BlockHeight, msg.Status}
err := l.SqliteDB.Insert(str, params)
if err != nil {
log.Printf("Insert pay_msg_tx msg error: %v", err)
log.Println(err)
}
}()
case constant.STATUS_SUCCESS, constant.STATUS_FAILED:
l.PayMsgs[msg.Chain].mu.Lock()
l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions[msg.ToAddress] = msg
l.PayMsgs[msg.Chain].mu.Unlock()
l.WithdrawMsgs[msg.Chain].mu.Lock()
delete(l.WithdrawMsgs[msg.Chain].Msgs, msg.QueueId)
l.WithdrawMsgs[msg.Chain].mu.Unlock()
go func() {
str := "UPDATE pay_msg_tx SET status = ? WHERE tx_hash = ?"
str := "UPDATE pay_resp_msg SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params...)
count, err := l.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
log.Printf("Failed to update pay_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
}()
go func() {
need_send := true
l.PayMsgs[msg.Chain].mu.Lock()
for _, tx := range l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions {
if tx.Status != constant.STATUS_SUCCESS && tx.Status != constant.STATUS_FAILED {
need_send = false
break
}
}
l.PayMsgs[msg.Chain].mu.Unlock()
if need_send {
str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, pay_status) VALUES (?,?,?,?,?)"
params := []any{l.PayMsgs[msg.Chain].Msgs[msg.QueueId].QueueId, l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Chain, l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Symbol, l.PayMsgs[msg.Chain].Msgs[msg.QueueId].FromAddress, constant.STATUS_SUCCESS}
err := l.SqliteDB.Insert(str, params)
if err != nil {
log.Printf("Insert pay_resp_msg msg error: %v", err)
}
resp := message.PayMsg_resp{
QueueId: msg.QueueId,
Chain: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Chain,
Symbol: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Symbol,
FromAddress: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].FromAddress,
PayStatus: constant.STATUS_SUCCESS,
Transactions: l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions,
}
go l.asyncSendMsgToRmq(resp, 3, 5*time.Second)
}
}()
go l.asyncSendMsgToRmq(msg, 3, 5*time.Second)
default:
l.PayMsgs[msg.Chain].mu.Lock()
l.PayMsgs[msg.Chain].Msgs[msg.QueueId].Transactions[msg.ToAddress] = msg
l.PayMsgs[msg.Chain].mu.Unlock()
l.WithdrawMsgs[msg.Chain].mu.Lock()
delete(l.WithdrawMsgs[msg.Chain].Msgs, msg.QueueId)
l.WithdrawMsgs[msg.Chain].mu.Unlock()
go func() {
str := "UPDATE pay_msg_tx SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params...)
str := "INSERT INTO pay_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, fee, status) VALUES (?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.ToAddress, fmt.Sprintf("%f", msg.Amount), fmt.Sprintf("%f", msg.Fee), msg.Status}
err := l.SqliteDB.Insert(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
log.Println(err)
}
}()
go l.asyncSendMsgToRmq(msg, 3, 5*time.Second)
}
}
@@ -273,7 +431,7 @@ func (l *ListenServer) handleChainRemove_resp(msg message.RemoveListenMsg_resp)
// 更新数据库
str := "UPDATE remove_resp_msg SET status = ? WHERE queue_id = ?"
params := []any{msg.Status, msg.QueueId}
count, err := l.SqliteDB.Update(str, params...)
count, err := l.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)

View File

@@ -2,10 +2,8 @@ package logger
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
message "m2pool-payment/internal/msg"
"os"
"path/filepath"
"sync"
@@ -233,7 +231,7 @@ func LogWithdraw(fromAddress string, status string, amount float64, toAddress st
}
// LogPay 记录支付消息
func LogPay(status string, fromAddress string, queueId string, transactions map[string]*message.PayData) {
func LogPay(status string, fromAddress string, queueId string) {
if txLogger == nil {
return
}
@@ -244,14 +242,14 @@ func LogPay(status string, fromAddress string, queueId string, transactions map[
fmt.Printf("⚠️ 获取日志文件失败: %v\n", err)
return
}
t, err := json.Marshal(transactions)
if err != nil {
fmt.Println("Error marshalling to JSON:", err)
return
}
// t, err := json.Marshal(transactions)
// if err != nil {
// fmt.Println("Error marshalling to JSON:", err)
// return
// }
timestamp := time.Now().Format("2006-01-02 15:04:05")
content := fmt.Sprintf("%s [pay]-[%s] | FromAddress: %s | QueueId: %s | Transactions: %v",
timestamp, status, fromAddress, queueId, string(t))
content := fmt.Sprintf("%s [pay]-[%s] | FromAddress: %s | QueueId: %s",
timestamp, status, fromAddress, queueId)
if err := lf.write(content); err != nil {
fmt.Printf("⚠️ 写入日志失败: %v\n", err)

View File

@@ -57,40 +57,65 @@ type WithdrawMsg_resp struct {
}
// =============================== type2 ===============================
type PayMsg_req struct {
QueueId string `json:"queue_id"`
Chain string `json:"chain"`
Symbol string `json:"symbol"`
FromAddress string `json:"from_address"`
TotalAmount float64 `json:"total_amount"`
TotalFee float64 `json:"total_fee"`
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_req{}, ...}
Status int `json:"status,omitempty"`
QueueId string `json:"queue_id"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
FromAddress string `json:"from_address"` // 我们提供的地址
ToAddress string `json:"to_address"` // 用户要提现到的地址
Amount float64 `json:"amount"`
Fee float64 `json:"fee"`
Timestamp uint64 `json:"timestamp"`
Sign string `json:"sign"`
Status int `json:"status,omitempty"`
}
type PayMsg_resp struct {
QueueId string `json:"queue_id"`
Chain string `json:"chain"`
Symbol string `json:"symbol"`
FromAddress string `json:"from_address"`
PayStatus int `json:"pay_status"` // 1至少有一笔转账成功3sign校验失败4钱包余额不足
Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_resp{}, ...}
QueueId string `json:"queue_id"`
Chain string `json:"chain"` // 链名称
Symbol string `json:"symbol"` // 币种
FromAddress string `json:"from_address"` // 来源地址
ToAddress string `json:"to_address"` // 目标地址
TxHash string `json:"tx_hash,omitempty"`
Amount float64 `json:"amount,omitempty"`
Fee float64 `json:"fee,omitempty"`
BlockHeight uint64 `json:"block_height,omitempty"` // 区块高度
Status int `json:"status"` // 0失败1成功3sign校验失败
}
type PayData struct {
QueueId string `json:"queue_id,omitempty"`
Chain string `json:"chain,omitempty"`
Symbol string `json:"symbol,omitempty"`
TxHash string `json:"tx_hash,omitempty"`
ToAddress string `json:"to_address"`
Amount float64 `json:"amount"`
Fee float64 `json:"fee"`
BlockHeight uint64 `json:"block_height,omitempty"`
Status int `json:"status,omitempty"` // 0失败1成功, 2待确认
}
// type PayMsg_req struct {
// QueueId string `json:"queue_id"`
// Chain string `json:"chain"`
// Symbol string `json:"symbol"`
// FromAddress string `json:"from_address"`
// TotalAmount float64 `json:"total_amount"`
// TotalFee float64 `json:"total_fee"`
// Timestamp uint64 `json:"timestamp"`
// Sign string `json:"sign"`
// Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_req{}, ...}
// Status int `json:"status,omitempty"`
// }
// type PayMsg_resp struct {
// QueueId string `json:"queue_id"`
// Chain string `json:"chain"`
// Symbol string `json:"symbol"`
// FromAddress string `json:"from_address"`
// PayStatus int `json:"pay_status"` // 1至少有一笔转账成功3sign校验失败4钱包余额不足
// Transactions map[string]PayData `json:"transactions"` // {"to_address": PayData_resp{}, ...}
// }
// type PayData struct {
// QueueId string `json:"queue_id,omitempty"`
// Chain string `json:"chain,omitempty"`
// Symbol string `json:"symbol,omitempty"`
// TxHash string `json:"tx_hash,omitempty"`
// ToAddress string `json:"to_address"`
// Amount float64 `json:"amount"`
// Fee float64 `json:"fee"`
// BlockHeight uint64 `json:"block_height,omitempty"`
// Status int `json:"status,omitempty"` // 0失败1成功, 2待确认
// }
// =============================== type3 ===============================
// 接收到的删除监听地址消息
@@ -155,3 +180,9 @@ type TransferResult struct {
Amount float64
Status int
}
type UpdateReqState struct {
QueueId string
MsgType int
Status int
}

View File

@@ -190,8 +190,8 @@ func (r *RabbitMQServer) consumePay() {
if err := json.Unmarshal(body, &msg); err != nil {
return fmt.Errorf("failed to parse pay message: %w", err)
}
log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, Chain=%s, Symbol=%s, TxCount=%d",
msg.QueueId, msg.FromAddress, msg.Chain, msg.Symbol, len(msg.Transactions))
log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, From=%s, To=%s, Chain=%s, Symbol=%s, Amount=%f",
msg.QueueId, msg.FromAddress, msg.ToAddress, msg.Chain, msg.Symbol, msg.Amount)
if r.OnPayMsg != nil {
r.OnPayMsg(msg)

View File

@@ -123,10 +123,9 @@ func (s *ServerCtx) handlePayMsg() {
// 验证签名
if !s.verifyMessage(msg.Timestamp, msg.Sign) {
err := s.rmqServer.PublishPayResp(message.PayMsg_resp{
QueueId: msg.QueueId,
FromAddress: msg.FromAddress,
PayStatus: constant.STATUS_VERIFY_FAILED,
Transactions: msg.Transactions,
QueueId: msg.QueueId,
FromAddress: msg.FromAddress,
Status: constant.STATUS_VERIFY_FAILED,
})
if err != nil {
log.Printf("❌ 发布支付失败响应失败: %v", err)
@@ -179,7 +178,7 @@ func (s *ServerCtx) handleRespMsg() {
return
}
case message.PayMsg_resp:
log.Printf("📨[提现响应]QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, Status=%d", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.PayStatus)
log.Printf("📨[提现响应]QueueID=%s, Chain=%s, Symbol=%s, FromAddress=%s, Status=%d", v.QueueId, v.Chain, v.Symbol, v.FromAddress, v.Status)
err := s.rmqServer.PublishPayResp(v)
if err != nil {
log.Printf("❌ 发送支付响应失败: %v", err)
@@ -199,13 +198,30 @@ func (s *ServerCtx) handleRespMsg() {
}
}
func (s *ServerCtx) initDB() {
// msgDB_path := s.Config.MsgConfig.SqlitePath
// ethDB_path := s.Config.ETHConfig.SqlitePath
msg_sql_script_path := "../public/msg.sql"
msg_sql_byte, err := os.ReadFile(msg_sql_script_path)
if err != nil {
log.Fatalf("open msg-sql file error: %v", err)
return
}
err = s.messageServer.SqliteDB.CreateTable(string(msg_sql_byte))
if err != nil {
log.Fatalf("exec msg-sql error: %v", err)
return
}
}
func Start(msgKey string) {
log.Println("========================================")
log.Println("🚀 M2Pool Payment System Starting...")
log.Println("========================================")
server := NewServer(msgKey)
server.initDB()
// 启动消息处理
go server.handleTopupMsg()
go server.handleWithdrawMsg()

View File

@@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS topup_req_msg (
address TEXT NOT NULL,
timestamp INTEGER,
sign TEXT,
status INTEGER DEFAULT 1,
status INTEGER DEFAULT 0, -- 0未在监听1正在监听
PRIMARY KEY (queue_id)
);
@@ -56,8 +56,8 @@ CREATE TABLE IF NOT EXISTS pay_req_msg (
symbol TEXT NOT NULL,
from_addr TEXT NOT NULL,
to_addr TEXT NOT NULL,
total_amount TEXT NOT NULL, -- 改为 TEXT 类型
total_fee TEXT NOT NULL, -- 改为 TEXT 类型
amount TEXT NOT NULL, -- 改为 TEXT 类型
fee TEXT NOT NULL, -- 改为 TEXT 类型
timestamp INTEGER,
sign TEXT,
status INTEGER DEFAULT 5,
@@ -69,21 +69,26 @@ CREATE TABLE IF NOT EXISTS pay_resp_msg (
chain TEXT NOT NULL,
symbol TEXT NOT NULL,
from_addr TEXT NOT NULL,
pay_status INTEGER DEFAULT 5,
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
to_addr TEXT NOT NULL,
tx_hash TEXT DEFAULT NULL,
amount TEXT DEFAULT NULL, -- 改为 TEXT 类型
fee TEXT DEFAULT NULL, -- 改为 TEXT 类型
height INTEGER DEFAULT NULL,
status INTEGER DEFAULT 5,
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
);
-- pay_msg的交易
CREATE TABLE IF NOT EXISTS pay_msg_tx (
queue_id TEXT NOT NULL,
tx_hash TEXT DEFAULT NULL,
to_addr TEXT NOT NULL,
amount TEXT DEFAULT NULL, -- 改为 TEXT 类型
fee TEXT, -- 改为 TEXT 类型
height INTEGER DEFAULT 0,
status INTEGER DEFAULT 5,
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
);
-- CREATE TABLE IF NOT EXISTS pay_msg_tx (
-- queue_id TEXT NOT NULL,
-- tx_hash TEXT DEFAULT NULL,
-- to_addr TEXT NOT NULL,
-- amount TEXT DEFAULT NULL, -- 改为 TEXT 类型
-- fee TEXT, -- 改为 TEXT 类型
-- height INTEGER DEFAULT 0,
-- status INTEGER DEFAULT 5,
-- FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
-- );
CREATE TABLE IF NOT EXISTS remove_req_msg(
queue_id TEXT NOT NULL,

View File

@@ -52,39 +52,44 @@ CREATE TABLE IF NOT EXISTS withdraw_resp_msg (
);
CREATE TABLE IF NOT EXISTS pay_req_msg (
queue_id VARCHAR(255) NOT NULL,
queue_id VARCHAR(255) NOT NULL,
chain VARCHAR(32) NOT NULL,
symbol VARCHAR(32) NOT NULL,
from_addr VARCHAR(255) NOT NULL,
to_addr VARCHAR(255) NOT NULL,
total_amount DECIMAL(40, 16) NOT NULL,
total_fee DECIMAL(40, 16) NOT NULL,
amount DECIMAL(30,16) NOT NULL,
fee DECIMAL(30,16) NOT NULL,
timestamp BIGINT,
sign VARCHAR(255),
status TINYINT DEFAULT 5, --constant模块的定义
status TINYINT DEFAULT 5, --constant模块的定义
PRIMARY KEY (queue_id)
);
CREATE TABLE IF NOT EXISTS pay_resp_msg (
queue_id VARCHAR(255) NOT NULL,
queue_id VARCHAR(255) NOT NULL,
chain VARCHAR(32) NOT NULL,
symbol VARCHAR(32) NOT NULL,
from_addr VARCHAR(255) NOT NULL,
pay_status TINYINT DEFAULT 5, --constant模块的定义
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
to_addr VARCHAR(255) NOT NULL,
tx_hash VARCHAR(255) DEFAULT NULL,
amount DECIMAL(30,16) DEFAULT NULL,
fee DECIMAL(30,16) DEFAULT NULL,
height BIGINT DEFAULT NULL,
status TINYINT DEFAULT 5, --constant模块的定义
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
);
-- pay_msg的交易
CREATE TABLE IF NOT EXISTS pay_msg_tx (
queue_id VARCHAR(255) NOT NULL,
tx_hash VARCHAR(255) DEFAULT NULL,
to_addr VARCHAR(255) NOT NULL,
amount DECIMAL(30,16) DEFAULT NULL,
fee DECIMAL(30,16),
height BIGINT DEFAULT 0,
status TINYINT DEFAULT 5, --constant模块的定义
FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
);
-- CREATE TABLE IF NOT EXISTS pay_msg_tx (
-- queue_id VARCHAR(255) NOT NULL,
-- tx_hash VARCHAR(255) DEFAULT NULL,
-- to_addr VARCHAR(255) NOT NULL,
-- amount DECIMAL(30,16) DEFAULT NULL,
-- fee DECIMAL(30,16),
-- height BIGINT DEFAULT 0,
-- status TINYINT DEFAULT 5, --遵循constant模块的定义
-- FOREIGN KEY (queue_id) REFERENCES pay_req_msg(queue_id)
-- );
CREATE TABLE IF NOT EXISTS remove_req_msg(
queue_id VARCHAR(255) NOT NULL,