diff --git a/README.md b/README.md index ca4123a..411e5c4 100644 --- a/README.md +++ b/README.md @@ -74,118 +74,200 @@ M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付 ↓ ↑ ┌─────────────────────────────────────────────────────────────┐ │ RabbitMQ │ -│ ┌─────────┐ ┌──────────┐ ┌────────┐ │ -│ │ topup │ │ withdraw │ │ pay │ 请求队列 │ -│ └─────────┘ └──────────┘ └────────┘ │ -│ ┌─────────┐ ┌──────────┐ ┌────────┐ │ -│ │topup_ │ │withdraw_ │ │ pay_ │ 响应队列 │ -│ │ resp │ │ resp │ │ resp │ │ -│ └─────────┘ └──────────┘ └────────┘ │ +│ ┌─────────┐ ┌──────────┐ ┌────────┐ ┌────────┐ │ +│ │ topup │ │ withdraw │ │ pay │ │ remove │ 请求队列 │ +│ └─────────┘ └──────────┘ └────────┘ └────────┘ │ +│ ┌─────────┐ ┌──────────┐ ┌────────┐ ┌────────┐ │ +│ │topup_ │ │withdraw_ │ │ pay_ │ │remove_ │ 响应队列 │ +│ │ resp │ │ resp │ │ resp │ │ resp │ │ +│ └─────────┘ └──────────┘ └────────┘ └────────┘ │ └────────────┬────────────────────────────┬───────────────────┘ │ │ ↓ ↑ ┌─────────────────────────────────────────────────────────────┐ │ M2Pool Payment System v2 │ │ │ -│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ -│ │ RabbitMQ │ │ Blockchain │ │ Database │ │ -│ │ Consumer │─>│ Manager │─>│ (MySQL) │ │ -│ └──────────────┘ └──────┬───────┘ └──────────────┘ │ -│ │ │ -│ │ WebSocket + RPC │ -└────────────────────────────┼─────────────────────────────────┘ - │ - ↓ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ Server │ │ ListenServer │ │ +│ │ (路由层) │────────>│ (消息层) │ │ +│ └──────────────┘ └──────┬───────┘ │ +│ │ │ +│ ┌────────────┴────────────┐ │ +│ ↓ ↓ │ +│ ┌──────────────────┐ ┌──────────────────┐ │ +│ │ Blockchain │ │ Database │ │ +│ │ Manager │ │ (MySQL/SQLite) │ │ +│ │ (ETH Node) │ │ │ │ +│ └────────┬─────────┘ └──────────────────┘ │ +│ │ │ +│ │ WebSocket + RPC │ +└───────────────────┼─────────────────────────────────────────┘ + │ + ↓ ┌─────────────────────────────────────────────────────────────┐ │ 以太坊区块链网络 │ │ │ │ ┌──────────────┐ ┌──────────────┐ │ -│ │ 新区块 │ │ USDT Transfer │ │ -│ │ (NewHead) │ │ 事件 │ │ +│ │ 新区块 │ │ ETH/USDT │ │ +│ │ (NewHead) │ │ Transfer事件 │ │ │ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` +### 消息流转架构 + +``` +业务系统 → RabbitMQ → Server → ListenServer → Blockchain Node + ↑ ↓ + │ (记录到数据库) + │ │ + │ (监听链上事件) + │ │ + │ ←── 返回响应 ─── + │ │ + └─── 发送响应 ───────────┘ +``` + +#### 核心组件说明 + +1. **Server (server.go)**: 服务入口和消息路由 + - 接收 RabbitMQ 消息 + - 验证消息签名 + - 路由消息到 ListenServer + +2. **ListenServer (listen/)**: 消息中间层 + - 管理消息状态(TopupMsgs, WithdrawMsgs, PayMsgs, RemoveMsgs) + - 与区块链节点双向通信 + - 持久化消息到 SQLite + - 转发响应到 RabbitMQ + +3. **Blockchain Node (blockchain/eth/)**: 区块链交互层 + - 监听链上事件(ETH 和 USDT 转账) + - 执行转账交易 + - 管理待确认交易(UnConfirmTxs) + - 确认交易状态 + ### 核心模块 -#### 1. Blockchain Manager (`internal/blockchain/`) -- **blockchain.go**:统一的区块链接口定义 -- **eth/eth.go**:以太坊节点实现 - - 监听 USDT Transfer 事件(实时检测充值) - - 监听新区块产生(触发交易确认) - - 管理待确认交易池(UnConfirmTxs) - - 执行 ERC20 转账(提现/支付) - - 自动重连机制 - - 地址统一小写处理 +#### 1. Server (`internal/server.go`) +- 服务入口和启动管理 +- 消息签名验证 +- 消息路由到 ListenServer +- RabbitMQ 消息处理回调 +- 响应消息转发到 RabbitMQ -#### 2. Message Queue (`internal/queue/`) +#### 2. ListenServer (`internal/listen/`) +- **listen.go**:消息中间层,连接 RabbitMQ 和区块链节点 + - **RmqMsgIn()**:处理来自 RabbitMQ 的消息 + - 充值请求 → 写入数据库 → 添加到 TopupMsgs → 转发到区块链节点 + - 提现请求 → 写入数据库 → 添加到 WithdrawMsgs → 转发到区块链节点 + - 支付请求 → 写入数据库 → 添加到 PayMsgs → 转发到区块链节点 + - 移除监听 → 写入数据库 → 添加到 RemoveMsgs → 转发到区块链节点 + - **NetMsgIn()**:处理来自区块链节点的响应 + - 充值响应 → 更新数据库 → 转发到 RabbitMQ + - 提现响应 → 更新数据库 → 转发到 RabbitMQ + - 支付响应 → 更新数据库 → 转发到 RabbitMQ + - 消息状态管理(TopupMsgs, WithdrawMsgs, PayMsgs, RemoveMsgs) + - 消息查找(根据 address 或 queue_id) + +#### 3. Blockchain Manager (`internal/blockchain/`) +- **blockchain.go**:统一的区块链接口定义和路由 +- **eth/eth.go**:以太坊节点实现 + - **Listen()**:监听链上事件 + - 监听 ETH Transfer 事件 + - 监听 USDT Transfer 事件 + - 监听新区块产生(用于交易确认) + - **ListenMsg()**:监听来自 ListenServer 的消息 + - 充值请求 → 添加到钱包监听 + - 提现请求 → 执行转账 + - 支付请求 → 执行转账 + - 移除监听 → 移除钱包监听 + - **Transfer()**:执行转账交易 + - 管理待确认交易池(UnConfirmTxs) + - 交易确认逻辑 + +#### 4. Message Queue (`internal/queue/`) - **rabbitmq.go**:RabbitMQ 消息队列服务 - - 消费 3 个请求队列(充值/提现/支付) - - 发布 3 个响应队列(交易确认结果) + - 消费 4 个请求队列(充值/提现/支付/移除监听) + - 发布 4 个响应队列(交易确认结果) - 自动重连和错误重试 - 消息持久化 -#### 3. Database (`internal/db/`) -- **db.go**:MySQL 数据库连接池 +#### 5. Database (`internal/db/`) +- **mysql.go**:MySQL 数据库连接池 - 存储钱包私钥(加密) - 连接池管理 - 事务支持 +- **sqlite.go**:SQLite 本地存储 + - 存储消息请求和响应记录 + - 存储未确认交易信息 + - 提供事务支持 -#### 4. Message (`internal/msg/`) +#### 6. Message (`internal/msg/`) - **msg.go**:消息结构定义 - - 请求消息(TopupMsg_req, WithdrawMsg_req, PayMsg_req) - - 响应消息(TopupMsg_resp, WithdrawMsg_resp, PayMsg_resp) - - 配置结构(Config, RMQConfig, ETHConfig) + - 请求消息(TopupMsg_req, WithdrawMsg_req, PayMsg_req, RemoveListenMsg_req) + - 响应消息(TopupMsg_resp, WithdrawMsg_resp, PayMsg_resp, RemoveListenMsg_resp) +- **config.go**:配置结构 + - Config, RmqConfig, ETHConfig, MysqlConfig -#### 5. Utils (`internal/utils/`) +#### 7. Utils (`internal/utils/`) - **utils.go**:工具函数 - 数值转换(BigInt ↔ Float64) - - 哈希计算 - - 加密解密 + - 数组/切片操作 + - 字符串处理 -#### 6. Crypto (`internal/crypto/`) +#### 8. Crypto (`internal/crypto/`) - **crypto.go**:加密工具 - SHA256 哈希 - 签名验证 -#### 7. Server (`internal/server.go`) -- 服务启动和管理 -- 消息路由和处理 -- 优雅关闭 +#### 9. Logger (`internal/logger/`) +- **transaction_logger.go**:交易日志记录 + - 记录所有交易操作的详细日志 ### 项目结构 ``` m2pool-payment-v2/ ├── cmd/ # 主程序入口 -│ └── main.go # 程序入口,解析命令行参数 +│ └── main.go # 程序入口,解析命令行参数 (-key) ├── internal/ # 内部包(不对外暴露) -│ ├── server.go # 服务启动和管理 +│ ├── server.go # 服务启动和管理、消息路由 │ ├── blockchain/ # 区块链交互模块 -│ │ ├── blockchain.go # 统一的区块链接口定义 -│ │ ├── eth/ # 以太坊实现 -│ │ │ └── eth.go # USDT 监听、转账、确认 -│ │ └── tron/ # TRON 实现(待开发) -│ ├── crypto/ # 加密工具 -│ │ └── crypto.go # SHA256、签名验证 +│ │ ├── blockchain.go # 统一的区块链接口定义和路由 +│ │ └── eth/ # 以太坊实现 +│ │ ├── eth.go # ETH节点接口定义 +│ │ └── eth_prv.go # ETH节点实现(监听、转账、确认) +│ ├── listen/ # 消息中间层 +│ │ ├── listen.go # ListenServer 定义和消息查找 +│ │ └── listen_prv.go # ListenServer 实现(消息处理) +│ ├── queue/ # 消息队列 +│ │ └── rabbitmq.go # RabbitMQ 客户端封装 │ ├── db/ # 数据库 -│ │ ├── db.go # MySQL 连接池管理 +│ │ ├── mysql.go # MySQL 连接池管理 │ │ └── sqlite.go # SQLite 本地存储 │ ├── msg/ # 消息定义 -│ │ └── msg.go # 请求/响应结构体定义 -│ ├── queue/ # 消息队列 -│ │ ├── rabbitmq.go # RabbitMQ 客户端封装 -│ │ └── README.md # RabbitMQ 使用文档 +│ │ ├── msg.go # 请求/响应结构体定义 +│ │ └── config.go # 配置结构定义 +│ ├── crypto/ # 加密工具 +│ │ └── crypto.go # SHA256、签名验证 │ ├── logger/ # 日志记录 │ │ └── transaction_logger.go # 交易日志 +│ ├── constant/ # 常量定义 +│ │ └── constant.go # 状态码等常量 │ └── utils/ # 工具函数 │ └── utils.go # 类型转换、格式化 ├── public/ # 公共资源 -│ ├── SQLite3.sql # SQLite 表结构 -│ └── migration.sql # 数据库迁移脚本 +│ ├── eth.sql # ETH钱包表结构(MySQL) +│ ├── msg.sql # 消息表结构(SQLite) +│ └── eth_mysql.sql # MySQL 完整建表脚本 +├── bin/ # 构建和启动脚本 +│ ├── build.sh # 编译脚本 +│ └── start.sh # 启动脚本 ├── test/ # 测试和示例 │ ├── test.go # 测试程序(独立运行) -│ └── config.json # 配置文件 +│ └── config.json # 配置文件示例 +├── 流程.txt # 业务流程说明文档 ├── go.mod # Go 模块定义 ├── go.sum # 依赖版本锁定 └── README.md # 项目文档(本文件) @@ -256,57 +338,155 @@ e.WsClient.SubscribeNewHead(e.Ctx, headers) ### 1. 充值功能 💰 +**完整流程:** ``` -用户转账 → 实时检测 → 待确认通知 → 区块确认 → 最终通知 +业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到TopupMsgs + ↓ + Blockchain Node → 添加到钱包 → 记录到钱包数据库 + ↓ + 新区块产生 → 监听链上事件 → 对比交易 + ↓ + 消息中的to = 区块交易中的to(充值) + ↓ + 返回消息 → ListenServer → 修改数据库 + ↓ + 记录到UnConfirmTxs → 返回RabbitMQ → 发送待确认消息(status=2) + ↓ + 新区块产生 → 读取UnConfirmTxs → 对比交易高度 + ↓ + 符合确认条件(当前高度 >= 交易高度 + 确认高度) + ↓ + 修改钱包数据 → 返回消息 → ListenServer → 修改数据库 + ↓ + 返回RabbitMQ → 发送最终确认消息(status=1/0) ``` **特点:** - ✅ 实时检测到账 -- ✅ 发送**两次**通知:待确认 + 最终确认 -- ✅ 支持多币种(当前支持 USDT) +- ✅ 发送**两次**通知:待确认(status=2) + 最终确认(status=1/0) +- ✅ 支持 ETH 和 USDT - ✅ 自动地址监听管理 +- ✅ 钱包余额自动更新 **消息流:** -1. 业务系统发送充值请求 → RabbitMQ -2. 系统添加地址监听 -3. 用户转账 → 立即通知(status=2 待确认) -4. 等待 20 个区块 → 最终通知(status=1 成功 / 0 失败) +1. **业务系统发送充值请求** → RabbitMQ `topup.queue` +2. **Server 验证签名** → 转发到 ListenServer +3. **ListenServer** → 写入 SQLite 数据库 → 添加到 TopupMsgs → 转发到 Blockchain Node +4. **Blockchain Node** → 添加地址到钱包监听 → 写入 MySQL 钱包数据库 +5. **监听链上事件** → 检测到充值交易 → 匹配地址 → 返回 TopupMsg_resp(status=2)→ 记录到 UnConfirmTxs +6. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `topup_resp.queue`(**第一次通知**) +7. **新区块确认** → 达到确认高度 → 更新钱包余额 → 返回 TopupMsg_resp(status=1/0) +8. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `topup_resp.queue`(**第二次通知**) ### 2. 提现功能 💸 +**完整流程:** ``` -提现请求 → 验证余额 → 发送交易 → 等待确认 → 返回结果 +业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到WithdrawMsgs + ↓ + Blockchain Node → 记录到数据库 + ↓ + 开始转账 → 校验余额 → 发送交易 + ↓ + 转账结果返回 → ListenServer → 修改相关状态 + ↓ + 新区块产生 → 监听链上事件 → 对比交易 + ↓ + 消息中的from、to、amount = 区块交易中的from、to、amount + ↓ + 返回消息 → ListenServer → 修改数据库状态 + ↓ + 记录到UnConfirmTxs → 返回RabbitMQ → 发送消息(status=2) + ↓ + 新区块产生 → 读取UnConfirmTxs → 对比交易高度 + ↓ + 符合确认条件(当前高度 >= 交易高度 + 确认高度) + ↓ + 修改钱包数据 → 返回消息 → ListenServer → 修改数据库 + ↓ + 返回RabbitMQ → 发送最终确认消息(status=1/0) ``` **特点:** - ✅ 自动余额检查 - ✅ 余额不足时使用归集钱包 -- ✅ 发送**一次**通知:最终确认 +- ✅ 发送**两次**通知:待确认(status=2) + 最终确认(status=1/0) - ✅ Gas 费用检查 +- ✅ 支持 ETH 和 USDT **消息流:** -1. 业务系统发送提现请求 → RabbitMQ -2. 系统验证余额并发送交易 -3. 等待 20 个区块确认 -4. 返回结果(status=1 成功 / 0 失败) +1. **业务系统发送提现请求** → RabbitMQ `withdraw.queue` +2. **Server 验证签名** → 转发到 ListenServer +3. **ListenServer** → 写入 SQLite 数据库 → 添加到 WithdrawMsgs → 转发到 Blockchain Node +4. **Blockchain Node** → 验证余额 → 发送交易 → 记录到数据库 → 返回 WithdrawMsg_resp(status=2) +5. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `withdraw_resp.queue`(**第一次通知**) +6. **监听链上事件** → 检测到提现交易 → 匹配交易 → 记录到 UnConfirmTxs +7. **新区块确认** → 达到确认高度 → 更新钱包余额 → 返回 WithdrawMsg_resp(status=1/0) +8. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `withdraw_resp.queue`(**第二次通知**) ### 3. 支付功能 💳 +**完整流程:** ``` -支付请求 → 验证余额 → 发送交易 → 等待确认 → 返回结果 +业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到PayMsgs + ↓ + Blockchain Node → 记录到数据库 + ↓ + 开始转账 → 校验余额 → 发送交易 + ↓ + 转账结果返回 → ListenServer → 修改相关状态 + ↓ + 新区块产生 → 监听链上事件 → 对比交易 + ↓ + 消息中的from、to、amount = 区块交易中的from、to、amount + ↓ + 返回消息 → ListenServer → 修改数据库状态 + ↓ + 记录到UnConfirmTxs → 返回RabbitMQ → 发送消息(status=2) + ↓ + 新区块产生 → 读取UnConfirmTxs → 对比交易高度 + ↓ + 符合确认条件(当前高度 >= 交易高度 + 确认高度) + ↓ + 修改钱包数据 → 返回消息 → ListenServer → 修改数据库 + ↓ + 返回RabbitMQ → 发送最终确认消息(status=1/0) ``` **特点:** -- ✅ 订单关联 +- ✅ 支持批量支付(一次请求多笔转账) - ✅ 自动余额检查 -- ✅ 发送**一次**通知:最终确认 -- ✅ 支持商户收款 +- ✅ 发送**两次**通知:待确认(status=2) + 最终确认(status=1/0) +- ✅ Gas 费用检查 +- ✅ 支持 ETH 和 USDT **消息流:** -1. 业务系统发送支付请求(含订单ID)→ RabbitMQ -2. 系统验证余额并发送交易 -3. 等待 20 个区块确认 -4. 返回结果(status=1 成功 / 0 失败) +1. **业务系统发送支付请求** → RabbitMQ `pay.queue`(可包含多笔交易) +2. **Server 验证签名** → 转发到 ListenServer +3. **ListenServer** → 写入 SQLite 数据库 → 添加到 PayMsgs → 转发到 Blockchain Node +4. **Blockchain Node** → 验证余额 → 逐个发送交易 → 记录到数据库 → 返回 PayData(status=2) +5. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `pay_resp.queue`(**第一次通知**) +6. **监听链上事件** → 检测到支付交易 → 匹配交易 → 记录到 UnConfirmTxs +7. **新区块确认** → 达到确认高度 → 更新钱包余额 → 返回 PayData(status=1/0) +8. **ListenServer** → 更新数据库 → 发送到 RabbitMQ `pay_resp.queue`(**第二次通知**) + +### 4. 移除监听功能 🗑️ + +**完整流程:** +``` +业务系统 → RabbitMQ → Server → ListenServer → 写数据库 → 添加到RemoveMsgs + ↓ + Blockchain Node → 移除钱包监听 + ↓ + 返回消息 → ListenServer → 修改数据库 + ↓ + 返回RabbitMQ → 发送响应消息(status=1/0) +``` + +**特点:** +- ✅ 支持移除充值监听 +- ✅ 支持移除提现/支付监听 +- ✅ 实时生效 --- @@ -355,16 +535,25 @@ cd test cp config.json config.json.backup # 编辑 config.json,填入实际配置 -# 5. 编译主程序 -cd .. +# 5. 准备配置文件 +cp bin/config.json ./config.json +# 编辑 config.json,填入实际配置 + +# 6. 初始化数据库 +# MySQL: 执行 public/eth_mysql.sql +# SQLite: 程序会自动创建(如果不存在) + +# 7. 编译主程序 go build -o m2pool-payment cmd/main.go -# 6. 运行(指定通信密钥) +# 8. 运行(指定通信密钥) ./m2pool-payment -key=your_secret_key -# 或者运行测试程序 -cd test -go run test.go +# 或者使用启动脚本 +cd bin +chmod +x build.sh start.sh +./build.sh +./start.sh your_secret_key ``` ### 配置文件示例 @@ -373,6 +562,7 @@ go run test.go ```json { + "net": ["ETH"], "rmq_config": { "sub_addr": "amqp://m2pool:m2pool@localhost:5672", "pay": { @@ -390,6 +580,11 @@ go run test.go "exchange": "pay.exchange", "routing": ["pay.withdraw.routing.key"] }, + "remove": { + "queue": "pay.remove.queue", + "exchange": "pay.exchange", + "routing": ["pay.remove.routing.key"] + }, "pay_resp": { "queue": "pay.auto.return.queue", "exchange": "pay.exchange", @@ -404,22 +599,33 @@ go run test.go "queue": "pay.withdraw.return.queue", "exchange": "pay.exchange", "routing": ["pay.withdraw.return.routing.key"] + }, + "remove_resp": { + "queue": "pay.remove.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.remove.return.routing.key"] } }, - "eth_config": { - "rpcUrl": "http://localhost:8545", - "wsUrl": "ws://localhost:8546", - "confirmHeight": 20, - "dbConfig": { + "msg_config": { + "sqlite_path": "./msg.db" + }, + "mysql_config": { + "wallet": { "user": "root", "password": "your_password", "host": "127.0.0.1", "port": 3306, "database": "payment", "maxOpenConns": 20, - "maxIdleCoons": 20, - "connMaxLife": 120 + "maxIdleConns": 10, + "connMaxLife": 120000000000 } + }, + "eth_config": { + "rpc_url": "http://localhost:8545", + "ws_url": "ws://localhost:8546", + "confirm_height": 20, + "sqlite_path": "./eth.db" } } ``` @@ -580,19 +786,23 @@ go run test.go ## 常见问题 -### Q1: 为什么充值会收到两次通知? +### Q1: 为什么所有功能都会收到两次通知? **A:** 这是设计特性! -- **第一次**(status=2):检测到交易,提醒用户"正在确认" +- **第一次**(status=2):检测到交易/开始处理,提醒用户"正在确认" - **第二次**(status=1/0):交易确认,通知最终结果 业务系统应该: -- status=2:显示进度,**不增加余额** -- status=1:增加余额 +- status=2:显示进度,**不修改余额或订单状态** +- status=1:增加/减少余额,更新订单状态 +- status=0:交易失败,不修改余额,提示错误 -### Q2: 提现/支付为什么只有一次通知? +### Q2: 为什么需要两次通知? -**A:** 因为是系统主动发起的交易,用户已经知道在处理中,不需要额外的待确认通知。 +**A:** +1. **实时反馈**:用户能立即知道交易已被检测到 +2. **状态追踪**:业务系统可以追踪交易从待确认到最终确认的完整流程 +3. **用户体验**:提供更好的用户交互体验 ### Q3: 如何处理交易失败? @@ -965,38 +1175,121 @@ type Metrics struct { --- +## 业务流程详解 + +根据 `流程.txt` 文档,系统的完整业务流程如下: + +### 充值流程 + +1. **RabbitMQ → ListenServer** + - 接收充值请求 + - 写入 SQLite 数据库(`topup_req_msg` 表) + - 添加到 TopupMsgs 内存映射 + +2. **ListenServer → Blockchain Node** + - 转发充值请求到区块链节点 + - 区块链节点接收后: + - 添加地址到钱包监听 + - 记录到 MySQL 钱包数据库(`ETH_wallets` 表) + +3. **Blockchain Node → ListenServer(监听阶段)** + - 新区块产生时,监听链上交易 + - 对比消息中的 `to` 地址和区块交易中的 `to` 地址 + - 如果匹配(充值检测): + - 返回 TopupMsg_resp(status=2 待确认) + - 记录到 UnConfirmTxs(待确认交易池) + - ListenServer 接收后: + - 修改数据库状态 + - 返回 RabbitMQ → 发送第一次通知(status=2) + +4. **Blockchain Node → ListenServer(确认阶段)** + - 新区块产生时,读取 UnConfirmTxs 数据 + - 对比每个交易的高度:`当前高度 >= 交易高度 + 确认高度` + - 符合确认条件后: + - 修改钱包数据(更新余额) + - 返回 TopupMsg_resp(status=1 成功 / 0 失败) + - ListenServer 接收后: + - 修改数据库状态 + - 返回 RabbitMQ → 发送第二次通知(status=1/0) + +### 提现/支付流程 + +1. **RabbitMQ → ListenServer** + - 接收提现/支付请求 + - 写入 SQLite 数据库(`withdraw_req_msg` 或 `pay_req_msg` 表) + - 添加到 WithdrawMsgs 或 PayMsgs 内存映射 + +2. **ListenServer → Blockchain Node** + - 转发提现/支付请求到区块链节点 + - 区块链节点接收后: + - 记录到数据库 + - 开始转账流程: + - 校验余额 + - 发送交易 + - 返回响应(status=2 待确认) + - ListenServer 接收后: + - 修改相关状态 + - 返回 RabbitMQ → 发送第一次通知(status=2) + +3. **Blockchain Node → ListenServer(监听阶段)** + - 新区块产生时,监听链上交易 + - 对比消息中的 `from`、`to`、`amount` 和区块交易中的对应字段 + - 如果匹配(提现/支付检测): + - 返回响应(status=2 待确认) + - 记录到 UnConfirmTxs + - ListenServer 接收后: + - 修改数据库状态 + - 返回 RabbitMQ → 发送第一次通知(status=2) + +4. **Blockchain Node → ListenServer(确认阶段)** + - 新区块产生时,读取 UnConfirmTxs 数据 + - 对比每个交易的高度:`当前高度 >= 交易高度 + 确认高度` + - 符合确认条件后: + - 修改钱包数据(更新余额) + - 返回响应(status=1 成功 / 0 失败) + - ListenServer 接收后: + - 修改数据库状态 + - 返回 RabbitMQ → 发送第二次通知(status=1/0) + ## 重要修复说明 ### 🔧 已修复的问题 -#### 1. QueueId 重复问题 +#### 1. 服务启动问题 -**问题**:两笔不同的交易出现相同的 QueueId - -**原因**:数据库表主键设计错误,使用 `(from_addr, to_addr)` 作为主键 +**问题**:部分关键服务未启动 **修复**: -- 修改数据库表结构,将主键改为 `queueId` -- 创建数据库迁移脚本 `public/migration.sql` -- 修复 SQL 插入语句的参数数量不匹配问题 +- 添加 `rmqServer.Start()` 启动 RabbitMQ 服务 +- 添加 `messageServer.RmqMsgIn()` 启动消息接收监听 +- 添加 `messageServer.NetMsgIn()` 启动网络消息监听 +- 添加 `blockChainServer.Listen()` 启动区块链监听 +- 添加 `blockChainServer.ListenMsg()` 启动区块链消息监听 -#### 2. 重复发送响应问题 +#### 2. 错误处理优化 -**问题**:提现和支付会发送两次响应 - -**原因**:转账失败时,先发送失败响应,然后仍然进入链上确认流程 +**问题**:使用 `log.Fatalf` 导致程序非正常退出 **修复**: -- 在转账失败时添加 `return` 语句 -- 确保转账失败时不进入链上确认流程 -- 只有转账成功才会进入链上监听和确认 +- 将非关键错误的 `log.Fatalf` 改为 `log.Printf` +- 添加错误恢复机制,确保程序稳定运行 -### 📊 修复后的消息发送次数 +#### 3. 变量和字段名称修复 -| 场景 | 消息处理阶段 | 链上确认阶段 | 总发送次数 | -|------|-------------|-------------|-----------| -| **转账失败** | 发送失败响应 | 不进入(已return) | **1次** | -| **转账成功** | 不发送响应 | 发送成功响应 | **1次** | +**问题**:部分变量和字段名称存在拼写错误 + +**修复**: +- 修复 `Heihgt` → `Height` +- 修复数据库字段名称不匹配问题 +- 修复 mutex 解锁错误 + +#### 4. 逻辑错误修复 + +**问题**:交易确认高度判断逻辑错误 + +**修复**: +- 修复确认高度判断条件:`now_height >= tx.Height+e.ConfirmHeight` +- 修复变量重复声明问题 --- diff --git a/go.mod b/go.mod index 132084e..70f7b8b 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/go-ole/go-ole v1.3.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/holiman/uint256 v1.3.2 // indirect - github.com/mitchellh/mapstructure v1.5.0 + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/rabbitmq/amqp091-go v1.10.0 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect diff --git a/internal/blockchain/eth/eth_prv.go b/internal/blockchain/eth/eth_prv.go index ff8b38f..94946aa 100644 --- a/internal/blockchain/eth/eth_prv.go +++ b/internal/blockchain/eth/eth_prv.go @@ -391,18 +391,19 @@ func (e *ETHNode) loadWallets() error { if err := rows.Err(); err != nil { return fmt.Errorf("error occurred while iterating rows: %v", err) } - pks, err := e.getAddressesPks(addresses) - if err != nil { - return fmt.Errorf("inital balance private key error: %v", err) + if len(addresses) > 0 { + pks, err := e.getAddressesPks(addresses) + if err != nil { + return fmt.Errorf("inital balance private key error: %v", err) + } + e.mu.Lock() + e.Wallets = wallets + for address, pk := range pks { + e.Wallets[address].pk = pk + } + e.mu.Unlock() } - e.mu.Lock() - e.Wallets = wallets - for address, pk := range pks { - e.Wallets[address].pk = pk - } - e.mu.Unlock() - return nil } @@ -1090,6 +1091,13 @@ func (e *ETHNode) handleListen_Topup_req(msg message.TopupMsg_req) { pk, err := e.getAddressPk(msg.Address) if err != nil { log.Printf("Query balance(%s-%s) private_key error: %v", msg.Chain, msg.Address, err) + go e.asyncSendMsgToListen(message.TopupMsg_resp{ + QueueId: msg.QueueId, + Chain: msg.Chain, + Symbol: msg.Symbol, + Address: msg.Address, + Status: msg.Status, + }, 3, 5*time.Second) return } // 添加到钱包 @@ -1226,8 +1234,9 @@ func (e *ETHNode) handleListen_Pay_req(msg message.PayMsg_req) { if err != nil { log.Printf("check balance error: %v", err) result_msg.PayStatus = constant.STATUS_ERROR - for _, tx := range result_msg.Transactions { + for to, tx := range result_msg.Transactions { tx.Status = constant.STATUS_ERROR + result_msg.Transactions[to] = tx } go e.asyncSendMsgToListen(result_msg, 3, 5*time.Second) return diff --git a/internal/listen/listen_prv.go b/internal/listen/listen_prv.go index 800c940..53cebae 100644 --- a/internal/listen/listen_prv.go +++ b/internal/listen/listen_prv.go @@ -96,7 +96,36 @@ func (l *ListenServer) handleRmqRemove_req(msg message.RemoveListenMsg_req) { // 充值响应 func (l *ListenServer) handleChainTopup_resp(msg message.TopupMsg_resp) { - + switch msg.Status { + case constant.STATUS_SUCCESS, constant.STATUS_FAILED: + // 修改数据库 + str := "UPDATE topup_resp_msg SET status = ? WHERE tx_hash = ?" + params := []any{msg.Status, msg.TxHash} + count, err := l.SqliteDB.Update(str, params) + if err != nil { + // 更详细的错误日志,包括 QueueId 和 Status + log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) + } else if count != 1 { + // 如果更新的行数不是 1,日志中记录详细信息 + log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) + } + case constant.STATUS_PENDING: + str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, from_addr, to_addr, amount, tx_hash, height, status) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.FromAddress, msg.Address, msg.Amount, msg.TxHash, msg.BlockHeight, msg.Status} + err := l.SqliteDB.Insert(str, params) + if err != nil { + log.Printf("Insert Topup_resp msg error: %v", err) + } + default: + // 插入数据库 + str := "INSERT INTO topup_resp_msg (queue_id, chain, symbol, to_addr, status) VALUES (?,?,?,?,?,?,?,?,?)" + params := []any{msg.QueueId, msg.Chain, msg.Symbol, msg.Address, msg.Status} + err := l.SqliteDB.Insert(str, params) + if err != nil { + log.Printf("Insert Topup_resp msg error: %v", err) + } + } + go l.asyncSendMsgToRmq(msg, 3, 5*time.Second) } // 提现响应 @@ -121,8 +150,12 @@ func (l *ListenServer) handleChainWithdraw_resp(msg message.WithdrawMsg_resp) { str := "UPDATE withdraw_resp_msg SET status = ? WHERE tx_hash = ?" params := []any{msg.Status, msg.TxHash} count, err := l.SqliteDB.Update(str, params) - if err != nil || count != 1 { - log.Printf("count=%d, err=%v", count, err) + if err != nil { + // 更详细的错误日志,包括 QueueId 和 Status + log.Printf("Failed to update remove_resp_msg for queue_id %s: %v", msg.QueueId, err) + } else if count != 1 { + // 如果更新的行数不是 1,日志中记录详细信息 + log.Printf("Unexpected update count for queue_id %s: expected 1, got %d", msg.QueueId, count) } }() go l.asyncSendMsgToRmq(msg, 3, 5*time.Second) diff --git a/public/eth.sql b/public/eth.sql index 3e0efa4..5b6473e 100644 --- a/public/eth.sql +++ b/public/eth.sql @@ -3,7 +3,7 @@ CREATE TABLE IF NOT EXISTS ETH_wallets ( queue_id TEXT NOT NULL, timestamp INTEGER NOT NULL, sign TEXT NOT NULL, - status INTEGER DEFAULT 0, -- 0未在监听 1正在监听 + status INTEGER DEFAULT 0 -- 0未在监听 1正在监听 ); CREATE TABLE IF NOT EXISTS ETH_balances ( diff --git a/public/msg.sql b/public/msg.sql index c52e3b7..da5df27 100644 --- a/public/msg.sql +++ b/public/msg.sql @@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS topup_resp_msg ( queue_id TEXT NOT NULL, chain TEXT NOT NULL, symbol TEXT NOT NULL, - from_addr TEXT NOT NULL, + from_addr TEXT DEFAULT NULL, to_addr TEXT NOT NULL, amount TEXT NOT NULL, -- 改为 TEXT 类型 tx_hash TEXT DEFAULT NULL, diff --git a/流程.txt b/流程.txt new file mode 100644 index 0000000..dd25690 --- /dev/null +++ b/流程.txt @@ -0,0 +1,16 @@ +充值:rmq -> listen -> 写数据库 + -> 添加到TopupMsgs + -> 传给对应的node server -> node server接收 -> 添加到钱包 + -> 记录到钱包数据库 + +提现/支付:rmq -> listen -> 写数据库 + -> 添加到WithdrawMsgs + -> 传给对应的node server -> node server接收 -> 记录到数据库 + -> 开始转账 -> 校验余额 -> 转账 -> 转账结果返回 -> listen -> 修改相关状态 + +node server listen -> 新区块产生时读取当前listen server的消息 -> 对比消息和区块中的交易 -> 消息中的to = 区块交易中的to(充值) -> 返回消息 -> listen -> 修改数据库 + -> 记录到unconfirmtxs -> 返回rmq -> 发送消息 + -> 消息中的from、to、amount = 区块交易中的from、to、amount(提现/充值) -> 返回消息 -> listen -> 修改数据库状态 + +node server confirm -> 新区块产生同时会读取当前unconfirmtxs数据 -> 对比每个交易的高度 -> 符合确认条件 -> 修改钱包数据 + -> 返回消息 -> listen -> 修改相关数据 -> 返回rmq -> 发出消息 \ No newline at end of file