diff --git a/README.md b/README.md index b42eafa..61b53c2 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,19 @@
-**基于区块链的分布式支付系统** +**基于以太坊区块链的分布式支付系统** [![Go Version](https://img.shields.io/badge/Go-1.24+-00ADD8?style=flat&logo=go)](https://golang.org) [![Ethereum](https://img.shields.io/badge/Ethereum-Mainnet-3C3C3D?style=flat&logo=ethereum)](https://ethereum.org) [![RabbitMQ](https://img.shields.io/badge/RabbitMQ-3.x-FF6600?style=flat&logo=rabbitmq)](https://www.rabbitmq.com) [![MySQL](https://img.shields.io/badge/MySQL-8.0+-4479A1?style=flat&logo=mysql)](https://www.mysql.com) +[![License](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE) 支持 **充值**、**提现**、**支付** 三大核心功能,实时监听链上交易,自动确认到账。 -[快速开始](#快速开始) • [功能特性](#功能特性) • [架构设计](#架构设计) • [API 文档](#api-文档) +基于 Go 1.24 + Ethereum + RabbitMQ + MySQL 技术栈构建的企业级支付解决方案。 + +[快速开始](#快速开始) • [功能特性](#功能特性) • [架构设计](#架构设计) • [配置说明](#配置说明)
@@ -170,35 +173,51 @@ M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付 ### 核心模块 -#### 1. Blockchain Manager (blockchain/) -- **eth/eth.go**:以太坊节点交互 - - 监听 USDT Transfer 事件 - - 监听新区块产生 - - 管理待确认交易池 - - 执行 ERC20 转账 - -#### 2. Message Queue (queue/) -- **rabbitmq.go**:消息队列服务 - - 消费充值/提现/支付请求 - - 发布交易确认响应 +#### 1. Blockchain Manager (`internal/blockchain/`) +- **blockchain.go**:统一的区块链接口定义 +- **eth/eth.go**:以太坊节点实现 + - 监听 USDT Transfer 事件(实时检测充值) + - 监听新区块产生(触发交易确认) + - 管理待确认交易池(UnConfirmTxs) + - 执行 ERC20 转账(提现/支付) - 自动重连机制 + - 地址统一小写处理 -#### 3. Database (db/) -- **db.go**:数据库连接池 +#### 2. Message Queue (`internal/queue/`) +- **rabbitmq.go**:RabbitMQ 消息队列服务 + - 消费 3 个请求队列(充值/提现/支付) + - 发布 3 个响应队列(交易确认结果) + - 自动重连和错误重试 + - 消息持久化 + +#### 3. Database (`internal/db/`) +- **db.go**:MySQL 数据库连接池 - 存储钱包私钥(加密) - - 存储交易记录 + - 连接池管理 + - 事务支持 -#### 4. Message (msg/) +#### 4. Message (`internal/msg/`) - **msg.go**:消息结构定义 - - 请求消息结构 - - 响应消息结构 - - 配置结构 + - 请求消息(TopupMsg_req, WithdrawMsg_req, PayMsg_req) + - 响应消息(TopupMsg_resp, WithdrawMsg_resp, PayMsg_resp) + - 配置结构(Config, RMQConfig, ETHConfig) -#### 5. Utils (utils/) +#### 5. Utils (`internal/utils/`) - **utils.go**:工具函数 - - 数值转换 + - 数值转换(BigInt ↔ Float64) + - 哈希计算 - 加密解密 +#### 6. Crypto (`internal/crypto/`) +- **crypto.go**:加密工具 + - SHA256 哈希 + - 签名验证 + +#### 7. Server (`internal/server.go`) +- 服务启动和管理 +- 消息路由和处理 +- 优雅关闭 + --- ## 快速开始 @@ -210,7 +229,7 @@ M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付 - ✅ RabbitMQ 3.x+ - ✅ 以太坊节点(支持 WebSocket 和 RPC) -### 安装 +### 安装步骤 ```bash # 1. 克隆项目 @@ -221,23 +240,71 @@ cd m2pool-payment-v2 go mod download # 3. 创建数据库 -mysql -u root -p < schema.sql +mysql -u root -p +``` +```sql +CREATE DATABASE payment CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; +USE payment; + +-- 创建钱包余额表 +CREATE TABLE `eth_balance` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `address` VARCHAR(42) NOT NULL UNIQUE, + `private_key` VARCHAR(255) NOT NULL COMMENT '加密后的私钥', + `balance` DECIMAL(20, 8) DEFAULT 0, + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX `idx_address` (`address`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='ETH钱包表'; +``` + +```bash # 4. 配置文件 -cp test/config.example.json test/config.json +cd test +cp config.json config.json.backup # 编辑 config.json,填入实际配置 -# 5. 编译 +# 5. 编译主程序 +cd .. go build -o m2pool-payment cmd/main.go -# 6. 运行 -./m2pool-payment +# 6. 运行(指定通信密钥) +./m2pool-payment -key=your_secret_key -# 或者直接运行测试 +# 或者运行测试程序 cd test go run test.go ``` +### 配置文件示例 + +创建 `test/config.json`: + +```json +{ + "rmq_config": { + "sub_addr": "amqp://m2pool:m2pool@localhost:5672" + // ... 其他配置见下文 + }, + "eth_config": { + "rpcUrl": "http://localhost:8545", + "wsUrl": "ws://localhost:8546", + "confirmHeight": 20, + "dbConfig": { + "user": "root", + "password": "your_password", + "host": "127.0.0.1", + "port": 3306, + "database": "payment", + "maxOpenConns": 20, + "maxIdleCoons": 20, + "connMaxLife": 120 + } + } +} +``` + --- ## 配置说明 @@ -544,97 +611,179 @@ go run test.go ``` m2pool-payment-v2/ ├── cmd/ # 主程序入口 -│ └── main.go -├── internal/ # 内部包 -│ ├── blockchain/ # 区块链交互 -│ │ ├── blockchain.go # 统一接口 -│ │ ├── eth/ # 以太坊实现 -│ │ │ └── eth.go -│ │ └── tron/ # TRON实现(待开发) -│ ├── crypto/ # 加密工具 -│ │ └── crypto.go -│ ├── db/ # 数据库 -│ │ └── db.go -│ ├── msg/ # 消息定义 -│ │ └── msg.go -│ ├── queue/ # 消息队列 -│ │ ├── rabbitmq.go -│ │ └── README.md -│ └── utils/ # 工具函数 -│ └── utils.go -├── test/ # 测试和示例 -│ ├── test.go -│ └── config.json -├── go.mod -├── go.sum -└── README.md +│ └── main.go # 程序入口,解析命令行参数 +├── internal/ # 内部包(不对外暴露) +│ ├── server.go # 服务启动和管理 +│ ├── blockchain/ # 区块链交互模块 +│ │ ├── blockchain.go # 统一的区块链接口定义 +│ │ ├── eth/ # 以太坊实现 +│ │ │ └── eth.go # USDT 监听、转账、确认 +│ │ └── tron/ # TRON 实现(待开发) +│ ├── crypto/ # 加密工具 +│ │ └── crypto.go # SHA256、签名验证 +│ ├── db/ # 数据库 +│ │ └── db.go # MySQL 连接池管理 +│ ├── msg/ # 消息定义 +│ │ └── msg.go # 请求/响应结构体定义 +│ ├── queue/ # 消息队列 +│ │ ├── rabbitmq.go # RabbitMQ 客户端封装 +│ │ └── README.md # RabbitMQ 使用文档 +│ └── utils/ # 工具函数 +│ └── utils.go # 类型转换、格式化 +├── test/ # 测试和示例 +│ ├── test.go # 测试程序(独立运行) +│ └── config.json # 配置文件 +├── go.mod # Go 模块定义 +├── go.sum # 依赖版本锁定 +└── README.md # 项目文档(本文件) ``` +### 代码统计 + +| 模块 | 文件 | 代码行数 | 说明 | +|------|------|---------|------| +| **eth** | eth.go | ~700 | 以太坊核心逻辑 | +| **queue** | rabbitmq.go | ~350 | RabbitMQ 封装 | +| **server** | server.go | ~300 | 服务管理 | +| **msg** | msg.go | ~130 | 消息定义 | +| **blockchain** | blockchain.go | ~70 | 接口定义 | +| **其他** | - | ~200 | 工具、数据库等 | +| **总计** | - | **~1750** | - | + ### 开发环境设置 +#### 方式一:Docker 快速启动(推荐) + ```bash -# 1. 安装 Go -wget https://go.dev/dl/go1.24.0.linux-amd64.tar.gz -sudo tar -C /usr/local -xzf go1.24.0.linux-amd64.tar.gz - -# 2. 设置环境变量 -export PATH=$PATH:/usr/local/go/bin -export GOPATH=$HOME/go - -# 3. 启动 MySQL +# 1. 启动 MySQL docker run -d \ - --name mysql \ + --name m2pool-mysql \ -p 3306:3306 \ - -e MYSQL_ROOT_PASSWORD=your_password \ + -e MYSQL_ROOT_PASSWORD=Lzx2021@! \ -e MYSQL_DATABASE=payment \ + -v mysql_data:/var/lib/mysql \ mysql:8.0 -# 4. 启动 RabbitMQ +# 2. 启动 RabbitMQ docker run -d \ - --name rabbitmq \ + --name m2pool-rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=m2pool \ -e RABBITMQ_DEFAULT_PASS=m2pool \ + -v rabbitmq_data:/var/lib/rabbitmq \ rabbitmq:3-management -# 5. 连接以太坊节点 -# 使用 Infura、Alchemy 或自建节点 +# 3. 访问 RabbitMQ 管理界面 +# http://localhost:15672 +# 用户名: m2pool +# 密码: m2pool +``` + +#### 方式二:本地安装 + +```bash +# Ubuntu/Debian +sudo apt update +sudo apt install golang-1.24 mysql-server rabbitmq-server + +# CentOS/RHEL +sudo yum install golang mysql-server rabbitmq-server + +# macOS +brew install go mysql rabbitmq +``` + +#### 以太坊节点选择 + +**选项 1:使用公共节点服务(推荐)** +```bash +# Infura(免费层每天 100,000 请求) +RPC: https://mainnet.infura.io/v3/YOUR_API_KEY +WS: wss://mainnet.infura.io/ws/v3/YOUR_API_KEY + +# Alchemy(免费层每秒 25 请求) +RPC: https://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY +WS: wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY +``` + +**选项 2:自建节点** +```bash +# 使用 Geth +geth --http --http.addr 0.0.0.0 --http.port 8545 \ + --ws --ws.addr 0.0.0.0 --ws.port 8546 \ + --http.api eth,net,web3 --ws.api eth,net,web3 + +# 或使用 Erigon(更轻量) +erigon --http --ws +``` + +**选项 3:测试网络** +```bash +# Goerli 测试网(免费) +RPC: https://goerli.infura.io/v3/YOUR_API_KEY +WS: wss://goerli.infura.io/ws/v3/YOUR_API_KEY + +# Sepolia 测试网(推荐) +RPC: https://sepolia.infura.io/v3/YOUR_API_KEY +WS: wss://sepolia.infura.io/ws/v3/YOUR_API_KEY ``` ### 数据库表结构 ```sql --- 钱包余额表 +-- 钱包余额表(必须) CREATE TABLE `eth_balance` ( `id` INT AUTO_INCREMENT PRIMARY KEY, - `address` VARCHAR(42) NOT NULL UNIQUE, - `private_key` VARCHAR(255) NOT NULL, - `balance` DECIMAL(20, 8) DEFAULT 0, + `address` VARCHAR(42) NOT NULL UNIQUE COMMENT '钱包地址(小写)', + `private_key` VARCHAR(255) NOT NULL COMMENT '加密后的私钥', + `balance` DECIMAL(20, 8) DEFAULT 0 COMMENT 'USDT余额', + `eth_balance` DECIMAL(20, 18) DEFAULT 0 COMMENT 'ETH余额(用于Gas)', `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX `idx_address` (`address`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='ETH钱包表'; --- 交易记录表(可选) +-- 插入归集钱包示例 +INSERT INTO `eth_balance` (`address`, `private_key`, `balance`, `eth_balance`) +VALUES ('归集钱包', 'encrypted_private_key_here', 10000.00, 1.0); + +-- 交易记录表(可选,用于审计) CREATE TABLE `transactions` ( `id` BIGINT AUTO_INCREMENT PRIMARY KEY, - `tx_hash` VARCHAR(66) NOT NULL UNIQUE, - `from_address` VARCHAR(42) NOT NULL, - `to_address` VARCHAR(42) NOT NULL, - `amount` DECIMAL(20, 8) NOT NULL, - `symbol` VARCHAR(10) NOT NULL, - `chain` VARCHAR(10) NOT NULL, + `tx_hash` VARCHAR(66) NOT NULL UNIQUE COMMENT '交易哈希', + `from_address` VARCHAR(42) NOT NULL COMMENT '发送地址', + `to_address` VARCHAR(42) NOT NULL COMMENT '接收地址', + `amount` DECIMAL(20, 8) NOT NULL COMMENT '金额', + `symbol` VARCHAR(10) NOT NULL COMMENT '币种', + `chain` VARCHAR(10) NOT NULL COMMENT '链名称', `tx_type` TINYINT NOT NULL COMMENT '0=充值,1=提现,2=支付', `status` TINYINT NOT NULL COMMENT '0=失败,1=成功,2=待确认', - `block_height` BIGINT, + `block_height` BIGINT COMMENT '区块高度', + `queue_id` VARCHAR(50) COMMENT '队列ID', + `order_id` VARCHAR(50) COMMENT '订单ID', `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `confirmed_at` TIMESTAMP NULL COMMENT '确认时间', INDEX `idx_tx_hash` (`tx_hash`), INDEX `idx_from` (`from_address`), - INDEX `idx_to` (`to_address`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + INDEX `idx_to` (`to_address`), + INDEX `idx_status` (`status`), + INDEX `idx_order_id` (`order_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='交易记录表'; ``` +### 重要说明 + +⚠️ **私钥安全**: +- 数据库中存储的是**加密后的私钥** +- 需要实现真实的加密/解密逻辑 +- 当前代码中的解密逻辑是占位代码,需要替换为实际的加密算法(如 AES-256) + +⚠️ **归集钱包**: +- 必须在数据库中配置归集钱包 +- 地址字段填写 `"归集钱包"` 字符串 +- 保持足够的 USDT 和 ETH 余额 + --- ## 部署指南 @@ -818,10 +967,24 @@ sudo journalctl -u m2pool-payment -f ### Q5: 如何保证私钥安全? **A:** -1. 私钥在数据库中**加密存储** -2. 仅在转账时临时解密 -3. 建议使用 HSM(硬件安全模块) -4. 使用 KMS(密钥管理服务) +1. **私钥加密存储**:数据库中存储加密后的私钥 +2. **临时解密**:仅在转账时临时解密,用完立即释放 +3. **访问控制**:数据库限制访问权限 +4. **建议方案**: + - 使用 AES-256 加密私钥 + - 使用 HSM(硬件安全模块) + - 使用云服务商的 KMS(密钥管理服务) + - 使用环境变量传递解密密钥 + +⚠️ **重要**:当前代码中的解密逻辑(`eth.go` 第 467 行)是**占位代码**,生产环境必须替换为真实的加密算法! + +```go +// ❌ 当前代码(占位) +privateKey := encryptedKey + address + "" + +// ✅ 应该改为(示例) +privateKey := AES256Decrypt(encryptedKey, decodeKey) +``` ### Q6: 余额不足时如何处理? @@ -863,6 +1026,35 @@ sudo journalctl -u m2pool-payment -f 2. 实现幂等性处理 3. 数据库添加唯一索引 +### Q11: 支持的 USDT 合约地址是什么? + +**A:** 当前配置的合约地址: +- **以太坊主网**:`0xdAC17F958D2ee523a2206206994597C13D831ec7` +- **测试网**:需要部署测试 ERC20 合约 +- **私有链**:需要部署自己的 ERC20 合约 + +修改合约地址位置:`internal/blockchain/eth/eth.go` 第 99 行 + +### Q12: 新区块监听的作用是什么? + +**A:** 新区块监听确保交易及时确认: +- **问题**:如果只依赖 USDT Transfer 事件,在长时间无 USDT 转账时,待确认交易无法被确认 +- **解决**:监听新区块产生,每个新区块都检查待确认交易 +- **效果**:交易在达到第 20 个区块后,下一个区块就会被确认 + +### Q13: Channel 缓冲区设置多大? + +**A:** 当前设置为 1000: +```go +chainEventCh := make(chan any, 1000) +``` + +- **轻量负载**(<50 TPS):100 足够 +- **中等负载**(50-200 TPS):500-1000 推荐 +- **高负载**(>200 TPS):2000+ 或优化架构 + +监控 Channel 使用率,避免满载丢消息。 + --- ## 安全建议 @@ -1003,6 +1195,70 @@ rabbitmqctl list_queues --- +## 注意事项 + +### ⚠️ 生产环境部署前必须修改 + +#### 1. 私钥加密实现(重要!) + +**位置**:`internal/blockchain/eth/eth.go` 第 467 行 + +**当前代码**(占位): +```go +privateKey := encryptedKey + address + "" +``` + +**必须改为**(示例): +```go +// 使用 AES-256-GCM 加密 +import "crypto/aes" +import "crypto/cipher" + +func (e *ETHNode) decodePrivatekey(address string) string { + // 从数据库查询加密密钥 + encryptedKey := queryFromDB(address) + + // 使用 AES 解密 + privateKey := AESDecrypt(encryptedKey, e.decodeKey) + + return privateKey +} +``` + +#### 2. USDT 合约地址验证 + +确认你的以太坊网络与合约地址匹配: + +| 网络 | Chain ID | USDT 合约地址 | +|------|----------|--------------| +| 主网 | 1 | `0xdAC17F958D2ee523a2206206994597C13D831ec7` ✅ | +| Goerli | 5 | 需要部署测试合约 | +| Sepolia | 11155111 | 需要部署测试合约 | +| 私有链 | 自定义 | 需要部署自己的合约 | + +**修改位置**:`internal/blockchain/eth/eth.go` 第 99 行 + +#### 3. 消息签名密钥 + +**当前密钥**:`9f3c7a12`(测试用) + +**生产环境**: +```bash +# 生成强密钥 +openssl rand -hex 32 + +# 启动时传入 +./m2pool-payment -key=your_production_secret_key +``` + +#### 4. 数据库密码安全 + +- ❌ 不要在代码中硬编码密码 +- ✅ 使用环境变量 +- ✅ 使用配置管理工具(如 Vault) + +--- + ## 贡献指南 欢迎贡献代码!请遵循以下步骤: @@ -1015,51 +1271,256 @@ rabbitmqctl list_queues ### 代码规范 -- 使用 `gofmt` 格式化代码 -- 遵循 Go 命名规范 -- 添加必要的注释 -- 编写单元测试 -- 更新文档 +- ✅ 使用 `gofmt` 格式化代码 +- ✅ 遵循 Go 命名规范 +- ✅ 添加必要的注释 +- ✅ 所有地址统一小写处理 +- ✅ 使用状态码常量(不要硬编码数字) +- ✅ 添加错误处理和日志 +- ✅ 更新相关文档 + +### 提交规范 + +```bash +# 功能 +feat: 添加 BTC 网络支持 + +# 修复 +fix: 修复充值消息重复发送问题 + +# 文档 +docs: 更新 API 文档 + +# 性能 +perf: 优化交易确认性能 + +# 重构 +refactor: 重构数据库连接池 +``` + +--- + +## 命令行参数 + +```bash +# 启动程序 +./m2pool-payment -key=your_secret_key + +# 参数说明 +-key string + 通信密钥,用于消息签名验证 (默认: "m2pool") +``` + +### 签名验证算法 + +```go +// 生成签名 +hash := SHA256(hex(timestamp) + secret_key) +sign := hex.EncodeToString(hash) + +// 示例 +timestamp = 1758610297 +secret_key = "9f3c7a12" +hash = SHA256("696a2d6929" + "9f3c7a12") +sign = "219b3b3935f3d56db7eacd32aae84fa06df95806373d6fc4ed6e9b35ffb17f2d" +``` + +--- + +## 运行日志示例 + +### 启动日志 +``` +======================================== +🚀 M2Pool Payment System Starting... +======================================== +✅ 配置加载成功: RPC=http://10.168.3.236:18545, WS=ws://10.168.3.236:18546 +✅ 区块链服务初始化完成 +✅ RabbitMQ服务初始化完成: amqp://m2pool:m2pool@localhost:5672 +✅ RabbitMQ 监听启动完成 +======================================== +🎉 所有服务启动完成! +======================================== +🔍 ETH 开始监听 USDT Transfer 事件... +🔍 开始监听新区块... +✅ 订阅成功 +✅ 新区块订阅成功 +``` + +### 充值日志 +``` +📥 [RMQ] 收到充值请求: Chain=ETH, Symbol=USDT, Address=0x123... +📨 [链上] 充值待确认: Address=0x123..., Amount=100.50, TxHash=0xabc... +📤 [RMQ] 发送充值响应: Address=0x123..., Status=2, TxHash=0xabc... +✅ [链上] 充值确认: Address=0x123..., Amount=100.50, TxHash=0xabc..., Status=1 +📤 [RMQ] 发送充值响应: Address=0x123..., Status=1, TxHash=0xabc... +``` + +### 提现日志 +``` +📥 [RMQ] 收到提现请求: QueueId=w123, From=0x111..., To=0x222..., Amount=50.00 USDT +✅ [链上] 提现确认: QueueId=w123, Amount=50.00, TxHash=0xdef..., Status=1 +📤 [RMQ] 发送提现响应: QueueId=w123, Status=1, TxHash=0xdef... +``` --- ## 路线图 -### v2.1 (计划中) -- [ ] 支持 BTC 网络 -- [ ] 支持 TRON 网络 -- [ ] 添加 HTTP API -- [ ] 添加 WebSocket 推送 +### ✅ v2.0 (已完成) +- [x] 以太坊 USDT 支持 +- [x] 充值/提现/支付功能 +- [x] RabbitMQ 集成 +- [x] 双重监听机制 +- [x] 自动交易确认 +- [x] 地址统一规范 +- [x] Gas 费用检查 +- [x] Panic 恢复机制 + +### 🚧 v2.1 (开发中) +- [ ] 支持更多 ERC20 代币(USDC, DAI) +- [ ] 交易记录持久化 +- [ ] HTTP API 接口 +- [ ] 私钥真实加密实现 - [ ] 性能优化(读写锁、缓存) -### v2.2 (计划中) -- [ ] 支持更多 ERC20 代币 +### 📋 v2.2 (计划中) +- [ ] 支持 TRON 网络(TRC20-USDT) +- [ ] 支持 BTC 网络 - [ ] 多签钱包支持 -- [ ] 交易记录持久化 -- [ ] 管理后台 -- [ ] 监控面板 +- [ ] 管理后台界面 +- [ ] 实时监控面板 +- [ ] 告警系统 -### v3.0 (规划中) -- [ ] 微服务架构 +### 🔮 v3.0 (规划中) +- [ ] 微服务架构拆分 - [ ] 水平扩展支持 - [ ] 分布式事务 -- [ ] 高可用部署 +- [ ] 高可用集群部署 +- [ ] Kubernetes 支持 + +--- + +## 核心特性详解 + +### 1. 双重监听机制 🎯 + +系统同时监听两种链上事件: + +**① USDT Transfer 事件监听** +```go +// 检测 USDT 转账,用于充值检测和交易确认触发 +e.WsClient.SubscribeFilterLogs(query, e.USDT.LogsChan) +``` + +**② 新区块头监听** +```go +// 每个新区块触发交易确认检查,确保及时确认 +e.WsClient.SubscribeNewHead(e.Ctx, headers) +``` + +### 2. 智能交易确认 ⚡ + +**事件驱动 + 区块驱动**: +- Transfer 事件到达时立即检查 +- 每个新区块产生时也检查 +- **确保交易在第 20 个区块后立即确认** + +### 3. 地址统一规范 🔡 + +所有以太坊地址**统一转换为小写**: +- 存储时转换 +- 比较时转换 +- 查询时转换 + +避免大小写不一致导致的匹配失败。 + +### 4. 并发安全设计 🔒 + +- `sync.Map` 用于高并发地址监听 +- `sync.Mutex` 保护共享数据结构 +- Channel 缓冲区防止阻塞 +- Goroutine panic 恢复机制 + +### 5. 余额智能管理 💰 + +**自动归集钱包切换:** +``` +用户钱包余额 < 转账金额 + ↓ +自动使用归集钱包 + ↓ +确保交易成功 +``` + +### 6. Gas 费用检查 ⛽ + +转账前自动检查: +- USDT 余额是否足够 +- ETH 余额是否足够支付 Gas +- 预估 Gas 价格 --- ## 相关文档 - [RabbitMQ 使用说明](internal/queue/README.md) -- [并发能力分析](docs/CONCURRENCY_ANALYSIS.md)(如果有) -- [交易类型说明](docs/TRANSACTION_TYPES.md)(如果有) + +--- + +## 贡献指南 + +欢迎贡献代码!请遵循以下步骤: + +### 提交流程 + +1. Fork 本项目 +2. 创建功能分支 (`git checkout -b feature/AmazingFeature`) +3. 提交更改 (`git commit -m 'feat: Add some AmazingFeature'`) +4. 推送到分支 (`git push origin feature/AmazingFeature`) +5. 开启 Pull Request + +### 代码规范 + +- ✅ 使用 `gofmt` 格式化代码 +- ✅ 遵循 Go 命名规范 +- ✅ 添加必要的注释 +- ✅ 所有地址统一小写处理 +- ✅ 使用状态码常量(不要硬编码数字) +- ✅ 添加错误处理和日志 +- ✅ 更新相关文档 + +### Commit 规范 + +```bash +# 新功能 +feat: 添加 BTC 网络支持 + +# Bug 修复 +fix: 修复充值消息重复发送问题 + +# 文档更新 +docs: 更新 API 文档 + +# 性能优化 +perf: 优化交易确认性能 + +# 代码重构 +refactor: 重构数据库连接池 + +# 测试 +test: 添加单元测试 +``` --- ## 技术支持 -- 📧 Email: support@m2pool.com -- 💬 Telegram: @m2pool_support -- 📱 WeChat: m2pool_tech +如有问题,欢迎通过以下方式联系: + +- 📧 Email: support@example.com +- 💬 Issues: [GitHub Issues](https://github.com/your-repo/issues) +- 📖 文档: [项目 Wiki](https://github.com/your-repo/wiki) --- @@ -1097,3 +1558,4 @@ Made with ❤️ by M2Pool Team + diff --git a/bin/start.sh b/bin/start.sh index 3500197..5c0ad7e 100644 --- a/bin/start.sh +++ b/bin/start.sh @@ -1,2 +1,2 @@ #/bin/bash -./payment \ No newline at end of file +./payment -key=你的密钥 \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 74bcb12..d1e39d1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,7 +1,12 @@ package main -import server "m2pool-payment/internal" +import ( + "flag" + server "m2pool-payment/internal" +) func main() { - server.Start() + msgKey := flag.String("key", "m2pool", "通信密钥") + flag.Parse() + server.Start(*msgKey) } diff --git a/go.mod b/go.mod index c45b057..95ef2b6 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,12 @@ require ( github.com/go-sql-driver/mysql v1.9.3 ) +require ( + github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect + google.golang.org/protobuf v1.36.6 // indirect +) + require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect @@ -18,13 +24,13 @@ require ( github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect github.com/deckarep/golang-set/v2 v2.6.0 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/ethereum/c-kzg-4844/v2 v2.1.3 // indirect github.com/ethereum/go-verkle v0.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/holiman/uint256 v1.3.2 // indirect - github.com/rabbitmq/amqp091-go v1.10.0 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect diff --git a/go.sum b/go.sum index a5857cb..ceafd51 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAK github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/consensys/gnark-crypto v0.18.0 h1:vIye/FqI50VeAr0B3dx+YjeIvmc3LWz4yEfbWBpTUf0= github.com/consensys/gnark-crypto v0.18.0/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= -github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= -github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/crate-crypto/go-eth-kzg v1.4.0 h1:WzDGjHk4gFg6YzV0rJOAsTK4z3Qkz5jd4RE3DAvPFkg= github.com/crate-crypto/go-eth-kzg v1.4.0/go.mod h1:J9/u5sWfznSObptgfa92Jq8rTswn6ahQWEuiLHOjCUI= github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOVl3J+MYp5kPMoUZPp7aOYHtaua31lwRHg= @@ -40,10 +40,10 @@ github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= -github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= -github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8= +github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= github.com/emicklei/dot v1.6.2 h1:08GN+DD79cy/tzN6uLCT84+2Wk9u+wvqP+Hkx/dIR8A= github.com/emicklei/dot v1.6.2/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= github.com/ethereum/c-kzg-4844/v2 v2.1.3 h1:DQ21UU0VSsuGy8+pcMJHDS0CV1bKmJmxsJYK8l3MiLU= @@ -101,8 +101,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= @@ -165,6 +165,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= @@ -181,8 +183,8 @@ golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/internal/blockchain/eth/eth.go b/internal/blockchain/eth/eth.go index 349cc41..0271a8b 100644 --- a/internal/blockchain/eth/eth.go +++ b/internal/blockchain/eth/eth.go @@ -293,77 +293,77 @@ func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) { return } tx_hash := vLog.TxHash.Hex() - tx, tx_ok := e.UnConfirmTxs[tx_hash] - if tx_ok { - // 【支付/提现】待确认交易中存在该交易Hash(说明是我们主动发起的交易) - // 直接走确认流程,不发送待确认消息 - // log.Printf("🔍 检测到已发起的交易: TxHash=%s, Type=%d", tx_hash, tx.TxType) - e.confirm("USDT", height, tx, ch) - } else { - // 【充值】待确认交易中不存在该交易hash(说明是外部转账) - // 添加至待确认交易中,并立即发送待确认消息 - // 1,先根据to查询RmqMsgs,再根据存在的rmq_msg中的相关数据,存入待确认交易 - value, rmq_msg_ok := e.RmqMsgs[toAddr] - var tx_type int - if rmq_msg_ok { - for _, v := range value { - _, ok := v.(message.TopupMsg_req) - if ok { - tx_type = 0 - } - _, ok1 := v.(message.WithdrawMsg_req) - if ok1 { - tx_type = 1 - } - _, ok2 := v.(message.PayMsg_req) - if ok2 { - tx_type = 2 - } + // tx, tx_ok := e.UnConfirmTxs[tx_hash] + // if tx_ok { + // // 【支付/提现】待确认交易中存在该交易Hash(说明是我们主动发起的交易) + // // 直接走确认流程,不发送待确认消息 + // // log.Printf("🔍 检测到已发起的交易: TxHash=%s, Type=%d", tx_hash, tx.TxType) + // e.confirm("USDT", height, tx, ch) + // } else { + // 【充值】待确认交易中不存在该交易hash(说明是外部转账) + // 添加至待确认交易中,并立即发送待确认消息 + // 1,先根据to查询RmqMsgs,再根据存在的rmq_msg中的相关数据,存入待确认交易 + value, rmq_msg_ok := e.RmqMsgs[toAddr] + var tx_type int + if rmq_msg_ok { + for _, v := range value { + _, ok := v.(message.TopupMsg_req) + if ok { + tx_type = 0 + } + _, ok1 := v.(message.WithdrawMsg_req) + if ok1 { + tx_type = 1 + } + _, ok2 := v.(message.PayMsg_req) + if ok2 { + tx_type = 2 } } - e.UnConfirmTxs[tx_hash] = message.Tx_msg{ - TxType: tx_type, - Tx: message.Tx{ - From: fromAddr, - To: toAddr, - Height: height, - TxHash: tx_hash, - Symbol: "USDT", - Value: utils.BigIntUSDTToFloat64(transferEvent.Value), - Status: 2, // 待确认状态 - }, - } - // log.Printf("📝 待确认交易新增: TxHash=%s, Height=%d, To=%s, Type=%d", tx_hash, height, toAddr, tx_type) + } + e.UnConfirmTxs[tx_hash] = message.Tx_msg{ + TxType: tx_type, + Tx: message.Tx{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: tx_hash, + Symbol: "USDT", + Value: utils.BigIntUSDTToFloat64(transferEvent.Value), + Status: 2, // 待确认状态 + }, + } + // log.Printf("📝 待确认交易新增: TxHash=%s, Height=%d, To=%s, Type=%d", tx_hash, height, toAddr, tx_type) - // 🔔 【仅充值】立即发送待确认状态的消息(支付/提现不发送待确认消息) - if tx_type == 0 && rmq_msg_ok { - for _, v := range value { - d1, ok := v.(message.TopupMsg_req) - if ok && strings.ToLower(d1.Address) == toAddr { - pendingMsg := message.TopupMsg_resp{ - Address: toAddr, - Status: 2, // 待确认状态 - Chain: d1.Chain, - Symbol: d1.Symbol, - Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), - TxHash: tx_hash, + // 🔔 【仅充值】立即发送待确认状态的消息(支付/提现不发送待确认消息) + if tx_type == 0 && rmq_msg_ok { + for _, v := range value { + d1, ok := v.(message.TopupMsg_req) + if ok && strings.ToLower(d1.Address) == toAddr { + pendingMsg := message.TopupMsg_resp{ + Address: toAddr, + Status: 2, // 待确认状态 + Chain: d1.Chain, + Symbol: d1.Symbol, + Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), + TxHash: tx_hash, + } + // log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f", + // tx_hash, toAddr, pendingMsg.Amount) + + // 异步发送,避免阻塞事件处理 + go func(msg message.TopupMsg_resp) { + select { + case ch <- msg: + log.Printf("✅ 待确认充值消息已发送") + default: + log.Printf("⚠️ 通道阻塞,待确认消息发送失败") } - // log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f", - // tx_hash, toAddr, pendingMsg.Amount) - - // 异步发送,避免阻塞事件处理 - go func(msg message.TopupMsg_resp) { - select { - case ch <- msg: - log.Printf("✅ 待确认充值消息已发送") - default: - log.Printf("⚠️ 通道阻塞,待确认消息发送失败") - } - }(pendingMsg) - break - } + }(pendingMsg) + break } } + // } } } @@ -577,11 +577,11 @@ func (e *ETHNode) decodePrivatekey(address string) string { func (e *ETHNode) usdt_transfer(msg any) error { var user_from, final_from, to string var amount float64 - var tx_type int - now_height, err := e.getBlockHeight() - if err != nil { - return fmt.Errorf("get lastest height error: %v", err) - } + // var tx_type int + // now_height, err := e.getBlockHeight() + // if err != nil { + // return fmt.Errorf("get lastest height error: %v", err) + // } // --------------------------------------------------------------------------------------------- // 断言,确定本次转账是哪个类型 // 支付操作 @@ -590,7 +590,7 @@ func (e *ETHNode) usdt_transfer(msg any) error { e.AddAddress(v.ToAddress, v) // 存入该笔msg(AddAddress内部会转小写) // 统一转换为小写 user_from, final_from, to, amount = strings.ToLower(v.FromAddress), strings.ToLower(v.FromAddress), strings.ToLower(v.ToAddress), v.Amount - tx_type = 2 + // tx_type = 2 } // 提现操作 k, ok1 := msg.(message.WithdrawMsg_req) @@ -598,7 +598,7 @@ func (e *ETHNode) usdt_transfer(msg any) error { e.AddAddress(k.ToAddress, k) // 存入该笔msg(AddAddress内部会转小写) // 统一转换为小写 user_from, final_from, to, amount = strings.ToLower(k.FromAddress), strings.ToLower(k.FromAddress), strings.ToLower(k.ToAddress), k.Amount - tx_type = 1 + // tx_type = 1 } // --------------------------------------------------------------------------------------------- // 1,校验钱包余额 @@ -658,7 +658,7 @@ func (e *ETHNode) usdt_transfer(msg any) error { ) // 6, 签名交易并获得txHash signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey) - txHash := signedTx.Hash().Hex() // 通过签名信息解析出交易hash + // txHash := signedTx.Hash().Hex() // 通过签名信息解析出交易hash if err != nil { return fmt.Errorf("failed to sign transaction: %w", err) } @@ -667,21 +667,21 @@ func (e *ETHNode) usdt_transfer(msg any) error { if err != nil { return fmt.Errorf("failed to send transaction: %w", err) } - // 8, 构造交易消息 - tx_msg := message.Tx_msg{ - TxType: tx_type, - Tx: message.Tx{ - From: final_from, - To: to, - Height: now_height, - TxHash: txHash, - Symbol: "USDT", - Value: amount, - Status: 2, - }, - } - // 9, 将构造的交易消息存入待确认交易中 - e.UnConfirmTxs[txHash] = tx_msg + // // 8, 构造交易消息 + // tx_msg := message.Tx_msg{ + // TxType: tx_type, + // Tx: message.Tx{ + // From: final_from, + // To: to, + // Height: now_height, + // TxHash: txHash, + // Symbol: "USDT", + // Value: amount, + // Status: 2, + // }, + // } + // // 9, 将构造的交易消息存入待确认交易中 + // e.UnConfirmTxs[txHash] = tx_msg return nil } diff --git a/internal/msg/msg.go b/internal/msg/msg.go index e20f180..5245541 100644 --- a/internal/msg/msg.go +++ b/internal/msg/msg.go @@ -4,8 +4,9 @@ import "time" // 配置文件结构 type Config struct { - RMQConfig RMQConfig `json:"rmq_config"` - ETHConfig ETHConfig `json:"eth_config"` + RMQConfig RMQConfig `json:"rmq_config"` + ETHConfig ETHConfig `json:"eth_config"` + TRONConfig TRONConfig `json:"tron_config"` } type RMQConfig struct { @@ -31,6 +32,12 @@ type ETHConfig struct { DbConfig DbConfig `json:"dbConfig"` } +type TRONConfig struct { + RpcUrl string `json:"rpcUrl"` + ConfirmHeight uint64 `json:"confirmHeight"` + DbConfig DbConfig `json:"dbConfig"` +} + // Config 数据库配置 type DbConfig struct { User string `json:"user"` diff --git a/internal/queue/README.md b/internal/queue/README.md index 80c6a81..5243a6c 100644 --- a/internal/queue/README.md +++ b/internal/queue/README.md @@ -1,295 +1,40 @@ -# RabbitMQ 服务使用说明 +# RabbitMQ 服务模块 -## 功能概述 +本模块提供 RabbitMQ 消息队列的封装,用于接收业务系统的请求和发送交易确认响应。 -这个 RabbitMQ 服务实现了区块链支付系统的消息队列功能,包括: +## 快速使用 -### 消费队列(监听) -1. **充值队列 (topup)** - 监听用户充值请求 -2. **提现队列 (withdraw)** - 监听用户提现请求 -3. **支付队列 (pay)** - 监听用户支付请求 - -### 发布队列(响应) -1. **充值响应队列 (topup_resp)** - 发送充值确认结果 -2. **提现响应队列 (withdraw_resp)** - 发送提现确认结果 -3. **支付响应队列 (pay_resp)** - 发送支付确认结果 - ---- - -## 快速开始 - -### 1. 安装依赖 - -```bash -go get github.com/rabbitmq/amqp091-go -``` - -### 2. 配置文件示例 (config.json) - -```json -{ - "rmq_config": { - "sub_addr": "amqp://username:password@localhost:5672", - "pay": { - "queue": "pay.auto.queue", - "exchange": "pay.exchange", - "routing": ["pay.auto.routing.key"] - }, - "topup": { - "queue": "pay.recharge.queue", - "exchange": "pay.exchange", - "routing": ["pay.recharge.routing.key"] - }, - "withdraw": { - "queue": "pay.withdraw.queue", - "exchange": "pay.exchange", - "routing": ["pay.withdraw.routing.key"] - }, - "pay_resp": { - "queue": "pay.auto.return.queue", - "exchange": "pay.exchange", - "routing": ["pay.auto.return.routing.key"] - }, - "topup_resp": { - "queue": "pay.recharge.return.queue", - "exchange": "pay.exchange", - "routing": ["pay.recharge.return.routing.key"] - }, - "withdraw_resp": { - "queue": "pay.withdraw.return.queue", - "exchange": "pay.exchange", - "routing": ["pay.withdraw.return.routing.key"] - } - } +```go +// 创建 RabbitMQ 服务 +rmqServer, err := rmq.NewRabbitMQServer(config.RMQConfig) +if err != nil { + log.Fatal(err) } -``` +defer rmqServer.Close() -### 3. 使用示例 - -```go -package main - -import ( - "log" - message "m2pool-payment/internal/msg" - rmq "m2pool-payment/internal/queue" -) - -func main() { - // 加载配置 - config := loadConfig() // 你的配置加载逻辑 - - // 创建 RabbitMQ 服务 - rmqServer, err := rmq.NewRabbitMQServer(config.RMQConfig) - if err != nil { - log.Fatalf("创建 RabbitMQ 服务失败: %v", err) - } - defer rmqServer.Close() - - // 设置消息处理回调 - rmqServer.OnTopupMsg = func(msg message.TopupMsg_req) { - log.Printf("收到充值请求: %+v", msg) - // 处理充值逻辑... - // 添加地址监听等 - } - - rmqServer.OnWithdrawMsg = func(msg message.WithdrawMsg_req) { - log.Printf("收到提现请求: %+v", msg) - // 处理提现逻辑... - // 调用区块链转账 - } - - rmqServer.OnPayMsg = func(msg message.PayMsg_req) { - log.Printf("收到支付请求: %+v", msg) - // 处理支付逻辑... - // 调用区块链转账 - } - - // 启动监听 - if err := rmqServer.Start(); err != nil { - log.Fatalf("启动监听失败: %v", err) - } - - // 发送响应消息示例 - rmqServer.PublishTopupResp(message.TopupMsg_resp{ - Address: "0x123...", - Status: 1, - Chain: "ETH", - Symbol: "USDT", - Amount: 100.0, - TxHash: "0xabc...", - }) - - // 保持运行 - select {} -} -``` - ---- - -## API 说明 - -### 创建服务 - -```go -func NewRabbitMQServer(config message.RMQConfig) (*RabbitMQServer, error) -``` - -创建一个新的 RabbitMQ 服务实例。 - -**参数:** -- `config`: RabbitMQ 配置 - -**返回:** -- `*RabbitMQServer`: 服务实例 -- `error`: 错误信息 - ---- - -### 启动监听 - -```go -func (r *RabbitMQServer) Start() error -``` - -启动所有队列的监听(topup, withdraw, pay)。 - ---- - -### 消息回调设置 - -```go -// 充值请求回调 +// 设置消息回调 rmqServer.OnTopupMsg = func(msg message.TopupMsg_req) { - // 处理逻辑 + // 处理充值请求 } -// 提现请求回调 -rmqServer.OnWithdrawMsg = func(msg message.WithdrawMsg_req) { - // 处理逻辑 -} +// 启动监听 +rmqServer.Start() -// 支付请求回调 -rmqServer.OnPayMsg = func(msg message.PayMsg_req) { - // 处理逻辑 -} +// 发送响应 +rmqServer.PublishTopupResp(response) ``` ---- +## 消息队列结构 -### 发布响应消息 +### 请求队列(消费) +- `topup` - 充值请求 +- `withdraw` - 提现请求 +- `pay` - 支付请求 -```go -// 发布充值响应 -func (r *RabbitMQServer) PublishTopupResp(resp message.TopupMsg_resp) error - -// 发布提现响应 -func (r *RabbitMQServer) PublishWithdrawResp(resp message.WithdrawMsg_resp) error - -// 发布支付响应 -func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error -``` - ---- - -### 关闭服务 - -```go -func (r *RabbitMQServer) Close() error -``` - -优雅关闭 RabbitMQ 连接和所有监听。 - ---- - -## 消息格式 - -### 充值请求 (TopupMsg_req) -```json -{ - "chain": "ETH", - "symbol": "USDT", - "address": "0x123...", - "timestamp": 1234567890, - "sign": "signature_string" -} -``` - -### 充值响应 (TopupMsg_resp) -```json -{ - "address": "0x123...", - "status": 1, - "chain": "ETH", - "symbol": "USDT", - "amount": 100.5, - "tx_hash": "0xabc..." -} -``` - -### 提现请求 (WithdrawMsg_req) -```json -{ - "queue_id": "queue_123", - "from_address": "0x123...", - "to_address": "0x456...", - "amount": 50.0, - "chain": "ETH", - "symbol": "USDT", - "timestamp": 1234567890, - "sign": "signature_string" -} -``` - -### 提现响应 (WithdrawMsg_resp) -```json -{ - "queue_id": "queue_123", - "status": 1, - "amount": 50.0, - "chain": "ETH", - "symbol": "USDT", - "tx_hash": "0xdef..." -} -``` - -### 支付请求 (PayMsg_req) -```json -{ - "queue_id": "queue_456", - "from_address": "0x123...", - "to_address": "0x789...", - "amount": 200.0, - "chain": "ETH", - "symbol": "USDT", - "order_id": "order_789", - "timestamp": 1234567890, - "sign": "signature_string" -} -``` - -### 支付响应 (PayMsg_resp) -```json -{ - "queue_id": "queue_456", - "status": 1, - "amount": 200.0, - "chain": "ETH", - "symbol": "USDT", - "order_id": "order_789", - "tx_hash": "0xghi..." -} -``` - ---- - -## 状态码说明 - -- `status = 0` - 失败 -- `status = 1` - 成功 -- `status = 2` - 待确认(仅用于链上交易) - ---- +### 响应队列(发布) +- `topup_resp` - 充值响应 +- `withdraw_resp` - 提现响应 +- `pay_resp` - 支付响应 ## 特性 @@ -297,67 +42,5 @@ func (r *RabbitMQServer) Close() error ✅ **消息持久化** - 消息不会丢失 ✅ **手动确认** - 处理成功后才确认消息 ✅ **并发安全** - 支持多 goroutine 并发发布 -✅ **优雅关闭** - 支持优雅关闭所有连接 -✅ **错误处理** - 完善的错误日志和重试机制 - ---- - -## 注意事项 - -1. **连接地址格式**: `amqp://username:password@host:port/` -2. **消息确认**: 只有处理成功的消息才会被确认,失败的消息会重新入队 -3. **并发安全**: 发布消息时使用了互斥锁,可以安全地从多个 goroutine 调用 -4. **重连机制**: 连接断开时会自动重连,间隔 3 秒 -5. **消息持久化**: 消息和队列都是持久化的,重启后不会丢失 - ---- - -## 工作流程 - -``` -┌─────────────┐ ┌──────────────┐ ┌─────────────┐ -│ 业务系统 │ ------> │ RabbitMQ │ ------> │ 支付系统 │ -│ │ 充值/ │ (消息队列) │ 消费 │ (本系统) │ -│ │ 提现/ │ │ │ │ -│ │ 支付 │ │ │ │ -└─────────────┘ └──────────────┘ └─────────────┘ - | - | 监听链上 - v - ┌─────────────┐ - │ 区块链节点 │ - │ (ETH) │ - └─────────────┘ - | - | 交易确认 - v -┌─────────────┐ ┌──────────────┐ ┌─────────────┐ -│ 业务系统 │ <------ │ RabbitMQ │ <------ │ 支付系统 │ -│ │ 响应 │ (响应队列) │ 发布 │ (本系统) │ -└─────────────┘ └──────────────┘ └─────────────┘ -``` - ---- - -## 故障排查 - -### 连接失败 -- 检查 RabbitMQ 服务是否运行 -- 检查用户名密码是否正确 -- 检查网络连接和端口(默认 5672) - -### 消息未被消费 -- 检查队列绑定是否正确 -- 检查 routing key 是否匹配 -- 查看 RabbitMQ 管理界面的队列状态 - -### 消息重复消费 -- 确保消息处理成功后调用了 Ack -- 检查是否有多个消费者实例 - ---- - -## 许可证 - -MIT License +更多详情请参考 [主 README](../../README.md)。 diff --git a/internal/server.go b/internal/server.go index 2cdcfcc..e534a44 100644 --- a/internal/server.go +++ b/internal/server.go @@ -27,6 +27,7 @@ const ( ) type ServerCtx struct { + msgKey string Config message.Config blockChainServer *blockchain.BlockChainServer rmqServer *rmq.RabbitMQServer @@ -41,7 +42,7 @@ func verifyMessage(timestamp uint64, sign string) bool { return hash == sign } -func loadConfig() { +func loadConfig(msgKey string) { file, err := os.ReadFile("config.json") if err != nil { panic(fmt.Sprintf("读取配置文件失败: %v", err)) @@ -54,6 +55,8 @@ func loadConfig() { log.Printf("✅ 配置加载成功: RPC=%s, WS=%s", s_ctx.Config.ETHConfig.RpcURL, s_ctx.Config.ETHConfig.WsURL) + + s_ctx.msgKey = msgKey } func initBlockChainServer() { @@ -252,13 +255,13 @@ func handleChainEvent(chainEventCh chan any) { } } -func Start() { +func Start(msgKey string) { log.Println("========================================") log.Println("🚀 M2Pool Payment System Starting...") log.Println("========================================") // 加载配置 - loadConfig() + loadConfig(msgKey) // ================== 初始化区块链节点 ================== initBlockChainServer()