This commit is contained in:
lzx
2025-11-13 17:59:13 +08:00
parent 00389efb75
commit 245c9c94cb
7 changed files with 487 additions and 136 deletions

531
README.md
View File

@@ -74,118 +74,200 @@ M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付
↓ ↑
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ │
│ ┌─────────┐ ┌──────────┐ ┌────────┐
│ │ topup │ │ withdraw │ │ pay │ 请求队列
│ └─────────┘ └──────────┘ └────────┘
│ ┌─────────┐ ┌──────────┐ ┌────────┐
│ │topup_ │ │withdraw_ │ │ pay_ │ 响应队列
│ │ resp │ │ resp │ │ resp │
│ └─────────┘ └──────────┘ └────────┘
│ ┌─────────┐ ┌──────────┐ ┌────────┐ ┌────────┐
│ │ topup │ │ withdraw │ │ pay │ │ remove │ 请求队列 │
│ └─────────┘ └──────────┘ └────────┘ └────────┘
│ ┌─────────┐ ┌──────────┐ ┌────────┐ ┌────────┐
│ │topup_ │ │withdraw_ │ │ pay_ │ │remove_ │ 响应队列 │
│ │ resp │ │ resp │ │ resp │ │ resp
│ └─────────┘ └──────────┘ └────────┘ └────────┘
└────────────┬────────────────────────────┬───────────────────┘
│ │
↓ ↑
┌─────────────────────────────────────────────────────────────┐
│ M2Pool Payment System v2 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ │ RabbitMQ │ │ Blockchain │ │ Database
│ │ Consumer │─>│ Manager─>│ (MySQL)
│ └──────────────┘ └──────┬───────┘ └──────────────┘
│ WebSocket + RPC
└────────────────────────────┼─────────────────────────────────┘
│ ┌──────────────┐ ┌──────────────┐
│ │ Server │ │ ListenServer │
│ │ (路由层) │────────>│ (消息层)
│ └──────────────┘ └──────┬───────┘
┌────────────┴────────────┐
│ ↓ ↓ │
┌──────────────────┐ ┌──────────────────┐
Blockchain Database │ │
│ │ Manager │ │ (MySQL/SQLite) │ │
│ │ (ETH Node) │ │ │ │
│ └────────┬─────────┘ └──────────────────┘ │
│ │ │
│ │ WebSocket + RPC │
└───────────────────┼─────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 以太坊区块链网络 │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 新区块 │ │ USDT Transfer │ │
│ │ (NewHead) │ │ 事件 │
│ │ 新区块 │ │ ETH/USDT
│ │ (NewHead) │ │ Transfer事件 │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
### 消息流转架构
```
业务系统 → RabbitMQ → Server → ListenServer → Blockchain Node
↑ ↓
│ (记录到数据库)
│ │
│ (监听链上事件)
│ │
│ ←── 返回响应 ───
│ │
└─── 发送响应 ───────────┘
```
#### 核心组件说明
1. **Server (server.go)**: 服务入口和消息路由
- 接收 RabbitMQ 消息
- 验证消息签名
- 路由消息到 ListenServer
2. **ListenServer (listen/)**: 消息中间层
- 管理消息状态TopupMsgs, WithdrawMsgs, PayMsgs, RemoveMsgs
- 与区块链节点双向通信
- 持久化消息到 SQLite
- 转发响应到 RabbitMQ
3. **Blockchain Node (blockchain/eth/)**: 区块链交互层
- 监听链上事件ETH 和 USDT 转账)
- 执行转账交易
- 管理待确认交易UnConfirmTxs
- 确认交易状态
### 核心模块
#### 1. Blockchain Manager (`internal/blockchain/`)
- **blockchain.go**:统一的区块链接口定义
- **eth/eth.go**:以太坊节点实现
- 监听 USDT Transfer 事件(实时检测充值)
- 监听新区块产生(触发交易确认)
- 管理待确认交易池UnConfirmTxs
- 执行 ERC20 转账(提现/支付)
- 自动重连机制
- 地址统一小写处理
#### 1. Server (`internal/server.go`)
- 服务入口和启动管理
- 消息签名验证
- 消息路由到 ListenServer
- RabbitMQ 消息处理回调
- 响应消息转发到 RabbitMQ
#### 2. Message Queue (`internal/queue/`)
#### 2. ListenServer (`internal/listen/`)
- **listen.go**:消息中间层,连接 RabbitMQ 和区块链节点
- **RmqMsgIn()**:处理来自 RabbitMQ 的消息
- 充值请求 → 写入数据库 → 添加到 TopupMsgs → 转发到区块链节点
- 提现请求 → 写入数据库 → 添加到 WithdrawMsgs → 转发到区块链节点
- 支付请求 → 写入数据库 → 添加到 PayMsgs → 转发到区块链节点
- 移除监听 → 写入数据库 → 添加到 RemoveMsgs → 转发到区块链节点
- **NetMsgIn()**:处理来自区块链节点的响应
- 充值响应 → 更新数据库 → 转发到 RabbitMQ
- 提现响应 → 更新数据库 → 转发到 RabbitMQ
- 支付响应 → 更新数据库 → 转发到 RabbitMQ
- 消息状态管理TopupMsgs, WithdrawMsgs, PayMsgs, RemoveMsgs
- 消息查找(根据 address 或 queue_id
#### 3. Blockchain Manager (`internal/blockchain/`)
- **blockchain.go**:统一的区块链接口定义和路由
- **eth/eth.go**:以太坊节点实现
- **Listen()**:监听链上事件
- 监听 ETH Transfer 事件
- 监听 USDT Transfer 事件
- 监听新区块产生(用于交易确认)
- **ListenMsg()**:监听来自 ListenServer 的消息
- 充值请求 → 添加到钱包监听
- 提现请求 → 执行转账
- 支付请求 → 执行转账
- 移除监听 → 移除钱包监听
- **Transfer()**:执行转账交易
- 管理待确认交易池UnConfirmTxs
- 交易确认逻辑
#### 4. Message Queue (`internal/queue/`)
- **rabbitmq.go**RabbitMQ 消息队列服务
- 消费 3 个请求队列(充值/提现/支付)
- 发布 3 个响应队列(交易确认结果)
- 消费 4 个请求队列(充值/提现/支付/移除监听
- 发布 4 个响应队列(交易确认结果)
- 自动重连和错误重试
- 消息持久化
#### 3. Database (`internal/db/`)
- **db.go**MySQL 数据库连接池
#### 5. Database (`internal/db/`)
- **mysql.go**MySQL 数据库连接池
- 存储钱包私钥(加密)
- 连接池管理
- 事务支持
- **sqlite.go**SQLite 本地存储
- 存储消息请求和响应记录
- 存储未确认交易信息
- 提供事务支持
#### 4. Message (`internal/msg/`)
#### 6. Message (`internal/msg/`)
- **msg.go**:消息结构定义
- 请求消息TopupMsg_req, WithdrawMsg_req, PayMsg_req
- 响应消息TopupMsg_resp, WithdrawMsg_resp, PayMsg_resp
- 配置结构Config, RMQConfig, ETHConfig
- 请求消息TopupMsg_req, WithdrawMsg_req, PayMsg_req, RemoveListenMsg_req
- 响应消息TopupMsg_resp, WithdrawMsg_resp, PayMsg_resp, RemoveListenMsg_resp
- **config.go**配置结构
- Config, RmqConfig, ETHConfig, MysqlConfig
#### 5. Utils (`internal/utils/`)
#### 7. Utils (`internal/utils/`)
- **utils.go**:工具函数
- 数值转换BigInt ↔ Float64
- 哈希计算
- 加密解密
- 数组/切片操作
- 字符串处理
#### 6. Crypto (`internal/crypto/`)
#### 8. Crypto (`internal/crypto/`)
- **crypto.go**:加密工具
- SHA256 哈希
- 签名验证
#### 7. Server (`internal/server.go`)
- 服务启动和管理
- 消息路由和处理
- 优雅关闭
#### 9. Logger (`internal/logger/`)
- **transaction_logger.go**:交易日志记录
- 记录所有交易操作的详细日志
### 项目结构
```
m2pool-payment-v2/
├── cmd/ # 主程序入口
│ └── main.go # 程序入口,解析命令行参数
│ └── main.go # 程序入口,解析命令行参数 (-key)
├── internal/ # 内部包(不对外暴露)
│ ├── server.go # 服务启动和管理
│ ├── server.go # 服务启动和管理、消息路由
│ ├── blockchain/ # 区块链交互模块
│ │ ├── blockchain.go # 统一的区块链接口定义
│ │ ── eth/ # 以太坊实现
│ │ ── eth.go # USDT 监听、转账、确认
│ │ └── tron/ # TRON 实现(待开发
│ ├── crypto/ # 加密工具
│ │ ── crypto.go # SHA256、签名验证
│ │ ├── blockchain.go # 统一的区块链接口定义和路由
│ │ ── eth/ # 以太坊实现
│ │ ── eth.go # ETH节点接口定义
│ │ └── eth_prv.go # ETH节点实现监听、转账、确认
│ ├── listen/ # 消息中间层
│ │ ── listen.go # ListenServer 定义和消息查找
│ │ └── listen_prv.go # ListenServer 实现(消息处理)
│ ├── queue/ # 消息队列
│ │ └── rabbitmq.go # RabbitMQ 客户端封装
│ ├── db/ # 数据库
│ │ ├── db.go # MySQL 连接池管理
│ │ ├── mysql.go # MySQL 连接池管理
│ │ └── sqlite.go # SQLite 本地存储
│ ├── msg/ # 消息定义
│ │ ── msg.go # 请求/响应结构体定义
├── queue/ # 消息队列
│ ├── rabbitmq.go # RabbitMQ 客户端封装
│ │ └── README.md # RabbitMQ 使用文档
│ │ ── msg.go # 请求/响应结构体定义
│ └── config.go # 配置结构定义
│ ├── crypto/ # 加密工具
│ │ └── crypto.go # SHA256、签名验证
│ ├── logger/ # 日志记录
│ │ └── transaction_logger.go # 交易日志
│ ├── constant/ # 常量定义
│ │ └── constant.go # 状态码等常量
│ └── utils/ # 工具函数
│ └── utils.go # 类型转换、格式化
├── public/ # 公共资源
│ ├── SQLite3.sql # SQLite 表结构
── migration.sql # 数据库迁移脚本
│ ├── eth.sql # ETH钱包表结构MySQL
── msg.sql # 消息表结构SQLite
│ └── eth_mysql.sql # MySQL 完整建表脚本
├── bin/ # 构建和启动脚本
│ ├── build.sh # 编译脚本
│ └── start.sh # 启动脚本
├── test/ # 测试和示例
│ ├── test.go # 测试程序(独立运行)
│ └── config.json # 配置文件
│ └── config.json # 配置文件示例
├── 流程.txt # 业务流程说明文档
├── go.mod # Go 模块定义
├── go.sum # 依赖版本锁定
└── README.md # 项目文档(本文件)
@@ -256,57 +338,155 @@ e.WsClient.SubscribeNewHead(e.Ctx, headers)
### 1. 充值功能 💰
**完整流程:**
```
用户转账 → 实时检测 → 待确认通知 → 区块确认 → 最终通知
业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到TopupMsgs
Blockchain Node → 添加到钱包 → 记录到钱包数据库
新区块产生 → 监听链上事件 → 对比交易
消息中的to = 区块交易中的to(充值)
返回消息 → ListenServer → 修改数据库
记录到UnConfirmTxs → 返回RabbitMQ → 发送待确认消息(status=2)
新区块产生 → 读取UnConfirmTxs → 对比交易高度
符合确认条件(当前高度 >= 交易高度 + 确认高度)
修改钱包数据 → 返回消息 → ListenServer → 修改数据库
返回RabbitMQ → 发送最终确认消息(status=1/0)
```
**特点:**
- ✅ 实时检测到账
- ✅ 发送**两次**通知:待确认 + 最终确认
- ✅ 支持多币种(当前支持 USDT
- ✅ 发送**两次**通知:待确认(status=2) + 最终确认(status=1/0)
- ✅ 支持 ETH 和 USDT
- ✅ 自动地址监听管理
- ✅ 钱包余额自动更新
**消息流:**
1. 业务系统发送充值请求 → RabbitMQ
2. 系统添加地址监听
3. 用户转账 → 立即通知status=2 待确认)
4. 等待 20 个区块 → 最终通知status=1 成功 / 0 失败)
1. **业务系统发送充值请求** → RabbitMQ `topup.queue`
2. **Server 验证签名** → 转发到 ListenServer
3. **ListenServer** → 写入 SQLite 数据库 → 添加到 TopupMsgs → 转发到 Blockchain Node
4. **Blockchain Node** → 添加地址到钱包监听 → 写入 MySQL 钱包数据库
5. **监听链上事件** → 检测到充值交易 → 匹配地址 → 返回 TopupMsg_respstatus=2→ 记录到 UnConfirmTxs
6. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `topup_resp.queue`**第一次通知**
7. **新区块确认** → 达到确认高度 → 更新钱包余额 → 返回 TopupMsg_respstatus=1/0
8. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `topup_resp.queue`**第二次通知**
### 2. 提现功能 💸
**完整流程:**
```
提现请求 → 验证余额 → 发送交易 → 等待确认 → 返回结果
业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到WithdrawMsgs
Blockchain Node → 记录到数据库
开始转账 → 校验余额 → 发送交易
转账结果返回 → ListenServer → 修改相关状态
新区块产生 → 监听链上事件 → 对比交易
消息中的from、to、amount = 区块交易中的from、to、amount
返回消息 → ListenServer → 修改数据库状态
记录到UnConfirmTxs → 返回RabbitMQ → 发送消息(status=2)
新区块产生 → 读取UnConfirmTxs → 对比交易高度
符合确认条件(当前高度 >= 交易高度 + 确认高度)
修改钱包数据 → 返回消息 → ListenServer → 修改数据库
返回RabbitMQ → 发送最终确认消息(status=1/0)
```
**特点:**
- ✅ 自动余额检查
- ✅ 余额不足时使用归集钱包
- ✅ 发送**次**通知:最终确认
- ✅ 发送**次**通知:待确认(status=2) + 最终确认(status=1/0)
- ✅ Gas 费用检查
- ✅ 支持 ETH 和 USDT
**消息流:**
1. 业务系统发送提现请求 → RabbitMQ
2. 系统验证余额并发送交易
3. 等待 20 个区块确认
4. 返回结果status=1 成功 / 0 失败
1. **业务系统发送提现请求** → RabbitMQ `withdraw.queue`
2. **Server 验证签名** → 转发到 ListenServer
3. **ListenServer** → 写入 SQLite 数据库 → 添加到 WithdrawMsgs → 转发到 Blockchain Node
4. **Blockchain Node** → 验证余额 → 发送交易 → 记录到数据库 → 返回 WithdrawMsg_respstatus=2
5. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `withdraw_resp.queue`**第一次通知**
6. **监听链上事件** → 检测到提现交易 → 匹配交易 → 记录到 UnConfirmTxs
7. **新区块确认** → 达到确认高度 → 更新钱包余额 → 返回 WithdrawMsg_respstatus=1/0
8. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `withdraw_resp.queue`**第二次通知**
### 3. 支付功能 💳
**完整流程:**
```
支付请求 → 验证余额 → 发送交易 → 等待确认 → 返回结果
业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到PayMsgs
Blockchain Node → 记录到数据库
开始转账 → 校验余额 → 发送交易
转账结果返回 → ListenServer → 修改相关状态
新区块产生 → 监听链上事件 → 对比交易
消息中的from、to、amount = 区块交易中的from、to、amount
返回消息 → ListenServer → 修改数据库状态
记录到UnConfirmTxs → 返回RabbitMQ → 发送消息(status=2)
新区块产生 → 读取UnConfirmTxs → 对比交易高度
符合确认条件(当前高度 >= 交易高度 + 确认高度)
修改钱包数据 → 返回消息 → ListenServer → 修改数据库
返回RabbitMQ → 发送最终确认消息(status=1/0)
```
**特点:**
-订单关联
-支持批量支付(一次请求多笔转账)
- ✅ 自动余额检查
- ✅ 发送**次**通知:最终确认
-支持商户收款
- ✅ 发送**次**通知:待确认(status=2) + 最终确认(status=1/0)
-Gas 费用检查
- ✅ 支持 ETH 和 USDT
**消息流:**
1. 业务系统发送支付请求含订单ID→ RabbitMQ
2. 系统验证余额并发送交易
3. 等待 20 个区块确认
4. 返回结果status=1 成功 / 0 失败
1. **业务系统发送支付请求** → RabbitMQ `pay.queue`(可包含多笔交易)
2. **Server 验证签名** → 转发到 ListenServer
3. **ListenServer** → 写入 SQLite 数据库 → 添加到 PayMsgs → 转发到 Blockchain Node
4. **Blockchain Node** → 验证余额 → 逐个发送交易 → 记录到数据库 → 返回 PayDatastatus=2
5. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `pay_resp.queue`**第一次通知**
6. **监听链上事件** → 检测到支付交易 → 匹配交易 → 记录到 UnConfirmTxs
7. **新区块确认** → 达到确认高度 → 更新钱包余额 → 返回 PayDatastatus=1/0
8. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `pay_resp.queue`**第二次通知**
### 4. 移除监听功能 🗑️
**完整流程:**
```
业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到RemoveMsgs
Blockchain Node → 移除钱包监听
返回消息 → ListenServer → 修改数据库
返回RabbitMQ → 发送响应消息(status=1/0)
```
**特点:**
- ✅ 支持移除充值监听
- ✅ 支持移除提现/支付监听
- ✅ 实时生效
---
@@ -355,16 +535,25 @@ cd test
cp config.json config.json.backup
# 编辑 config.json填入实际配置
# 5. 编译主程序
cd ..
# 5. 准备配置文件
cp bin/config.json ./config.json
# 编辑 config.json填入实际配置
# 6. 初始化数据库
# MySQL: 执行 public/eth_mysql.sql
# SQLite: 程序会自动创建(如果不存在)
# 7. 编译主程序
go build -o m2pool-payment cmd/main.go
# 6. 运行(指定通信密钥)
# 8. 运行(指定通信密钥)
./m2pool-payment -key=your_secret_key
# 或者运行测试程序
cd test
go run test.go
# 或者使用启动脚本
cd bin
chmod +x build.sh start.sh
./build.sh
./start.sh your_secret_key
```
### 配置文件示例
@@ -373,6 +562,7 @@ go run test.go
```json
{
"net": ["ETH"],
"rmq_config": {
"sub_addr": "amqp://m2pool:m2pool@localhost:5672",
"pay": {
@@ -390,6 +580,11 @@ go run test.go
"exchange": "pay.exchange",
"routing": ["pay.withdraw.routing.key"]
},
"remove": {
"queue": "pay.remove.queue",
"exchange": "pay.exchange",
"routing": ["pay.remove.routing.key"]
},
"pay_resp": {
"queue": "pay.auto.return.queue",
"exchange": "pay.exchange",
@@ -404,22 +599,33 @@ go run test.go
"queue": "pay.withdraw.return.queue",
"exchange": "pay.exchange",
"routing": ["pay.withdraw.return.routing.key"]
},
"remove_resp": {
"queue": "pay.remove.return.queue",
"exchange": "pay.exchange",
"routing": ["pay.remove.return.routing.key"]
}
},
"eth_config": {
"rpcUrl": "http://localhost:8545",
"wsUrl": "ws://localhost:8546",
"confirmHeight": 20,
"dbConfig": {
"msg_config": {
"sqlite_path": "./msg.db"
},
"mysql_config": {
"wallet": {
"user": "root",
"password": "your_password",
"host": "127.0.0.1",
"port": 3306,
"database": "payment",
"maxOpenConns": 20,
"maxIdleCoons": 20,
"connMaxLife": 120
"maxIdleConns": 10,
"connMaxLife": 120000000000
}
},
"eth_config": {
"rpc_url": "http://localhost:8545",
"ws_url": "ws://localhost:8546",
"confirm_height": 20,
"sqlite_path": "./eth.db"
}
}
```
@@ -580,19 +786,23 @@ go run test.go
## 常见问题
### Q1: 为什么充值会收到两次通知?
### Q1: 为什么所有功能都会收到两次通知?
**A:** 这是设计特性!
- **第一次**status=2检测到交易提醒用户"正在确认"
- **第一次**status=2检测到交易/开始处理,提醒用户"正在确认"
- **第二次**status=1/0交易确认通知最终结果
业务系统应该:
- status=2显示进度**不增加余额**
- status=1增加余额
- status=2显示进度**不修改余额或订单状态**
- status=1增加/减少余额,更新订单状态
- status=0交易失败不修改余额提示错误
### Q2: 提现/支付为什么只有一次通知?
### Q2: 为什么需要两次通知?
**A:** 因为是系统主动发起的交易,用户已经知道在处理中,不需要额外的待确认通知。
**A:**
1. **实时反馈**:用户能立即知道交易已被检测到
2. **状态追踪**:业务系统可以追踪交易从待确认到最终确认的完整流程
3. **用户体验**:提供更好的用户交互体验
### Q3: 如何处理交易失败?
@@ -965,38 +1175,121 @@ type Metrics struct {
---
## 业务流程详解
根据 `流程.txt` 文档,系统的完整业务流程如下:
### 充值流程
1. **RabbitMQ → ListenServer**
- 接收充值请求
- 写入 SQLite 数据库(`topup_req_msg` 表)
- 添加到 TopupMsgs 内存映射
2. **ListenServer → Blockchain Node**
- 转发充值请求到区块链节点
- 区块链节点接收后:
- 添加地址到钱包监听
- 记录到 MySQL 钱包数据库(`ETH_wallets` 表)
3. **Blockchain Node → ListenServer监听阶段**
- 新区块产生时,监听链上交易
- 对比消息中的 `to` 地址和区块交易中的 `to` 地址
- 如果匹配(充值检测):
- 返回 TopupMsg_respstatus=2 待确认)
- 记录到 UnConfirmTxs待确认交易池
- ListenServer 接收后:
- 修改数据库状态
- 返回 RabbitMQ → 发送第一次通知status=2
4. **Blockchain Node → ListenServer确认阶段**
- 新区块产生时,读取 UnConfirmTxs 数据
- 对比每个交易的高度:`当前高度 >= 交易高度 + 确认高度`
- 符合确认条件后:
- 修改钱包数据(更新余额)
- 返回 TopupMsg_respstatus=1 成功 / 0 失败)
- ListenServer 接收后:
- 修改数据库状态
- 返回 RabbitMQ → 发送第二次通知status=1/0
### 提现/支付流程
1. **RabbitMQ → ListenServer**
- 接收提现/支付请求
- 写入 SQLite 数据库(`withdraw_req_msg``pay_req_msg` 表)
- 添加到 WithdrawMsgs 或 PayMsgs 内存映射
2. **ListenServer → Blockchain Node**
- 转发提现/支付请求到区块链节点
- 区块链节点接收后:
- 记录到数据库
- 开始转账流程:
- 校验余额
- 发送交易
- 返回响应status=2 待确认)
- ListenServer 接收后:
- 修改相关状态
- 返回 RabbitMQ → 发送第一次通知status=2
3. **Blockchain Node → ListenServer监听阶段**
- 新区块产生时,监听链上交易
- 对比消息中的 `from``to``amount` 和区块交易中的对应字段
- 如果匹配(提现/支付检测):
- 返回响应status=2 待确认)
- 记录到 UnConfirmTxs
- ListenServer 接收后:
- 修改数据库状态
- 返回 RabbitMQ → 发送第一次通知status=2
4. **Blockchain Node → ListenServer确认阶段**
- 新区块产生时,读取 UnConfirmTxs 数据
- 对比每个交易的高度:`当前高度 >= 交易高度 + 确认高度`
- 符合确认条件后:
- 修改钱包数据(更新余额)
- 返回响应status=1 成功 / 0 失败)
- ListenServer 接收后:
- 修改数据库状态
- 返回 RabbitMQ → 发送第二次通知status=1/0
## 重要修复说明
### 🔧 已修复的问题
#### 1. QueueId 重复问题
#### 1. 服务启动问题
**问题**两笔不同的交易出现相同的 QueueId
**原因**:数据库表主键设计错误,使用 `(from_addr, to_addr)` 作为主键
**问题**部分关键服务未启动
**修复**
- 修改数据库表结构,将主键改为 `queueId`
- 创建数据库迁移脚本 `public/migration.sql`
- 修复 SQL 插入语句的参数数量不匹配问题
- 添加 `rmqServer.Start()` 启动 RabbitMQ 服务
- 添加 `messageServer.RmqMsgIn()` 启动消息接收监听
- 添加 `messageServer.NetMsgIn()` 启动网络消息监听
- 添加 `blockChainServer.Listen()` 启动区块链监听
- 添加 `blockChainServer.ListenMsg()` 启动区块链消息监听
#### 2. 重复发送响应问题
#### 2. 错误处理优化
**问题**提现和支付会发送两次响应
**原因**:转账失败时,先发送失败响应,然后仍然进入链上确认流程
**问题**使用 `log.Fatalf` 导致程序非正常退出
**修复**
- 在转账失败时添加 `return` 语句
- 确保转账失败时不进入链上确认流程
- 只有转账成功才会进入链上监听和确认
- 将非关键错误的 `log.Fatalf` 改为 `log.Printf`
- 添加错误恢复机制,确保程序稳定运行
### 📊 修复后的消息发送次数
#### 3. 变量和字段名称修复
| 场景 | 消息处理阶段 | 链上确认阶段 | 总发送次数 |
|------|-------------|-------------|-----------|
| **转账失败** | 发送失败响应 | 不进入已return | **1次** |
| **转账成功** | 不发送响应 | 发送成功响应 | **1次** |
**问题**:部分变量和字段名称存在拼写错误
**修复**
- 修复 `Heihgt``Height`
- 修复数据库字段名称不匹配问题
- 修复 mutex 解锁错误
#### 4. 逻辑错误修复
**问题**:交易确认高度判断逻辑错误
**修复**
- 修复确认高度判断条件:`now_height >= tx.Height+e.ConfirmHeight`
- 修复变量重复声明问题
---

2
go.mod
View File

@@ -39,7 +39,7 @@ require (
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/holiman/uint256 v1.3.2 // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/rabbitmq/amqp091-go v1.10.0
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect

View File

@@ -391,18 +391,19 @@ func (e *ETHNode) loadWallets() error {
if err := rows.Err(); err != nil {
return fmt.Errorf("error occurred while iterating rows: %v", err)
}
pks, err := e.getAddressesPks(addresses)
if err != nil {
return fmt.Errorf("inital balance private key error: %v", err)
if len(addresses) > 0 {
pks, err := e.getAddressesPks(addresses)
if err != nil {
return fmt.Errorf("inital balance private key error: %v", err)
}
e.mu.Lock()
e.Wallets = wallets
for address, pk := range pks {
e.Wallets[address].pk = pk
}
e.mu.Unlock()
}
e.mu.Lock()
e.Wallets = wallets
for address, pk := range pks {
e.Wallets[address].pk = pk
}
e.mu.Unlock()
return nil
}
@@ -1090,6 +1091,13 @@ func (e *ETHNode) handleListen_Topup_req(msg message.TopupMsg_req) {
pk, err := e.getAddressPk(msg.Address)
if err != nil {
log.Printf("Query balance(%s-%s) private_key error: %v", msg.Chain, msg.Address, err)
go e.asyncSendMsgToListen(message.TopupMsg_resp{
QueueId: msg.QueueId,
Chain: msg.Chain,
Symbol: msg.Symbol,
Address: msg.Address,
Status: msg.Status,
}, 3, 5*time.Second)
return
}
// 添加到钱包
@@ -1226,8 +1234,9 @@ func (e *ETHNode) handleListen_Pay_req(msg message.PayMsg_req) {
if err != nil {
log.Printf("check balance error: %v", err)
result_msg.PayStatus = constant.STATUS_ERROR
for _, tx := range result_msg.Transactions {
for to, tx := range result_msg.Transactions {
tx.Status = constant.STATUS_ERROR
result_msg.Transactions[to] = tx
}
go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second)
return

View File

@@ -96,7 +96,36 @@ func (l *ListenServer) handleRmqRemove_req(msg message.RemoveListenMsg_req) {
// 充值响应
func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) {
switch msg.Status {
case constant.STATUS_SUCCESS, constant.STATUS_FAILED:
// 修改数据库
str := "UPDATE topup_resp_msg SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
case constant.STATUS_PENDING:
str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, tx_hash, height, status) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.Address, msg.Amount, msg.TxHash, msg.BlockHeight, msg.Status}
err := l.SqliteDB.Insert(str, params)
if err != nil {
log.Printf("Insert Topup_resp msg error: %v", err)
}
default:
// 插入数据库
str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, to_addr, status) VALUES (?,?,?,?,?,?,?,?,?)"
params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Status}
err := l.SqliteDB.Insert(str, params)
if err != nil {
log.Printf("Insert Topup_resp msg error: %v", err)
}
}
go l.asyncSendMsgToRmq(msg, 3, 5*time.Second)
}
// 提现响应
@@ -121,8 +150,12 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) {
str := "UPDATE withdraw_resp_msg SET status = ? WHERE tx_hash = ?"
params := []any{msg.Status, msg.TxHash}
count, err := l.SqliteDB.Update(str, params)
if err != nil || count != 1 {
log.Printf("count=%d, err=%v", count, err)
if err != nil {
// 更详细的错误日志,包括 QueueId 和 Status
log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err)
} else if count != 1 {
// 如果更新的行数不是 1日志中记录详细信息
log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count)
}
}()
go l.asyncSendMsgToRmq(msg, 3, 5*time.Second)

View File

@@ -3,7 +3,7 @@ CREATE TABLE IF NOT EXISTS ETH_wallets (
queue_id TEXT NOT NULL,
timestamp INTEGER NOT NULL,
sign TEXT NOT NULL,
status INTEGER DEFAULT 0, -- 0未在监听 1正在监听
status INTEGER DEFAULT 0 -- 0未在监听 1正在监听
);
CREATE TABLE IF NOT EXISTS ETH_balances (

View File

@@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS topup_resp_msg (
queue_id TEXT NOT NULL,
chain TEXT NOT NULL,
symbol TEXT NOT NULL,
from_addr TEXT NOT NULL,
from_addr TEXT DEFAULT NULL,
to_addr TEXT NOT NULL,
amount TEXT NOT NULL, -- 改为 TEXT 类型
tx_hash TEXT DEFAULT NULL,

16
流程.txt Normal file
View File

@@ -0,0 +1,16 @@
充值rmq -> listen -> 写数据库
-> 添加到TopupMsgs
-> 传给对应的node server -> node server接收 -> 添加到钱包
-> 记录到钱包数据库
提现/支付rmq -> listen -> 写数据库
-> 添加到WithdrawMsgs
-> 传给对应的node server -> node server接收 -> 记录到数据库
-> 开始转账 -> 校验余额 -> 转账 -> 转账结果返回 -> listen -> 修改相关状态
node server listen -> 新区块产生时读取当前listen server的消息 -> 对比消息和区块中的交易 -> 消息中的to = 区块交易中的to(充值) -> 返回消息 -> listen -> 修改数据库
-> 记录到unconfirmtxs -> 返回rmq -> 发送消息
-> 消息中的from、to、amount = 区块交易中的from、to、amount(提现/充值) -> 返回消息 -> listen -> 修改数据库状态
node server confirm -> 新区块产生同时会读取当前unconfirmtxs数据 -> 对比每个交易的高度 -> 符合确认条件 -> 修改钱包数据
-> 返回消息 -> listen -> 修改相关数据 -> 返回rmq -> 发出消息