commit a0689dbb097658bdc7120f91ccbd19ad264a9f97 Author: lzx <393768033@qq.com> Date: Thu Oct 16 18:54:27 2025 +0800 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..b42eafa --- /dev/null +++ b/README.md @@ -0,0 +1,1099 @@ +# M2Pool Payment System v2 + +
+ +**基于区块链的分布式支付系统** + +[![Go Version](https://img.shields.io/badge/Go-1.24+-00ADD8?style=flat&logo=go)](https://golang.org) +[![Ethereum](https://img.shields.io/badge/Ethereum-Mainnet-3C3C3D?style=flat&logo=ethereum)](https://ethereum.org) +[![RabbitMQ](https://img.shields.io/badge/RabbitMQ-3.x-FF6600?style=flat&logo=rabbitmq)](https://www.rabbitmq.com) +[![MySQL](https://img.shields.io/badge/MySQL-8.0+-4479A1?style=flat&logo=mysql)](https://www.mysql.com) + +支持 **充值**、**提现**、**支付** 三大核心功能,实时监听链上交易,自动确认到账。 + +[快速开始](#快速开始) • [功能特性](#功能特性) • [架构设计](#架构设计) • [API 文档](#api-文档) + +
+ +--- + +## 📋 目录 + +- [项目简介](#项目简介) +- [功能特性](#功能特性) +- [架构设计](#架构设计) +- [快速开始](#快速开始) +- [配置说明](#配置说明) +- [使用示例](#使用示例) +- [API 文档](#api-文档) +- [开发指南](#开发指南) +- [部署指南](#部署指南) +- [性能指标](#性能指标) +- [常见问题](#常见问题) +- [贡献指南](#贡献指南) +- [许可证](#许可证) + +--- + +## 项目简介 + +M2Pool Payment System v2 是一个基于以太坊区块链的**分布式支付解决方案**,提供完整的数字货币充值、提现、支付功能。 + +### 核心能力 + +- 🔍 **实时监听**:订阅链上事件,实时检测 USDT 转账 +- ⚡ **快速确认**:20 个区块确认,约 4-5 分钟到账 +- 🔒 **安全可靠**:私钥加密存储,签名验证机制 +- 📊 **高并发**:支持少量并发(50-200 TPS) +- 🔄 **自动重连**:WebSocket 断开自动重连 +- 📨 **消息队列**:基于 RabbitMQ 的异步通信 + +### 技术栈 + +| 组件 | 技术 | 版本 | +|------|------|------| +| **语言** | Go | 1.24+ | +| **区块链** | Ethereum | go-ethereum v1.16.4 | +| **消息队列** | RabbitMQ | amqp091-go v1.10.0 | +| **数据库** | MySQL | 8.0+ | +| **网络协议** | WebSocket + RPC | - | + +--- + +## 功能特性 + +### 1. 充值功能 💰 + +``` +用户转账 → 实时检测 → 待确认通知 → 区块确认 → 最终通知 +``` + +**特点:** +- ✅ 实时检测到账 +- ✅ 发送**两次**通知:待确认 + 最终确认 +- ✅ 支持多币种(当前支持 USDT) +- ✅ 自动地址监听管理 + +**消息流:** +1. 业务系统发送充值请求 → RabbitMQ +2. 系统添加地址监听 +3. 用户转账 → 立即通知(status=2 待确认) +4. 等待 20 个区块 → 最终通知(status=1 成功 / 0 失败) + +--- + +### 2. 提现功能 💸 + +``` +提现请求 → 验证余额 → 发送交易 → 等待确认 → 返回结果 +``` + +**特点:** +- ✅ 自动余额检查 +- ✅ 余额不足时使用归集钱包 +- ✅ 发送**一次**通知:最终确认 +- ✅ Gas 费用检查 + +**消息流:** +1. 业务系统发送提现请求 → RabbitMQ +2. 系统验证余额并发送交易 +3. 等待 20 个区块确认 +4. 返回结果(status=1 成功 / 0 失败) + +--- + +### 3. 支付功能 💳 + +``` +支付请求 → 验证余额 → 发送交易 → 等待确认 → 返回结果 +``` + +**特点:** +- ✅ 订单关联 +- ✅ 自动余额检查 +- ✅ 发送**一次**通知:最终确认 +- ✅ 支持商户收款 + +**消息流:** +1. 业务系统发送支付请求(含订单ID)→ RabbitMQ +2. 系统验证余额并发送交易 +3. 等待 20 个区块确认 +4. 返回结果(status=1 成功 / 0 失败) + +--- + +## 架构设计 + +### 系统架构图 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 业务系统 │ +│ (Web/App/API Server) │ +└────────────┬────────────────────────────┬───────────────────┘ + │ │ + │ 请求 │ 响应 + ↓ ↑ +┌─────────────────────────────────────────────────────────────┐ +│ RabbitMQ │ +│ ┌─────────┐ ┌──────────┐ ┌────────┐ │ +│ │ topup │ │ withdraw │ │ pay │ 请求队列 │ +│ └─────────┘ └──────────┘ └────────┘ │ +│ ┌─────────┐ ┌──────────┐ ┌────────┐ │ +│ │topup_ │ │withdraw_ │ │ pay_ │ 响应队列 │ +│ │ resp │ │ resp │ │ resp │ │ +│ └─────────┘ └──────────┘ └────────┘ │ +└────────────┬────────────────────────────┬───────────────────┘ + │ │ + ↓ ↑ +┌─────────────────────────────────────────────────────────────┐ +│ M2Pool Payment System v2 │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +│ │ RabbitMQ │ │ Blockchain │ │ Database │ │ +│ │ Consumer │─>│ Manager │─>│ (MySQL) │ │ +│ └──────────────┘ └──────┬───────┘ └──────────────┘ │ +│ │ │ +│ │ WebSocket + RPC │ +└────────────────────────────┼─────────────────────────────────┘ + │ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ 以太坊区块链网络 │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ 新区块 │ │ USDT Transfer │ │ +│ │ (NewHead) │ │ 事件 │ │ +│ └──────────────┘ └──────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 核心模块 + +#### 1. Blockchain Manager (blockchain/) +- **eth/eth.go**:以太坊节点交互 + - 监听 USDT Transfer 事件 + - 监听新区块产生 + - 管理待确认交易池 + - 执行 ERC20 转账 + +#### 2. Message Queue (queue/) +- **rabbitmq.go**:消息队列服务 + - 消费充值/提现/支付请求 + - 发布交易确认响应 + - 自动重连机制 + +#### 3. Database (db/) +- **db.go**:数据库连接池 + - 存储钱包私钥(加密) + - 存储交易记录 + +#### 4. Message (msg/) +- **msg.go**:消息结构定义 + - 请求消息结构 + - 响应消息结构 + - 配置结构 + +#### 5. Utils (utils/) +- **utils.go**:工具函数 + - 数值转换 + - 加密解密 + +--- + +## 快速开始 + +### 前置条件 + +- ✅ Go 1.24 或更高版本 +- ✅ MySQL 8.0+ +- ✅ RabbitMQ 3.x+ +- ✅ 以太坊节点(支持 WebSocket 和 RPC) + +### 安装 + +```bash +# 1. 克隆项目 +git clone +cd m2pool-payment-v2 + +# 2. 安装依赖 +go mod download + +# 3. 创建数据库 +mysql -u root -p < schema.sql + +# 4. 配置文件 +cp test/config.example.json test/config.json +# 编辑 config.json,填入实际配置 + +# 5. 编译 +go build -o m2pool-payment cmd/main.go + +# 6. 运行 +./m2pool-payment + +# 或者直接运行测试 +cd test +go run test.go +``` + +--- + +## 配置说明 + +### 配置文件结构 (config.json) + +```json +{ + "rmq_config": { + "sub_addr": "amqp://username:password@localhost:5672", + "pay": { + "queue": "pay.auto.queue", + "exchange": "pay.exchange", + "routing": ["pay.auto.routing.key"] + }, + "topup": { + "queue": "pay.recharge.queue", + "exchange": "pay.exchange", + "routing": ["pay.recharge.routing.key"] + }, + "withdraw": { + "queue": "pay.withdraw.queue", + "exchange": "pay.exchange", + "routing": ["pay.withdraw.routing.key"] + }, + "pay_resp": { + "queue": "pay.auto.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.auto.return.routing.key"] + }, + "topup_resp": { + "queue": "pay.recharge.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.recharge.return.routing.key"] + }, + "withdraw_resp": { + "queue": "pay.withdraw.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.withdraw.return.routing.key"] + } + }, + "eth_config": { + "rpcUrl": "http://localhost:8545", + "wsUrl": "ws://localhost:8546", + "confirmHeight": 20, + "dbConfig": { + "user": "root", + "password": "your_password", + "host": "127.0.0.1", + "port": 3306, + "database": "payment", + "maxOpenConns": 20, + "maxIdleCoons": 20, + "connMaxLife": 120 + } + } +} +``` + +### 配置项说明 + +| 配置项 | 说明 | 默认值 | 必填 | +|--------|------|--------|------| +| `rmq_config.sub_addr` | RabbitMQ 连接地址 | - | ✅ | +| `eth_config.rpcUrl` | 以太坊 RPC 地址 | - | ✅ | +| `eth_config.wsUrl` | 以太坊 WebSocket 地址 | - | ✅ | +| `eth_config.confirmHeight` | 确认区块数 | 20 | ✅ | +| `dbConfig.user` | 数据库用户名 | root | ✅ | +| `dbConfig.password` | 数据库密码 | - | ✅ | +| `dbConfig.database` | 数据库名称 | payment | ✅ | + +--- + +## 使用示例 + +### 1. 充值流程 + +**步骤 1:业务系统发送充值请求** + +发送到 RabbitMQ 队列:`pay.recharge.queue` + +```json +{ + "chain": "ETH", + "symbol": "USDT", + "address": "0x4e5b2e1dc63f6b91cb6cd759936495434c7e972f", + "timestamp": 1758610297, + "sign": "signature_hash" +} +``` + +**步骤 2:用户转账** + +用户向指定地址转账 USDT + +**步骤 3:接收通知** + +从 RabbitMQ 队列:`pay.recharge.return.queue` 接收**两次**消息: + +第一次(待确认): +```json +{ + "address": "0x4e5b2e1dc63f6b91cb6cd759936495434c7e972f", + "status": 2, + "chain": "ETH", + "symbol": "USDT", + "amount": 100.5, + "tx_hash": "0xabc..." +} +``` + +第二次(最终确认): +```json +{ + "address": "0x4e5b2e1dc63f6b91cb6cd759936495434c7e972f", + "status": 1, + "chain": "ETH", + "symbol": "USDT", + "amount": 100.5, + "tx_hash": "0xabc..." +} +``` + +--- + +### 2. 提现流程 + +**步骤 1:业务系统发送提现请求** + +发送到 RabbitMQ 队列:`pay.withdraw.queue` + +```json +{ + "queue_id": "withdraw_123", + "from_address": "0x1111...", + "to_address": "0x2222...", + "amount": 50.0, + "chain": "ETH", + "symbol": "USDT", + "timestamp": 1758610297, + "sign": "signature_hash" +} +``` + +**步骤 2:系统处理** + +- 验证签名 +- 检查余额 +- 发送链上交易 +- 等待确认 + +**步骤 3:接收通知** + +从 RabbitMQ 队列:`pay.withdraw.return.queue` 接收**一次**消息: + +```json +{ + "queue_id": "withdraw_123", + "status": 1, + "amount": 50.0, + "chain": "ETH", + "symbol": "USDT", + "tx_hash": "0xdef..." +} +``` + +--- + +### 3. 支付流程 + +**步骤 1:业务系统发送支付请求** + +发送到 RabbitMQ 队列:`pay.auto.queue` + +```json +{ + "queue_id": "pay_456", + "from_address": "0x1111...", + "to_address": "0x3333...", + "amount": 200.0, + "chain": "ETH", + "symbol": "USDT", + "order_id": "order_789", + "timestamp": 1758610297, + "sign": "signature_hash" +} +``` + +**步骤 2:系统处理** + +- 验证签名 +- 检查余额 +- 发送链上交易 +- 等待确认 + +**步骤 3:接收通知** + +从 RabbitMQ 队列:`pay.auto.return.queue` 接收**一次**消息: + +```json +{ + "queue_id": "pay_456", + "status": 1, + "amount": 200.0, + "chain": "ETH", + "symbol": "USDT", + "order_id": "order_789", + "tx_hash": "0xghi..." +} +``` + +--- + +## API 文档 + +### 状态码说明 + +| 状态码 | 常量名 | 说明 | 适用场景 | +|--------|--------|------|---------| +| `0` | STATUS_FAILED | 交易失败 | 交易被回退或执行失败 | +| `1` | STATUS_SUCCESS | 交易成功 | 交易成功并已确认 | +| `2` | STATUS_PENDING | 待确认 | 交易已检测到,等待区块确认 | +| `3` | STATUS_VERIFY_FAILED | 验证失败 | 签名验证失败 | + +### 消息结构 + +#### 充值请求 (TopupMsg_req) + +| 字段 | 类型 | 说明 | 必填 | +|------|------|------|------| +| chain | string | 链名称 (ETH) | ✅ | +| symbol | string | 币种 (USDT) | ✅ | +| address | string | 充值地址 | ✅ | +| timestamp | uint64 | 时间戳 | ✅ | +| sign | string | 签名 | ✅ | + +#### 充值响应 (TopupMsg_resp) + +| 字段 | 类型 | 说明 | +|------|------|------| +| address | string | 充值地址 | +| status | int | 状态码 (0/1/2/3) | +| chain | string | 链名称 | +| symbol | string | 币种 | +| amount | float64 | 金额 | +| tx_hash | string | 交易哈希 | + +#### 提现请求 (WithdrawMsg_req) + +| 字段 | 类型 | 说明 | 必填 | +|------|------|------|------| +| queue_id | string | 队列ID | ✅ | +| from_address | string | 转出地址 | ✅ | +| to_address | string | 转入地址 | ✅ | +| amount | float64 | 金额 | ✅ | +| chain | string | 链名称 | ✅ | +| symbol | string | 币种 | ✅ | +| timestamp | uint64 | 时间戳 | ✅ | +| sign | string | 签名 | ✅ | + +#### 提现响应 (WithdrawMsg_resp) + +| 字段 | 类型 | 说明 | +|------|------|------| +| queue_id | string | 队列ID | +| status | int | 状态码 | +| amount | float64 | 金额 | +| chain | string | 链名称 | +| symbol | string | 币种 | +| tx_hash | string | 交易哈希 | + +#### 支付请求 (PayMsg_req) + +| 字段 | 类型 | 说明 | 必填 | +|------|------|------|------| +| queue_id | string | 队列ID | ✅ | +| from_address | string | 付款地址 | ✅ | +| to_address | string | 收款地址(商户) | ✅ | +| amount | float64 | 金额 | ✅ | +| chain | string | 链名称 | ✅ | +| symbol | string | 币种 | ✅ | +| order_id | string | 订单ID | ✅ | +| timestamp | uint64 | 时间戳 | ✅ | +| sign | string | 签名 | ✅ | + +#### 支付响应 (PayMsg_resp) + +| 字段 | 类型 | 说明 | +|------|------|------| +| queue_id | string | 队列ID | +| status | int | 状态码 | +| amount | float64 | 金额 | +| chain | string | 链名称 | +| symbol | string | 币种 | +| order_id | string | 订单ID | +| tx_hash | string | 交易哈希 | + +--- + +## 开发指南 + +### 项目结构 + +``` +m2pool-payment-v2/ +├── cmd/ # 主程序入口 +│ └── main.go +├── internal/ # 内部包 +│ ├── blockchain/ # 区块链交互 +│ │ ├── blockchain.go # 统一接口 +│ │ ├── eth/ # 以太坊实现 +│ │ │ └── eth.go +│ │ └── tron/ # TRON实现(待开发) +│ ├── crypto/ # 加密工具 +│ │ └── crypto.go +│ ├── db/ # 数据库 +│ │ └── db.go +│ ├── msg/ # 消息定义 +│ │ └── msg.go +│ ├── queue/ # 消息队列 +│ │ ├── rabbitmq.go +│ │ └── README.md +│ └── utils/ # 工具函数 +│ └── utils.go +├── test/ # 测试和示例 +│ ├── test.go +│ └── config.json +├── go.mod +├── go.sum +└── README.md +``` + +### 开发环境设置 + +```bash +# 1. 安装 Go +wget https://go.dev/dl/go1.24.0.linux-amd64.tar.gz +sudo tar -C /usr/local -xzf go1.24.0.linux-amd64.tar.gz + +# 2. 设置环境变量 +export PATH=$PATH:/usr/local/go/bin +export GOPATH=$HOME/go + +# 3. 启动 MySQL +docker run -d \ + --name mysql \ + -p 3306:3306 \ + -e MYSQL_ROOT_PASSWORD=your_password \ + -e MYSQL_DATABASE=payment \ + mysql:8.0 + +# 4. 启动 RabbitMQ +docker run -d \ + --name rabbitmq \ + -p 5672:5672 \ + -p 15672:15672 \ + -e RABBITMQ_DEFAULT_USER=m2pool \ + -e RABBITMQ_DEFAULT_PASS=m2pool \ + rabbitmq:3-management + +# 5. 连接以太坊节点 +# 使用 Infura、Alchemy 或自建节点 +``` + +### 数据库表结构 + +```sql +-- 钱包余额表 +CREATE TABLE `eth_balance` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `address` VARCHAR(42) NOT NULL UNIQUE, + `private_key` VARCHAR(255) NOT NULL, + `balance` DECIMAL(20, 8) DEFAULT 0, + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX `idx_address` (`address`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- 交易记录表(可选) +CREATE TABLE `transactions` ( + `id` BIGINT AUTO_INCREMENT PRIMARY KEY, + `tx_hash` VARCHAR(66) NOT NULL UNIQUE, + `from_address` VARCHAR(42) NOT NULL, + `to_address` VARCHAR(42) NOT NULL, + `amount` DECIMAL(20, 8) NOT NULL, + `symbol` VARCHAR(10) NOT NULL, + `chain` VARCHAR(10) NOT NULL, + `tx_type` TINYINT NOT NULL COMMENT '0=充值,1=提现,2=支付', + `status` TINYINT NOT NULL COMMENT '0=失败,1=成功,2=待确认', + `block_height` BIGINT, + `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + INDEX `idx_tx_hash` (`tx_hash`), + INDEX `idx_from` (`from_address`), + INDEX `idx_to` (`to_address`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` + +--- + +## 部署指南 + +### Docker 部署(推荐) + +```dockerfile +# Dockerfile +FROM golang:1.24-alpine AS builder + +WORKDIR /app +COPY . . +RUN go mod download +RUN CGO_ENABLED=0 GOOS=linux go build -o m2pool-payment cmd/main.go + +FROM alpine:latest +RUN apk --no-cache add ca-certificates +WORKDIR /root/ + +COPY --from=builder /app/m2pool-payment . +COPY --from=builder /app/test/config.json . + +CMD ["./m2pool-payment"] +``` + +```yaml +# docker-compose.yml +version: '3.8' + +services: + payment: + build: . + depends_on: + - mysql + - rabbitmq + environment: + - CONFIG_PATH=/root/config.json + volumes: + - ./config.json:/root/config.json + restart: unless-stopped + + mysql: + image: mysql:8.0 + environment: + MYSQL_ROOT_PASSWORD: ${MYSQL_PASSWORD} + MYSQL_DATABASE: payment + ports: + - "3306:3306" + volumes: + - mysql_data:/var/lib/mysql + + rabbitmq: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_USER: m2pool + RABBITMQ_DEFAULT_PASS: m2pool + ports: + - "5672:5672" + - "15672:15672" + volumes: + - rabbitmq_data:/var/lib/rabbitmq + +volumes: + mysql_data: + rabbitmq_data: +``` + +```bash +# 启动 +docker-compose up -d + +# 查看日志 +docker-compose logs -f payment + +# 停止 +docker-compose down +``` + +### 系统服务部署 + +```bash +# 1. 创建系统用户 +sudo useradd -r -s /bin/false m2pool + +# 2. 创建服务文件 +sudo nano /etc/systemd/system/m2pool-payment.service +``` + +```ini +[Unit] +Description=M2Pool Payment System +After=network.target mysql.service rabbitmq-server.service + +[Service] +Type=simple +User=m2pool +WorkingDirectory=/opt/m2pool-payment +ExecStart=/opt/m2pool-payment/m2pool-payment +Restart=on-failure +RestartSec=10 + +[Install] +WantedBy=multi-user.target +``` + +```bash +# 3. 启动服务 +sudo systemctl daemon-reload +sudo systemctl enable m2pool-payment +sudo systemctl start m2pool-payment + +# 4. 查看状态 +sudo systemctl status m2pool-payment + +# 5. 查看日志 +sudo journalctl -u m2pool-payment -f +``` + +--- + +## 性能指标 + +### 处理能力 + +| 指标 | 数值 | 说明 | +|------|------|------| +| **充值检测 TPS** | 500-1000 | 每秒处理交易数 | +| **提现/支付 TPS** | 200-500 | 包含数据库查询 | +| **消息发送 TPS** | 5000-10000 | RabbitMQ 发送速率 | +| **并发地址监听** | 10000+ | 同时监听的地址数量 | + +### 响应时间 + +| 操作 | 响应时间 | 说明 | +|------|---------|------| +| **充值待确认通知** | < 3 秒 | 检测到交易后 | +| **充值最终确认** | 4-5 分钟 | 20 个区块 | +| **提现执行** | < 5 秒 | 发送交易 | +| **提现最终确认** | 4-5 分钟 | 20 个区块 | + +### 资源占用(4核8G环境) + +| 资源 | 使用量 | 峰值 | +|------|--------|------| +| **CPU** | 5-15% | 30% | +| **内存** | 100-300 MB | 500 MB | +| **网络带宽** | 1-5 MB/s | 10 MB/s | +| **数据库连接** | 5-10 | 20 | + +--- + +## 常见问题 + +### Q1: 为什么充值会收到两次通知? + +**A:** 这是设计特性! +- **第一次**(status=2):检测到交易,提醒用户"正在确认" +- **第二次**(status=1/0):交易确认,通知最终结果 + +业务系统应该: +- status=2:显示进度,**不增加余额** +- status=1:增加余额 + +### Q2: 提现/支付为什么只有一次通知? + +**A:** 因为是系统主动发起的交易,用户已经知道在处理中,不需要额外的待确认通知。 + +### Q3: 如何处理交易失败? + +**A:** 系统会返回 status=0 的消息,业务系统应该: +- 充值失败:不增加余额,提示用户联系客服 +- 提现失败:退回用户余额 +- 支付失败:恢复订单状态,退回余额 + +### Q4: 确认需要多长时间? + +**A:** 配置为 20 个区块确认,以太坊约 12 秒/块: +- 理论时间:20 × 12 = 240 秒(4 分钟) +- 实际时间:4-5 分钟(包括网络延迟) + +### Q5: 如何保证私钥安全? + +**A:** +1. 私钥在数据库中**加密存储** +2. 仅在转账时临时解密 +3. 建议使用 HSM(硬件安全模块) +4. 使用 KMS(密钥管理服务) + +### Q6: 余额不足时如何处理? + +**A:** 系统会自动使用**归集钱包**转账。归集钱包应该: +- 保持足够的余额 +- 定期从各个钱包归集资金 +- 设置余额告警 + +### Q7: 支持哪些网络? + +**A:** +- ✅ 以太坊主网(Mainnet) +- ✅ 以太坊测试网(Goerli, Sepolia) +- ✅ 私有链 +- ⚠️ 需要修改 USDT 合约地址 + +### Q8: Gas 费用谁承担? + +**A:** +- **充值**:用户承担(用户自己发送交易) +- **提现**:平台承担(系统发送交易) +- **支付**:平台承担(系统发送交易) + +建议:提现/支付时从用户金额中扣除 Gas 费 + +### Q9: 如何监控系统状态? + +**A:** 建议监控: +- 待确认交易数量:`len(UnConfirmTxs)` +- Channel 使用率:`len(chainEventCh)/cap(chainEventCh)` +- RabbitMQ 连接状态 +- WebSocket 连接状态 +- 数据库连接池状态 + +### Q10: 如何处理重复消息? + +**A:** RabbitMQ 可能重复投递消息,业务系统应该: +1. 使用 `tx_hash` 作为唯一标识 +2. 实现幂等性处理 +3. 数据库添加唯一索引 + +--- + +## 安全建议 + +### 🔒 安全检查清单 + +- [ ] 私钥加密存储 +- [ ] 使用 HTTPS/WSS 连接 +- [ ] 签名验证所有请求 +- [ ] 限制 API 访问频率 +- [ ] 定期备份数据库 +- [ ] 监控异常交易 +- [ ] 设置余额告警 +- [ ] 使用防火墙限制访问 +- [ ] 定期更新依赖包 +- [ ] 日志脱敏处理 + +### ⚠️ 重要提示 + +1. **私钥管理** + - ❌ 不要在代码中硬编码私钥 + - ❌ 不要在日志中打印私钥 + - ✅ 使用环境变量或密钥管理服务 + +2. **网络安全** + - ✅ 使用 VPN 或专线连接区块链节点 + - ✅ RabbitMQ 启用 TLS + - ✅ MySQL 限制远程访问 + +3. **资金安全** + - ✅ 设置单笔交易限额 + - ✅ 异常交易人工审核 + - ✅ 多签钱包(建议) + - ✅ 冷热钱包分离 + +--- + +## 监控告警 + +### 推荐监控指标 + +```go +// 添加监控指标 +type Metrics struct { + TotalTransactions int64 // 总交易数 + PendingTransactions int // 待确认交易数 + FailedTransactions int64 // 失败交易数 + ChannelUsage int // Channel使用率 + LastBlockHeight uint64 // 最新区块高度 +} + +// 定期上报 +go func() { + ticker := time.NewTicker(1 * time.Minute) + for range ticker.C { + log.Printf("📊 待确认交易: %d", len(e.UnConfirmTxs)) + log.Printf("📊 监听地址数: %d", countAddresses()) + log.Printf("📊 Channel使用率: %d%%", len(ch)*100/cap(ch)) + } +}() +``` + +### 告警规则建议 + +| 指标 | 阈值 | 告警级别 | +|------|------|---------| +| 待确认交易数 | > 100 | ⚠️ 警告 | +| 待确认交易数 | > 500 | 🔴 严重 | +| Channel 使用率 | > 80% | ⚠️ 警告 | +| Channel 使用率 | > 95% | 🔴 严重 | +| 交易失败率 | > 5% | ⚠️ 警告 | +| WebSocket 断线 | 重连 > 3次/小时 | ⚠️ 警告 | + +--- + +## 故障排查 + +### 问题:充值检测不到 + +**可能原因:** +1. 地址未加入监听列表 +2. WebSocket 连接断开 +3. 合约地址配置错误 +4. 网络 ID 不匹配 + +**排查步骤:** +```bash +# 检查日志 +grep "新增钱包监听消息" logs/payment.log + +# 检查订阅状态 +grep "订阅成功" logs/payment.log + +# 测试节点连接 +curl -X POST -H "Content-Type: application/json" \ + --data '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' \ + http://localhost:8545 +``` + +### 问题:提现/支付失败 + +**可能原因:** +1. 余额不足 +2. Gas 费不足 +3. 私钥错误 +4. nonce 冲突 + +**排查步骤:** +```bash +# 检查错误日志 +grep "转账失败" logs/payment.log + +# 检查余额 +grep "余额" logs/payment.log + +# 检查私钥 +grep "查询私钥" logs/payment.log +``` + +### 问题:消息未返回 + +**可能原因:** +1. RabbitMQ 连接断开 +2. Channel 阻塞 +3. 交易未确认 + +**排查步骤:** +```bash +# 检查 RabbitMQ 连接 +rabbitmqctl list_connections + +# 检查队列状态 +rabbitmqctl list_queues + +# 检查待确认交易 +# 添加 HTTP 接口查询 UnConfirmTxs +``` + +--- + +## 贡献指南 + +欢迎贡献代码!请遵循以下步骤: + +1. Fork 本项目 +2. 创建功能分支 (`git checkout -b feature/AmazingFeature`) +3. 提交更改 (`git commit -m 'Add some AmazingFeature'`) +4. 推送到分支 (`git push origin feature/AmazingFeature`) +5. 开启 Pull Request + +### 代码规范 + +- 使用 `gofmt` 格式化代码 +- 遵循 Go 命名规范 +- 添加必要的注释 +- 编写单元测试 +- 更新文档 + +--- + +## 路线图 + +### v2.1 (计划中) +- [ ] 支持 BTC 网络 +- [ ] 支持 TRON 网络 +- [ ] 添加 HTTP API +- [ ] 添加 WebSocket 推送 +- [ ] 性能优化(读写锁、缓存) + +### v2.2 (计划中) +- [ ] 支持更多 ERC20 代币 +- [ ] 多签钱包支持 +- [ ] 交易记录持久化 +- [ ] 管理后台 +- [ ] 监控面板 + +### v3.0 (规划中) +- [ ] 微服务架构 +- [ ] 水平扩展支持 +- [ ] 分布式事务 +- [ ] 高可用部署 + +--- + +## 相关文档 + +- [RabbitMQ 使用说明](internal/queue/README.md) +- [并发能力分析](docs/CONCURRENCY_ANALYSIS.md)(如果有) +- [交易类型说明](docs/TRANSACTION_TYPES.md)(如果有) + +--- + +## 技术支持 + +- 📧 Email: support@m2pool.com +- 💬 Telegram: @m2pool_support +- 📱 WeChat: m2pool_tech + +--- + +## 许可证 + +MIT License + +Copyright (c) 2025 M2Pool Team + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +--- + +
+ +**⭐ 如果这个项目对你有帮助,请给一个 Star!⭐** + +Made with ❤️ by M2Pool Team + +
+ diff --git a/bin/build.sh b/bin/build.sh new file mode 100644 index 0000000..8985c18 --- /dev/null +++ b/bin/build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +go build -o ./payment ../cmd/main.go \ No newline at end of file diff --git a/bin/start.sh b/bin/start.sh new file mode 100644 index 0000000..3500197 --- /dev/null +++ b/bin/start.sh @@ -0,0 +1,2 @@ +#/bin/bash +./payment \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..74bcb12 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,7 @@ +package main + +import server "m2pool-payment/internal" + +func main() { + server.Start() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c45b057 --- /dev/null +++ b/go.mod @@ -0,0 +1,35 @@ +module m2pool-payment + +go 1.24.0 + +toolchain go1.24.9 + +require ( + github.com/ethereum/go-ethereum v1.16.4 + github.com/go-sql-driver/mysql v1.9.3 +) + +require ( + filippo.io/edwards25519 v1.1.0 // indirect + github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/StackExchange/wmi v1.2.1 // indirect + github.com/bits-and-blooms/bitset v1.20.0 // indirect + github.com/consensys/gnark-crypto v0.18.0 // indirect + github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect + github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect + github.com/deckarep/golang-set/v2 v2.6.0 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/ethereum/c-kzg-4844/v2 v2.1.3 // indirect + github.com/ethereum/go-verkle v0.2.2 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect + github.com/gorilla/websocket v1.4.2 // indirect + github.com/holiman/uint256 v1.3.2 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect + github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/sync v0.12.0 // indirect + golang.org/x/sys v0.36.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a5857cb --- /dev/null +++ b/go.sum @@ -0,0 +1,191 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= +github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= +github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= +github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU= +github.com/bits-and-blooms/bitset v1.20.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/pebble v1.1.5 h1:5AAWCBWbat0uE0blr8qzufZP5tBjkRyy/jWe1QWLnvw= +github.com/cockroachdb/pebble v1.1.5/go.mod h1:17wO9el1YEigxkP/YtV8NtCivQDgoCyBg5c4VR/eOWo= +github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= +github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/consensys/gnark-crypto v0.18.0 h1:vIye/FqI50VeAr0B3dx+YjeIvmc3LWz4yEfbWBpTUf0= +github.com/consensys/gnark-crypto v0.18.0/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= +github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= +github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/crate-crypto/go-eth-kzg v1.4.0 h1:WzDGjHk4gFg6YzV0rJOAsTK4z3Qkz5jd4RE3DAvPFkg= +github.com/crate-crypto/go-eth-kzg v1.4.0/go.mod h1:J9/u5sWfznSObptgfa92Jq8rTswn6ahQWEuiLHOjCUI= +github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOVl3J+MYp5kPMoUZPp7aOYHtaua31lwRHg= +github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a/go.mod h1:sTwzHBvIzm2RfVCGNEBZgRyjwK40bVoun3ZnGOCafNM= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= +github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= +github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/emicklei/dot v1.6.2 h1:08GN+DD79cy/tzN6uLCT84+2Wk9u+wvqP+Hkx/dIR8A= +github.com/emicklei/dot v1.6.2/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/ethereum/c-kzg-4844/v2 v2.1.3 h1:DQ21UU0VSsuGy8+pcMJHDS0CV1bKmJmxsJYK8l3MiLU= +github.com/ethereum/c-kzg-4844/v2 v2.1.3/go.mod h1:fyNcYI/yAuLWJxf4uzVtS8VDKeoAaRM8G/+ADz/pRdA= +github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab h1:rvv6MJhy07IMfEKuARQ9TKojGqLVNxQajaXEp/BoqSk= +github.com/ethereum/go-bigmodexpfix v0.0.0-20250911101455-f9e208c548ab/go.mod h1:IuLm4IsPipXKF7CW5Lzf68PIbZ5yl7FFd74l/E0o9A8= +github.com/ethereum/go-ethereum v1.16.4 h1:H6dU0r2p/amA7cYg6zyG9Nt2JrKKH6oX2utfcqrSpkQ= +github.com/ethereum/go-ethereum v1.16.4/go.mod h1:P7551slMFbjn2zOQaKrJShZVN/d8bGxp4/I6yZVlb5w= +github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= +github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= +github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY= +github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= +github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E= +github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= +github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= +github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= +github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db h1:IZUYC/xb3giYwBLMnr8d0TGTzPKFGNTCGgGLoyeX330= +github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db/go.mod h1:xTEYN9KCHxuYHs+NmrmzFcnvHMzLLNiGFafCb1n3Mfg= +github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= +github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= +github.com/holiman/uint256 v1.3.2 h1:a9EgMPSC1AAaj1SZL5zIQD3WbwTuHrMGOerLjGmM/TA= +github.com/holiman/uint256 v1.3.2/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= +github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= +github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= +github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= +github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leanovate/gopter v0.2.11 h1:vRjThO1EKPb/1NsDXuDrzldR28RLkBflWYcU9CvzWu4= +github.com/leanovate/gopter v0.2.11/go.mod h1:aK3tzZP/C+p1m3SPRE4SYZFGP7jjkuSI4f7Xvpt0S9c= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= +github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= +github.com/mitchellh/pointerstructure v1.2.0/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8ohIXc3tViBH44KcwB2g4= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pion/dtls/v2 v2.2.7 h1:cSUBsETxepsCSFSxC3mc/aDo14qQLMSL+O6IjG28yV8= +github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/stun/v2 v2.0.0 h1:A5+wXKLAypxQri59+tmQKVs7+l6mMM+3d+eER9ifRU0= +github.com/pion/stun/v2 v2.0.0/go.mod h1:22qRSh08fSEttYUmJZGlriq9+03jtVmXNODgLccj8GQ= +github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c= +github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= +github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM= +github.com/prometheus/client_golang v1.15.0/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= +github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw= +github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= +github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/blockchain/blockchain.go b/internal/blockchain/blockchain.go new file mode 100644 index 0000000..99f6f52 --- /dev/null +++ b/internal/blockchain/blockchain.go @@ -0,0 +1,75 @@ +package blockchain + +import ( + "fmt" + "sync" +) + +type IChainServer interface { + AddAddress(address string, msg any) + RemoveAddress(address string) + Listen(symbol string, ch chan any) + Transfer(symbol string, msg any) error + Stop() +} + +type BlockChainServer struct { + mu sync.Mutex + chains map[string]IChainServer // "ETH", "TRON", "BTC" +} + +func NewBlockChainServer() *BlockChainServer { + return &BlockChainServer{ + chains: make(map[string]IChainServer), + } +} + +func (b *BlockChainServer) RegisterChain(name string, chain IChainServer) { + b.mu.Lock() + defer b.mu.Unlock() + b.chains[name] = chain +} + +func (b *BlockChainServer) AddAddress(chain, address string, msg any) { + if srv, ok := b.chains[chain]; ok { + srv.AddAddress(address, msg) + fmt.Printf("✅ 添加监听地址: chain=%s, address=%s\n", chain, address) + } else { + fmt.Printf("⚠️ 链未注册: %s\n", chain) + } +} + +func (b *BlockChainServer) RemoveAddress(chain, address string) { + if srv, ok := b.chains[chain]; ok { + srv.RemoveAddress(address) + fmt.Printf("🗑️ 移除监听地址: chain=%s, address=%s\n", chain, address) + } else { + fmt.Printf("⚠️ 链未注册: %s\n", chain) + } +} + +func (b *BlockChainServer) Listen(chain, symbol string, ch chan any) error { + if srv, ok := b.chains[chain]; ok { + go func() { + srv.Listen(symbol, ch) + }() + return nil + } + return fmt.Errorf("链未注册: %s", chain) +} + +func (b *BlockChainServer) Transfer(chain, symbol string, msg any) error { + if srv, ok := b.chains[chain]; ok { + fmt.Printf("💸 %s-%s发起转账: %+v\n", chain, symbol, msg) + return srv.Transfer(symbol, msg) + } + return fmt.Errorf("链未注册: %s", chain) +} + +func (b *BlockChainServer) Stop(chain string) { + if srv, ok := b.chains[chain]; ok { + fmt.Printf("💸 关闭服务: %+v\n", chain) + srv.Stop() + } + fmt.Printf("链未注册: %s", chain) +} diff --git a/internal/blockchain/eth/eth.go b/internal/blockchain/eth/eth.go new file mode 100644 index 0000000..349cc41 --- /dev/null +++ b/internal/blockchain/eth/eth.go @@ -0,0 +1,690 @@ +package eth + +import ( + "context" + "fmt" + "log" + "m2pool-payment/internal/db" + message "m2pool-payment/internal/msg" + "m2pool-payment/internal/utils" + "math/big" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" +) + +const erc20ABI = ` +[ + { + "constant": true, + "inputs": [{"name": "_owner", "type": "address"}], + "name": "balanceOf", + "outputs": [{"name": "balance", "type": "uint256"}], + "type": "function" + }, + { + "constant": false, + "inputs": [ + {"name": "_to", "type": "address"}, + {"name": "_value", "type": "uint256"} + ], + "name": "transfer", + "outputs": [{"name": "", "type": "bool"}], + "type": "function" + }, + { + "anonymous": false, + "inputs": [ + {"indexed": true, "name": "from", "type": "address"}, + {"indexed": true, "name": "to", "type": "address"}, + {"indexed": false,"name": "value","type": "uint256"} + ], + "name": "Transfer", + "type": "event" + } +] +` + +type ETHNode struct { + decodeKey string // 私钥解密密钥,从程序启动命令行获得 + NetId *big.Int // 网络ID,主网为1,其他ID可通过rpc.NetworkID方法获取 + Config message.ETHConfig // 配置文件 + WsClient *ethclient.Client + RpcClient *ethclient.Client + Db db.MySQLPool + mu sync.Mutex + ListenAddresses sync.Map // key:"钱包地址", value:bool + UnConfirmTxs map[string]message.Tx_msg // 待交易地址池,key:"交易hash", value: message.Tx + USDT *USDT // ETH相关 + RmqMsgs map[string][]any // 根据地址查找出该地址涉及的消息,消息需要断言(topupreq_msg, payreq_msg, withdrawreq_msg) + Ctx context.Context + Cancel context.CancelFunc +} + +type USDT struct { + Address common.Address // USDT合约地址 + ABI abi.ABI // USDT ABI + TransferSig common.Hash // USDT函数签名 + LogsChan chan types.Log +} + +func NewETHNode(cfg message.ETHConfig, decodeKey string) (*ETHNode, error) { + // 连入ETH节点的ws + ws_client, err := ethclient.Dial(cfg.WsURL) + if err != nil { + return nil, fmt.Errorf("failed to connect to Ethereum node: %w", err) + } + // 连入ETH节点的rpc + rpc_client, err := ethclient.Dial(cfg.RpcURL) + if err != nil { + return nil, fmt.Errorf("failed to connect to Ethereum node rpc: %w", err) + } + // 创建可取消的 context + ctx, cancel := context.WithCancel(context.Background()) + // 获得net_id + netId, err := rpc_client.NetworkID(ctx) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to connect to get node net_id: %w", err) + } + // 构造USDT合约相关 + usdt := &USDT{} + usdt.Address = common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7") // 解析合约地址 + usdt.ABI = func() abi.ABI { a, _ := abi.JSON(strings.NewReader(erc20ABI)); return a }() // 解析合约ABI + usdt.TransferSig = crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) // 解析合约transfer函数签名 + usdt.LogsChan = make(chan types.Log, 1000) // 初始化合约日志通道 + + // 初始化数据库 + dbConn, err := db.NewMySQLPool(cfg.DbConfig) + if err != nil { + cancel() + return nil, fmt.Errorf("mysql connect error: %w", err) + } + + return ÐNode{ + decodeKey: decodeKey, + NetId: netId, + Config: cfg, + WsClient: ws_client, + RpcClient: rpc_client, + Db: *dbConn, + ListenAddresses: sync.Map{}, + UnConfirmTxs: make(map[string]message.Tx_msg), + USDT: usdt, + RmqMsgs: make(map[string][]any), + Ctx: ctx, + Cancel: cancel, + }, nil +} + +// ============================ 抽象接口 ============================ +func (e *ETHNode) AddAddress(address string, rmq_msg any) { + // 统一转换为小写 + address = strings.ToLower(address) + log.Printf("新增钱包监听消息:%v", rmq_msg) + e.ListenAddresses.Store(address, true) + e.mu.Lock() + if len(e.RmqMsgs[address]) == 0 { + e.RmqMsgs[address] = []any{rmq_msg} + } else { + e.RmqMsgs[address] = append(e.RmqMsgs[address], rmq_msg) + } + e.mu.Unlock() +} + +func (e *ETHNode) RemoveAddress(address string) { + // 统一转换为小写 + address = strings.ToLower(address) + e.ListenAddresses.Delete(address) + e.mu.Lock() + delete(e.RmqMsgs, address) + e.mu.Unlock() +} + +func (e *ETHNode) Listen(symbol string, ch chan any) { + // 启动新区块监听(用于触发交易确认检查) + go e.listenNewBlocks("USDT", ch) + switch symbol { + case "USDT": + // 启动 USDT Transfer 事件监听 + err := e.listen_usdt(ch) + if err != nil { + log.Fatal("Listen USDT Transactions Error:", err) + } + } +} + +func (e *ETHNode) Transfer(symbol string, msg any) error { + switch symbol { + case "USDT": + err := e.usdt_transfer(msg) + if err != nil { + return fmt.Errorf("%s transfer ERROR: %w", symbol, err) + } + default: + return fmt.Errorf("unsupported symbol: %s", symbol) + } + return nil +} + +// ============================ rpc节点方法 ============================ + +func (e *ETHNode) getETHBlance(address string) (*big.Int, error) { + account := common.HexToAddress(address) + ctx := context.Background() + balance, err := e.RpcClient.BalanceAt(ctx, account, nil) // nil表示最新高度 + if err != nil { + return nil, fmt.Errorf("failed to get eth balance:%w", err) + } + // fBalance := new(big.Float).SetInt(balance) + // ethValue := new(big.Float).Quo(fBalance, big.NewFloat(1e18)) // 转 ETH + + // value, _ := ethValue.Float64() // 转 float64 + return balance, nil +} + +func (e *ETHNode) getUSDTBalance(address string) (float64, error) { + // 统一转换为小写(common.HexToAddress会自动处理,但为了一致性显式转换) + address = strings.ToLower(address) + contractAddress := e.USDT.Address + accountAddress := common.HexToAddress(address) + data, err := e.USDT.ABI.Pack("balanceOf", accountAddress) + if err != nil { + return 0, fmt.Errorf("failed to pack balanceOf data: %w", err) + } + msg := ethereum.CallMsg{ + To: &contractAddress, + Data: data, + } + // 使用 CallContract 方法查询合约余额 + res, err := e.RpcClient.CallContract(e.Ctx, msg, nil) + if err != nil { + return 0, fmt.Errorf("failed to get contract balance: %w", err) + } + // 解析返回的字节为 *big.Int + outputs, err := e.USDT.ABI.Unpack("balanceOf", res) + if err != nil || len(outputs) == 0 { + return 0, fmt.Errorf("failed to unpack balanceOf result: %w", err) + } + balance, ok := outputs[0].(*big.Int) + if !ok { + return 0, fmt.Errorf("unexpected type for balanceOf result") + } + bal := utils.BigIntUSDTToFloat64(balance) + return bal, nil +} + +func (e *ETHNode) getBlockHeight() (uint64, error) { + header, err := e.RpcClient.HeaderByNumber(e.Ctx, nil) + if err != nil { + return 0, fmt.Errorf("failed to get latest block header: %w", err) + } + return header.Number.Uint64(), nil +} + +func (e *ETHNode) getSuggestGasPrice() (*big.Int, error) { + ctx := context.Background() + gasPrice, err := e.RpcClient.SuggestGasPrice(ctx) + if err != nil { + return nil, fmt.Errorf("get suggest-gasprice error:%v", err) + } + return gasPrice, nil +} + +// ============================ 业务方法 ============================ +func (e *ETHNode) listen_usdt(ch chan any) error { + fmt.Println("🔍 ETH 开始监听 USDT Transfer 事件...") + // 过滤掉非USDT数据 + query := ethereum.FilterQuery{ + Addresses: []common.Address{e.USDT.Address}, + } + // 负责重连 + for { + // 订阅日志 + sub, err := e.WsClient.SubscribeFilterLogs(e.Ctx, query, e.USDT.LogsChan) + if err != nil { + fmt.Println("❌ 订阅失败, 5秒后重试:", err) + time.Sleep(5 * time.Second) + continue + } + fmt.Println("✅ 订阅成功") + // 处理事件 + for { + select { + case err := <-sub.Err(): + fmt.Println("⚠️ 订阅异常,准备重连:", err) + sub.Unsubscribe() // 清理旧订阅 + time.Sleep(3 * time.Second) + goto reconnect // 跳出内层循环,回到外层重新订阅 + + case vLog := <-e.USDT.LogsChan: + e.handleUSDTEvent(vLog, ch) // 事件解析 + 分类,传递链消息的通道是vLog而非ch,且一次只传递一笔交易 + case <-e.Ctx.Done(): + fmt.Println("🛑 收到停止信号,退出监听") + sub.Unsubscribe() + return e.Ctx.Err() + } + } + reconnect: + } +} + +func (e *ETHNode) handleUSDTEvent(vLog types.Log, ch chan any) { + from := common.HexToAddress(vLog.Topics[1].Hex()) + to := common.HexToAddress(vLog.Topics[2].Hex()) + height := vLog.BlockNumber + fromAddr := strings.ToLower(from.Hex()) + toAddr := strings.ToLower(to.Hex()) + var transferEvent struct{ Value *big.Int } + if err := e.USDT.ABI.UnpackIntoInterface(&transferEvent, "Transfer", vLog.Data); err != nil { + fmt.Println("ABI 解析错误:", err) + return + } + // 先验证toAddr是否在监听列表里面 + _, ok := e.ListenAddresses.Load(toAddr) + if !ok { + return + } + tx_hash := vLog.TxHash.Hex() + tx, tx_ok := e.UnConfirmTxs[tx_hash] + if tx_ok { + // 【支付/提现】待确认交易中存在该交易Hash(说明是我们主动发起的交易) + // 直接走确认流程,不发送待确认消息 + // log.Printf("🔍 检测到已发起的交易: TxHash=%s, Type=%d", tx_hash, tx.TxType) + e.confirm("USDT", height, tx, ch) + } else { + // 【充值】待确认交易中不存在该交易hash(说明是外部转账) + // 添加至待确认交易中,并立即发送待确认消息 + // 1,先根据to查询RmqMsgs,再根据存在的rmq_msg中的相关数据,存入待确认交易 + value, rmq_msg_ok := e.RmqMsgs[toAddr] + var tx_type int + if rmq_msg_ok { + for _, v := range value { + _, ok := v.(message.TopupMsg_req) + if ok { + tx_type = 0 + } + _, ok1 := v.(message.WithdrawMsg_req) + if ok1 { + tx_type = 1 + } + _, ok2 := v.(message.PayMsg_req) + if ok2 { + tx_type = 2 + } + } + } + e.UnConfirmTxs[tx_hash] = message.Tx_msg{ + TxType: tx_type, + Tx: message.Tx{ + From: fromAddr, + To: toAddr, + Height: height, + TxHash: tx_hash, + Symbol: "USDT", + Value: utils.BigIntUSDTToFloat64(transferEvent.Value), + Status: 2, // 待确认状态 + }, + } + // log.Printf("📝 待确认交易新增: TxHash=%s, Height=%d, To=%s, Type=%d", tx_hash, height, toAddr, tx_type) + + // 🔔 【仅充值】立即发送待确认状态的消息(支付/提现不发送待确认消息) + if tx_type == 0 && rmq_msg_ok { + for _, v := range value { + d1, ok := v.(message.TopupMsg_req) + if ok && strings.ToLower(d1.Address) == toAddr { + pendingMsg := message.TopupMsg_resp{ + Address: toAddr, + Status: 2, // 待确认状态 + Chain: d1.Chain, + Symbol: d1.Symbol, + Amount: utils.BigIntUSDTToFloat64(transferEvent.Value), + TxHash: tx_hash, + } + // log.Printf("📤 发送待确认充值消息: TxHash=%s, Address=%s, Amount=%.2f", + // tx_hash, toAddr, pendingMsg.Amount) + + // 异步发送,避免阻塞事件处理 + go func(msg message.TopupMsg_resp) { + select { + case ch <- msg: + log.Printf("✅ 待确认充值消息已发送") + default: + log.Printf("⚠️ 通道阻塞,待确认消息发送失败") + } + }(pendingMsg) + break + } + } + } + } +} + +// listenNewBlocks 监听新区块产生,触发交易确认检查 +func (e *ETHNode) listenNewBlocks(symbol string, ch chan any) { + fmt.Println("🔍 开始监听新区块...") + + headers := make(chan *types.Header, 10) + + // 负责重连 + for { + // 订阅新区块头 + sub, err := e.WsClient.SubscribeNewHead(e.Ctx, headers) + if err != nil { + fmt.Println("❌ 订阅新区块失败, 5秒后重试:", err) + time.Sleep(5 * time.Second) + continue + } + fmt.Println("✅ 新区块订阅成功") + + // 处理新区块 + for { + select { + case err := <-sub.Err(): + fmt.Println("⚠️ 新区块订阅异常,准备重连:", err) + sub.Unsubscribe() + time.Sleep(3 * time.Second) + goto reconnect + + case header := <-headers: + // 每当有新区块,检查待确认交易 + currentHeight := header.Number.Uint64() + // log.Printf("🆕 新区块: %d", currentHeight) + + // 检查是否有待确认交易 + e.mu.Lock() + hasPendingTx := len(e.UnConfirmTxs) > 0 + e.mu.Unlock() + + if hasPendingTx { + e.checkAndConfirmTransactions(symbol, currentHeight, ch) + } + + case <-e.Ctx.Done(): + fmt.Println("🛑 停止新区块监听") + sub.Unsubscribe() + return + } + } + reconnect: + } +} + +// checkAndConfirmTransactions 检查并确认达到确认高度的交易 +func (e *ETHNode) checkAndConfirmTransactions(symbol string, currentHeight uint64, ch chan any) { + e.mu.Lock() + needConfirmList := []message.Tx_msg{} + for _, tx := range e.UnConfirmTxs { + // 检查是否达到确认高度 + if currentHeight >= tx.Tx.Height+e.Config.ConfirmHeight { + log.Printf("当前高度=%d, 交易高度=%d", currentHeight, tx.Tx.Height) + // log.Printf("✅ 交易达到确认高度: TxHash=%s, 当前高度=%d, 交易高度=%d, 需确认=%d块", + // txHash, currentHeight, tx.Tx.Height, e.Config.ConfirmHeight) + needConfirmList = append(needConfirmList, tx) + } + } + e.mu.Unlock() + + // 批量触发确认(在锁外执行,避免长时间持锁) + for _, tx := range needConfirmList { + e.confirm(symbol, currentHeight, tx, ch) + } +} + +func (e *ETHNode) confirm(symbol string, height uint64, tx message.Tx_msg, ch chan any) { + switch symbol { + case "USDT": + e.confirm_usdt(tx, height, ch) + } +} + +func (e *ETHNode) confirm_usdt(tx message.Tx_msg, height uint64, ch chan any) { + // 先从 UnConfirmTxs 中读取 + e.mu.Lock() + unConfirmTx, ok := e.UnConfirmTxs[tx.Tx.TxHash] + e.mu.Unlock() + + if !ok { + return + } + + if height < unConfirmTx.Tx.Height { + return + } + + // 超过确认高度,查询交易数据 + txHash := common.HexToHash(tx.Tx.TxHash) + receipt, err := e.RpcClient.TransactionReceipt(e.Ctx, txHash) + var status int + if err != nil { + log.Println("⚠️ 查询交易收据失败 TxHash=", txHash, err) + status = 0 + } else if receipt.Status == types.ReceiptStatusSuccessful { + status = 1 + } else { + status = 0 + } + + tx.Tx.Status = status + + // 通过通道发送,异步写避免阻塞 + go func(tx message.Tx_msg) { + e.mu.Lock() + var result_msg any + var matchIndex = -1 // 记录匹配的索引 + rmq_msg := e.RmqMsgs[tx.Tx.To] + for i, v := range rmq_msg { + // 处理充值 + d1, ok := v.(message.TopupMsg_req) + if ok { + // 统一转小写比较 + if strings.ToLower(d1.Address) == tx.Tx.To { + result_msg = message.TopupMsg_resp{ + Address: tx.Tx.To, + Status: tx.Tx.Status, + Chain: d1.Chain, + Symbol: d1.Symbol, + Amount: tx.Tx.Value, + TxHash: tx.Tx.TxHash, + } + // 充值消息不删除,可能会有多笔充值到同一地址 + break + } + } + // 处理提现 + d2, ok1 := v.(message.WithdrawMsg_req) + if ok1 { + // 统一转小写比较 + if strings.ToLower(d2.FromAddress) == tx.Tx.From && + strings.ToLower(d2.ToAddress) == tx.Tx.To && + d2.Amount == tx.Tx.Value { + result_msg = message.WithdrawMsg_resp{ + QueueId: d2.QueueId, + Status: tx.Tx.Status, + Amount: tx.Tx.Value, + Chain: d2.Chain, + Symbol: d2.Symbol, + TxHash: tx.Tx.TxHash, + } + matchIndex = i // 记录索引,稍后删除 + break + } + } + // 处理支付 + d3, ok2 := v.(message.PayMsg_req) + if ok2 { + // 统一转小写比较 + if strings.ToLower(d3.FromAddress) == tx.Tx.From && + strings.ToLower(d3.ToAddress) == tx.Tx.To && + d3.Amount == tx.Tx.Value { + result_msg = message.PayMsg_resp{ + QueueId: d3.QueueId, + Status: tx.Tx.Status, + Amount: tx.Tx.Value, + Chain: d3.Chain, + Symbol: d3.Symbol, + OrderId: d3.OrderId, + TxHash: tx.Tx.TxHash, + } + matchIndex = i // 记录索引,稍后删除 + break + } + } + } + // 循环结束后,统一删除匹配的消息(提现和支付需要删除) + if matchIndex >= 0 { + e.RmqMsgs[tx.Tx.To] = utils.Slice_delete(e.RmqMsgs[tx.Tx.To], matchIndex) + } + e.mu.Unlock() + select { + case ch <- result_msg: + default: + fmt.Println("⚠️ confirm通道阻塞,消息丢失:", txHash) + } + }(tx) + + // 删除已确认交易 + e.mu.Lock() + delete(e.UnConfirmTxs, tx.Tx.TxHash) + e.mu.Unlock() +} + +func (e *ETHNode) decodePrivatekey(address string) string { + // 统一转换为小写 + address = strings.ToLower(address) + // 查询加密后的私钥 + querySql := "SELECT `private_key` FROM eth_balance WHERE address = ? LIMIT 1;" + log.Println("查询私钥的钱包地址:", address) + var encryptedKey string + err := e.Db.QueryRow(querySql, address).Scan(&encryptedKey) + if err != nil { + log.Println("❌ 查询私钥失败:", err) + return "" + } + // 使用key解密 + privateKey := encryptedKey // 实际使用时替换成具体的解密代码 + // fmt.Println(privateKey) + return privateKey +} + +func (e *ETHNode) usdt_transfer(msg any) error { + var user_from, final_from, to string + var amount float64 + var tx_type int + now_height, err := e.getBlockHeight() + if err != nil { + return fmt.Errorf("get lastest height error: %v", err) + } + // --------------------------------------------------------------------------------------------- + // 断言,确定本次转账是哪个类型 + // 支付操作 + v, ok := msg.(message.PayMsg_req) + if ok { + e.AddAddress(v.ToAddress, v) // 存入该笔msg(AddAddress内部会转小写) + // 统一转换为小写 + user_from, final_from, to, amount = strings.ToLower(v.FromAddress), strings.ToLower(v.FromAddress), strings.ToLower(v.ToAddress), v.Amount + tx_type = 2 + } + // 提现操作 + k, ok1 := msg.(message.WithdrawMsg_req) + if ok1 { + e.AddAddress(k.ToAddress, k) // 存入该笔msg(AddAddress内部会转小写) + // 统一转换为小写 + user_from, final_from, to, amount = strings.ToLower(k.FromAddress), strings.ToLower(k.FromAddress), strings.ToLower(k.ToAddress), k.Amount + tx_type = 1 + } + // --------------------------------------------------------------------------------------------- + // 1,校验钱包余额 + balance, err := e.getUSDTBalance(user_from) + log.Printf("检测Transfer钱包=%s,余额=%f", user_from, balance) + if err != nil { + return fmt.Errorf("failed to get balance: %w", err) + } + // 2,钱包余额不足,调用归集钱包转账 + if balance < amount { + final_from = "归集钱包" + } + // 3,通过from地址前往数据库查找出对应加密后的私钥,并解密真实的私钥 + originalKey := e.decodePrivatekey(final_from) + if originalKey == "" { + return fmt.Errorf("failed to query privatekey") + } + fmt.Println(originalKey) + privateKey, err := crypto.HexToECDSA(originalKey) + if err != nil { + return fmt.Errorf("failed to parse private key: %w", err) + } + // 4, 获得nonce + nonce, err := e.RpcClient.PendingNonceAt(e.Ctx, common.HexToAddress(final_from)) + if err != nil { + return fmt.Errorf("failed to get nonce: %w", err) + } + // 5, 构造交易(ERC20 transfer 调用) + amountBigInt := utils.Float64ToBigIntUSDT(amount) + data, err := e.USDT.ABI.Pack("transfer", common.HexToAddress(to), amountBigInt) // 打包 transfer(address,uint256) 方法调用 + if err != nil { + return fmt.Errorf("failed to pack transfer data: %w", err) + } + gasPrice, err := e.getSuggestGasPrice() // 获得当前建议gasPrice + if err != nil { + return fmt.Errorf("get suggest-gasprice error:%v", err) + } + eth_balance, err := e.getETHBlance(final_from) // 获得钱包eth余额 + if err != nil { + return fmt.Errorf("%w", err) + } + var gasLimit uint64 = 100000 + gasLimit_b := new(big.Int).SetUint64(gasLimit) + gas := new(big.Int).Mul(gasLimit_b, gasPrice) + // 判断钱包eth是否支持本次交易gas费用 + if eth_balance.Cmp(gas) == -1 { + return fmt.Errorf("address=%s balance less than gas=%v(wei)", final_from, eth_balance) + } + // 构造发送到 USDT 合约地址的交易 + tx := types.NewTransaction( + nonce, + e.USDT.Address, // 发送到USDT合约地址 + big.NewInt(0), // value为0(ERC20转账不需要ETH) + gasLimit, // GasLimit设置为100000(ERC20转账需要更多gas) + gasPrice, // GasPrice: 20 Gwei + data, // 附加数据:transfer方法调用 + ) + // 6, 签名交易并获得txHash + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(e.NetId), privateKey) + txHash := signedTx.Hash().Hex() // 通过签名信息解析出交易hash + if err != nil { + return fmt.Errorf("failed to sign transaction: %w", err) + } + // 7, 发送交易 + err = e.RpcClient.SendTransaction(e.Ctx, signedTx) + if err != nil { + return fmt.Errorf("failed to send transaction: %w", err) + } + // 8, 构造交易消息 + tx_msg := message.Tx_msg{ + TxType: tx_type, + Tx: message.Tx{ + From: final_from, + To: to, + Height: now_height, + TxHash: txHash, + Symbol: "USDT", + Value: amount, + Status: 2, + }, + } + // 9, 将构造的交易消息存入待确认交易中 + e.UnConfirmTxs[txHash] = tx_msg + return nil +} + +func (e *ETHNode) Stop() { + e.Cancel() +} diff --git a/internal/crypto/crypto.go b/internal/crypto/crypto.go new file mode 100644 index 0000000..e706ceb --- /dev/null +++ b/internal/crypto/crypto.go @@ -0,0 +1,12 @@ +package crypto + +import ( + "crypto/sha256" + "encoding/hex" +) + +func Sha256Hash(key string) []byte { + bytes, _ := hex.DecodeString(key) + sha256_hash := sha256.Sum256(bytes) + return sha256_hash[:] +} diff --git a/internal/db/db.go b/internal/db/db.go new file mode 100644 index 0000000..1ee8cfa --- /dev/null +++ b/internal/db/db.go @@ -0,0 +1,70 @@ +package db + +import ( + "database/sql" + "fmt" + message "m2pool-payment/internal/msg" + + _ "github.com/go-sql-driver/mysql" +) + +type MySQLPool struct { + db *sql.DB +} + +// NewMySQLPool 初始化连接池 +func NewMySQLPool(cfg message.DbConfig) (*MySQLPool, error) { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", + cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database) + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, err + } + + // 设置连接池参数 + db.SetMaxOpenConns(cfg.MaxOpenConns) + db.SetMaxIdleConns(cfg.MaxIdleConns) + db.SetConnMaxLifetime(cfg.ConnMaxLife) + + // 测试连接 + if err := db.Ping(); err != nil { + return nil, err + } + + return &MySQLPool{db: db}, nil +} + +// Exec 执行 INSERT/UPDATE/DELETE +func (p *MySQLPool) Exec(query string, args ...any) (sql.Result, error) { + return p.db.Exec(query, args...) +} + +// Query 查询多行 +func (p *MySQLPool) Query(query string, args ...any) (*sql.Rows, error) { + return p.db.Query(query, args...) +} + +// QueryRow 查询单行 +func (p *MySQLPool) QueryRow(query string, args ...any) *sql.Row { + return p.db.QueryRow(query, args...) +} + +// Transaction 执行事务 +func (p *MySQLPool) Transaction(fn func(tx *sql.Tx) error) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if err := fn(tx); err != nil { + _ = tx.Rollback() + return err + } + + return tx.Commit() +} + +// Close 关闭连接池 +func (p *MySQLPool) Close() error { + return p.db.Close() +} diff --git a/internal/msg/msg.go b/internal/msg/msg.go new file mode 100644 index 0000000..e20f180 --- /dev/null +++ b/internal/msg/msg.go @@ -0,0 +1,125 @@ +package msg + +import "time" + +// 配置文件结构 +type Config struct { + RMQConfig RMQConfig `json:"rmq_config"` + ETHConfig ETHConfig `json:"eth_config"` +} + +type RMQConfig struct { + SubAddr string `json:"sub_addr"` // 监听地址 + PayConfig QueueConfig `json:"pay"` // 支付 + TopUpConfig QueueConfig `json:"topup"` // 充值 + WithdrawConfig QueueConfig `json:"withdraw"` // 提现 + PayRespConfig QueueConfig `json:"pay_resp"` // 支付回复 + TopUpRespConfig QueueConfig `json:"topup_resp"` // 充值回复 + WithdrawRespConfig QueueConfig `json:"withdraw_resp"` // 提现回复 +} + +type QueueConfig struct { + QueueName string `json:"queue"` + ExchangeName string `json:"exchange"` + Routing []string `json:"routing"` +} + +type ETHConfig struct { + RpcURL string `json:"rpcUrl"` // rpc连接地址 + WsURL string `json:"wsUrl"` // websocket连接地址 + ConfirmHeight uint64 `json:"confirmHeight"` // 确认所需区块数 + DbConfig DbConfig `json:"dbConfig"` +} + +// Config 数据库配置 +type DbConfig struct { + User string `json:"user"` + Password string `json:"password"` + Host string `json:"host"` + Port int `json:"port"` + Database string `json:"database"` + MaxOpenConns int `json:"maxOpenConns"` // 最大打开连接数 + MaxIdleConns int `json:"maxIdleCoons"` // 最大空闲连接数 + ConnMaxLife time.Duration `json:"connMaxLife"` // 连接最大存活时间 +} + +// ======================================================================= +// 接收的充值消息 +type TopupMsg_req struct { + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + Address string `json:"address"` + Timestamp uint64 `json:"timestamp"` + Sign string `json:"sign"` +} + +// 返回充值结果消息 +type TopupMsg_resp struct { + Address string `json:"address"` + Status int `json:"status"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + Amount float64 `json:"amount"` + TxHash string `json:"tx_hash"` +} + +// 接收的提现消息 +type WithdrawMsg_req struct { + QueueId string `json:"queue_id"` + FromAddress string `json:"from_address"` // 我们提供的地址 + ToAddress string `json:"to_address"` // 用户要提现到的地址 + Amount float64 `json:"amount"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + Timestamp uint64 `json:"timestamp"` + Sign string `json:"sign"` +} + +// 返回提现结果消息 +type WithdrawMsg_resp struct { + QueueId string `json:"queue_id"` + Status int `json:"status"` + Amount float64 `json:"amount"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + TxHash string `json:"tx_hash"` +} + +// 接收到的支付消息 +type PayMsg_req struct { + QueueId string `json:"queue_id"` + FromAddress string `json:"from_address"` // 我们提供的地址 + ToAddress string `json:"to_address"` // 卖家地址 + Amount float64 `json:"amount"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + OrderId string `json:"order_id"` // 订单号 + Timestamp uint64 `json:"timestamp"` + Sign string `json:"sign"` +} + +// 返回支付结果消息 +type PayMsg_resp struct { + QueueId string `json:"queue_id"` + Status int `json:"status"` + Amount float64 `json:"amount"` + Chain string `json:"chain"` // 链名称 + Symbol string `json:"symbol"` // 币种 + OrderId string `json:"order_id"` // 订单号 + TxHash string `json:"tx_hash"` +} + +// 节点通用消息结构 +type Tx_msg struct { + TxType int `json:"tx_type"` // 转账类型:0充值,1提现,2支付 + Tx Tx `json:"tx"` +} +type Tx struct { + From string `json:"from"` // 充值/提现/支付的来源地址 + To string `json:"to"` // 充值/提现/支付的目标地址 + Height uint64 `json:"height"` // 区块高度 + TxHash string `json:"tx_hash"` // 交易哈希 + Symbol string `json:"symbol"` // 币种 + Value float64 `json:"value"` // 数量,单位是币 + Status int `json:"status"` // 交易状态,1成功,0失败, 2待确认 +} diff --git a/internal/queue/README.md b/internal/queue/README.md new file mode 100644 index 0000000..80c6a81 --- /dev/null +++ b/internal/queue/README.md @@ -0,0 +1,363 @@ +# RabbitMQ 服务使用说明 + +## 功能概述 + +这个 RabbitMQ 服务实现了区块链支付系统的消息队列功能,包括: + +### 消费队列(监听) +1. **充值队列 (topup)** - 监听用户充值请求 +2. **提现队列 (withdraw)** - 监听用户提现请求 +3. **支付队列 (pay)** - 监听用户支付请求 + +### 发布队列(响应) +1. **充值响应队列 (topup_resp)** - 发送充值确认结果 +2. **提现响应队列 (withdraw_resp)** - 发送提现确认结果 +3. **支付响应队列 (pay_resp)** - 发送支付确认结果 + +--- + +## 快速开始 + +### 1. 安装依赖 + +```bash +go get github.com/rabbitmq/amqp091-go +``` + +### 2. 配置文件示例 (config.json) + +```json +{ + "rmq_config": { + "sub_addr": "amqp://username:password@localhost:5672", + "pay": { + "queue": "pay.auto.queue", + "exchange": "pay.exchange", + "routing": ["pay.auto.routing.key"] + }, + "topup": { + "queue": "pay.recharge.queue", + "exchange": "pay.exchange", + "routing": ["pay.recharge.routing.key"] + }, + "withdraw": { + "queue": "pay.withdraw.queue", + "exchange": "pay.exchange", + "routing": ["pay.withdraw.routing.key"] + }, + "pay_resp": { + "queue": "pay.auto.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.auto.return.routing.key"] + }, + "topup_resp": { + "queue": "pay.recharge.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.recharge.return.routing.key"] + }, + "withdraw_resp": { + "queue": "pay.withdraw.return.queue", + "exchange": "pay.exchange", + "routing": ["pay.withdraw.return.routing.key"] + } + } +} +``` + +### 3. 使用示例 + +```go +package main + +import ( + "log" + message "m2pool-payment/internal/msg" + rmq "m2pool-payment/internal/queue" +) + +func main() { + // 加载配置 + config := loadConfig() // 你的配置加载逻辑 + + // 创建 RabbitMQ 服务 + rmqServer, err := rmq.NewRabbitMQServer(config.RMQConfig) + if err != nil { + log.Fatalf("创建 RabbitMQ 服务失败: %v", err) + } + defer rmqServer.Close() + + // 设置消息处理回调 + rmqServer.OnTopupMsg = func(msg message.TopupMsg_req) { + log.Printf("收到充值请求: %+v", msg) + // 处理充值逻辑... + // 添加地址监听等 + } + + rmqServer.OnWithdrawMsg = func(msg message.WithdrawMsg_req) { + log.Printf("收到提现请求: %+v", msg) + // 处理提现逻辑... + // 调用区块链转账 + } + + rmqServer.OnPayMsg = func(msg message.PayMsg_req) { + log.Printf("收到支付请求: %+v", msg) + // 处理支付逻辑... + // 调用区块链转账 + } + + // 启动监听 + if err := rmqServer.Start(); err != nil { + log.Fatalf("启动监听失败: %v", err) + } + + // 发送响应消息示例 + rmqServer.PublishTopupResp(message.TopupMsg_resp{ + Address: "0x123...", + Status: 1, + Chain: "ETH", + Symbol: "USDT", + Amount: 100.0, + TxHash: "0xabc...", + }) + + // 保持运行 + select {} +} +``` + +--- + +## API 说明 + +### 创建服务 + +```go +func NewRabbitMQServer(config message.RMQConfig) (*RabbitMQServer, error) +``` + +创建一个新的 RabbitMQ 服务实例。 + +**参数:** +- `config`: RabbitMQ 配置 + +**返回:** +- `*RabbitMQServer`: 服务实例 +- `error`: 错误信息 + +--- + +### 启动监听 + +```go +func (r *RabbitMQServer) Start() error +``` + +启动所有队列的监听(topup, withdraw, pay)。 + +--- + +### 消息回调设置 + +```go +// 充值请求回调 +rmqServer.OnTopupMsg = func(msg message.TopupMsg_req) { + // 处理逻辑 +} + +// 提现请求回调 +rmqServer.OnWithdrawMsg = func(msg message.WithdrawMsg_req) { + // 处理逻辑 +} + +// 支付请求回调 +rmqServer.OnPayMsg = func(msg message.PayMsg_req) { + // 处理逻辑 +} +``` + +--- + +### 发布响应消息 + +```go +// 发布充值响应 +func (r *RabbitMQServer) PublishTopupResp(resp message.TopupMsg_resp) error + +// 发布提现响应 +func (r *RabbitMQServer) PublishWithdrawResp(resp message.WithdrawMsg_resp) error + +// 发布支付响应 +func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error +``` + +--- + +### 关闭服务 + +```go +func (r *RabbitMQServer) Close() error +``` + +优雅关闭 RabbitMQ 连接和所有监听。 + +--- + +## 消息格式 + +### 充值请求 (TopupMsg_req) +```json +{ + "chain": "ETH", + "symbol": "USDT", + "address": "0x123...", + "timestamp": 1234567890, + "sign": "signature_string" +} +``` + +### 充值响应 (TopupMsg_resp) +```json +{ + "address": "0x123...", + "status": 1, + "chain": "ETH", + "symbol": "USDT", + "amount": 100.5, + "tx_hash": "0xabc..." +} +``` + +### 提现请求 (WithdrawMsg_req) +```json +{ + "queue_id": "queue_123", + "from_address": "0x123...", + "to_address": "0x456...", + "amount": 50.0, + "chain": "ETH", + "symbol": "USDT", + "timestamp": 1234567890, + "sign": "signature_string" +} +``` + +### 提现响应 (WithdrawMsg_resp) +```json +{ + "queue_id": "queue_123", + "status": 1, + "amount": 50.0, + "chain": "ETH", + "symbol": "USDT", + "tx_hash": "0xdef..." +} +``` + +### 支付请求 (PayMsg_req) +```json +{ + "queue_id": "queue_456", + "from_address": "0x123...", + "to_address": "0x789...", + "amount": 200.0, + "chain": "ETH", + "symbol": "USDT", + "order_id": "order_789", + "timestamp": 1234567890, + "sign": "signature_string" +} +``` + +### 支付响应 (PayMsg_resp) +```json +{ + "queue_id": "queue_456", + "status": 1, + "amount": 200.0, + "chain": "ETH", + "symbol": "USDT", + "order_id": "order_789", + "tx_hash": "0xghi..." +} +``` + +--- + +## 状态码说明 + +- `status = 0` - 失败 +- `status = 1` - 成功 +- `status = 2` - 待确认(仅用于链上交易) + +--- + +## 特性 + +✅ **自动重连** - 连接断开时自动重连 +✅ **消息持久化** - 消息不会丢失 +✅ **手动确认** - 处理成功后才确认消息 +✅ **并发安全** - 支持多 goroutine 并发发布 +✅ **优雅关闭** - 支持优雅关闭所有连接 +✅ **错误处理** - 完善的错误日志和重试机制 + +--- + +## 注意事项 + +1. **连接地址格式**: `amqp://username:password@host:port/` +2. **消息确认**: 只有处理成功的消息才会被确认,失败的消息会重新入队 +3. **并发安全**: 发布消息时使用了互斥锁,可以安全地从多个 goroutine 调用 +4. **重连机制**: 连接断开时会自动重连,间隔 3 秒 +5. **消息持久化**: 消息和队列都是持久化的,重启后不会丢失 + +--- + +## 工作流程 + +``` +┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ 业务系统 │ ------> │ RabbitMQ │ ------> │ 支付系统 │ +│ │ 充值/ │ (消息队列) │ 消费 │ (本系统) │ +│ │ 提现/ │ │ │ │ +│ │ 支付 │ │ │ │ +└─────────────┘ └──────────────┘ └─────────────┘ + | + | 监听链上 + v + ┌─────────────┐ + │ 区块链节点 │ + │ (ETH) │ + └─────────────┘ + | + | 交易确认 + v +┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ 业务系统 │ <------ │ RabbitMQ │ <------ │ 支付系统 │ +│ │ 响应 │ (响应队列) │ 发布 │ (本系统) │ +└─────────────┘ └──────────────┘ └─────────────┘ +``` + +--- + +## 故障排查 + +### 连接失败 +- 检查 RabbitMQ 服务是否运行 +- 检查用户名密码是否正确 +- 检查网络连接和端口(默认 5672) + +### 消息未被消费 +- 检查队列绑定是否正确 +- 检查 routing key 是否匹配 +- 查看 RabbitMQ 管理界面的队列状态 + +### 消息重复消费 +- 确保消息处理成功后调用了 Ack +- 检查是否有多个消费者实例 + +--- + +## 许可证 + +MIT License + diff --git a/internal/queue/rabbitmq.go b/internal/queue/rabbitmq.go new file mode 100644 index 0000000..4614bbc --- /dev/null +++ b/internal/queue/rabbitmq.go @@ -0,0 +1,344 @@ +package rmq + +import ( + "context" + "encoding/json" + "fmt" + "log" + message "m2pool-payment/internal/msg" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// RabbitMQServer RabbitMQ 服务 +type RabbitMQServer struct { + config message.RMQConfig + conn *amqp.Connection + channel *amqp.Channel + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + + // 消息处理回调函数 + OnTopupMsg func(message.TopupMsg_req) // 充值请求回调 + OnWithdrawMsg func(message.WithdrawMsg_req) // 提现请求回调 + OnPayMsg func(message.PayMsg_req) // 支付请求回调 +} + +// NewRabbitMQServer 创建 RabbitMQ 服务 +func NewRabbitMQServer(config message.RMQConfig) (*RabbitMQServer, error) { + // 创建连接 + conn, err := amqp.Dial(config.SubAddr) + if err != nil { + return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) + } + + // 创建通道 + channel, err := conn.Channel() + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to open a channel: %w", err) + } + + // 创建可取消的 context + ctx, cancel := context.WithCancel(context.Background()) + + server := &RabbitMQServer{ + config: config, + conn: conn, + channel: channel, + ctx: ctx, + cancel: cancel, + } + + // 初始化队列和交换机 + if err := server.setupQueuesAndExchanges(); err != nil { + server.Close() + return nil, fmt.Errorf("failed to setup queues: %w", err) + } + + return server, nil +} + +// setupQueuesAndExchanges 设置队列和交换机 +func (r *RabbitMQServer) setupQueuesAndExchanges() error { + configs := []message.QueueConfig{ + r.config.PayConfig, + r.config.TopUpConfig, + r.config.WithdrawConfig, + r.config.PayRespConfig, + r.config.TopUpRespConfig, + r.config.WithdrawRespConfig, + } + + for _, cfg := range configs { + // 声明交换机 + err := r.channel.ExchangeDeclare( + cfg.ExchangeName, // 交换机名称 + "direct", // 类型:direct(与现有交换机类型一致) + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare exchange %s: %w", cfg.ExchangeName, err) + } + + // 声明队列 + _, err = r.channel.QueueDeclare( + cfg.QueueName, // 队列名称 + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare queue %s: %w", cfg.QueueName, err) + } + + // 绑定队列到交换机 + for _, routingKey := range cfg.Routing { + err = r.channel.QueueBind( + cfg.QueueName, // 队列名称 + routingKey, // routing key + cfg.ExchangeName, // 交换机名称 + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to bind queue %s to exchange %s with key %s: %w", + cfg.QueueName, cfg.ExchangeName, routingKey, err) + } + } + + // log.Printf("✅ 队列配置成功: Queue=%s, Exchange=%s, RoutingKeys=%v", + // cfg.QueueName, cfg.ExchangeName, cfg.Routing) + } + + return nil +} + +// Start 启动监听所有队列 +func (r *RabbitMQServer) Start() error { + // 启动充值消息监听 + go r.consumeTopup() + // 启动提现消息监听 + go r.consumeWithdraw() + // 启动支付消息监听 + go r.consumePay() + + // log.Println("🚀 RabbitMQ 服务启动成功,开始监听消息...") + return nil +} + +// consumeTopup 消费充值消息 +func (r *RabbitMQServer) consumeTopup() { + r.consumeQueue( + r.config.TopUpConfig.QueueName, + "topup", + func(body []byte) error { + var msg message.TopupMsg_req + if err := json.Unmarshal(body, &msg); err != nil { + return fmt.Errorf("failed to parse topup message: %w", err) + } + log.Printf("📥 [RMQ] 收到充值请求: Chain=%s, Symbol=%s, Address=%s", + msg.Chain, msg.Symbol, msg.Address) + + if r.OnTopupMsg != nil { + r.OnTopupMsg(msg) + } + return nil + }, + ) +} + +// consumeWithdraw 消费提现消息 +func (r *RabbitMQServer) consumeWithdraw() { + r.consumeQueue( + r.config.WithdrawConfig.QueueName, + "withdraw", + func(body []byte) error { + var msg message.WithdrawMsg_req + if err := json.Unmarshal(body, &msg); err != nil { + return fmt.Errorf("failed to parse withdraw message: %w", err) + } + log.Printf("📥 [RMQ] 收到提现请求: QueueId=%s, From=%s, To=%s, Amount=%.2f %s", + msg.QueueId, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Symbol) + + if r.OnWithdrawMsg != nil { + r.OnWithdrawMsg(msg) + } + return nil + }, + ) +} + +// consumePay 消费支付消息 +func (r *RabbitMQServer) consumePay() { + r.consumeQueue( + r.config.PayConfig.QueueName, + "pay", + func(body []byte) error { + var msg message.PayMsg_req + if err := json.Unmarshal(body, &msg); err != nil { + return fmt.Errorf("failed to parse pay message: %w", err) + } + log.Printf("📥 [RMQ] 收到支付请求: QueueId=%s, OrderId=%s, From=%s, To=%s, Amount=%.2f %s", + msg.QueueId, msg.OrderId, msg.FromAddress, msg.ToAddress, msg.Amount, msg.Symbol) + + if r.OnPayMsg != nil { + r.OnPayMsg(msg) + } + return nil + }, + ) +} + +// consumeQueue 通用队列消费方法 +func (r *RabbitMQServer) consumeQueue(queueName, msgType string, handler func([]byte) error) { + for { + select { + case <-r.ctx.Done(): + // log.Printf("🛑 停止监听队列: %s", queueName) + return + default: + msgs, err := r.channel.Consume( + queueName, // 队列名称 + "", // consumer tag + false, // auto-ack (设置为false,手动确认) + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + // log.Printf("❌ 消费队列 %s 失败: %v, 3秒后重试...", queueName, err) + time.Sleep(3 * time.Second) + continue + } + + // log.Printf("✅ 开始监听队列: %s (%s)", queueName, msgType) + + for msg := range msgs { + err := handler(msg.Body) + if err != nil { + // log.Printf("⚠️ 处理 %s 消息失败: %v", msgType, err) + // 消息处理失败,不确认,让消息重新入队 + msg.Nack(false, true) + } else { + // 消息处理成功,确认消息 + msg.Ack(false) + } + } + + // 如果 channel 关闭,等待后重连 + // log.Printf("⚠️ 队列 %s 连接断开,3秒后重连...", queueName) + time.Sleep(3 * time.Second) + } + } +} + +// PublishTopupResp 发布充值响应 +func (r *RabbitMQServer) PublishTopupResp(resp message.TopupMsg_resp) error { + return r.publishMessage( + r.config.TopUpRespConfig, + resp, + fmt.Sprintf("充值响应: Address=%s, Status=%d, TxHash=%s", + resp.Address, resp.Status, resp.TxHash), + ) +} + +// PublishWithdrawResp 发布提现响应 +func (r *RabbitMQServer) PublishWithdrawResp(resp message.WithdrawMsg_resp) error { + return r.publishMessage( + r.config.WithdrawRespConfig, + resp, + fmt.Sprintf("提现响应: QueueId=%s, Status=%d, TxHash=%s", + resp.QueueId, resp.Status, resp.TxHash), + ) +} + +// PublishPayResp 发布支付响应 +func (r *RabbitMQServer) PublishPayResp(resp message.PayMsg_resp) error { + return r.publishMessage( + r.config.PayRespConfig, + resp, + fmt.Sprintf("支付响应: QueueId=%s, OrderId=%s, Status=%d, TxHash=%s", + resp.QueueId, resp.OrderId, resp.Status, resp.TxHash), + ) +} + +// publishMessage 通用消息发布方法 +func (r *RabbitMQServer) publishMessage(config message.QueueConfig, msg interface{}, logMsg string) error { + r.mu.Lock() + defer r.mu.Unlock() + + // 序列化消息 + body, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + // 使用第一个 routing key(如果有多个) + routingKey := "" + if len(config.Routing) > 0 { + routingKey = config.Routing[0] + } + + // 发布消息 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = r.channel.PublishWithContext( + ctx, + config.ExchangeName, // 交换机 + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, // 持久化消息 + Timestamp: time.Now(), + }, + ) + + if err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + log.Printf("📤 [RMQ] 发送%s", logMsg) + return nil +} + +// Close 关闭连接 +func (r *RabbitMQServer) Close() error { + // log.Println("🛑 正在关闭 RabbitMQ 服务...") + + r.cancel() // 取消所有 goroutine + + if r.channel != nil { + if err := r.channel.Close(); err != nil { + // log.Printf("⚠️ 关闭 channel 失败: %v", err) + } + } + + if r.conn != nil { + if err := r.conn.Close(); err != nil { + // log.Printf("⚠️ 关闭连接失败: %v", err) + } + } + + // log.Println("✅ RabbitMQ 服务已关闭") + return nil +} + +// IsConnected 检查连接状态 +func (r *RabbitMQServer) IsConnected() bool { + return r.conn != nil && !r.conn.IsClosed() +} diff --git a/internal/server.go b/internal/server.go new file mode 100644 index 0000000..2cdcfcc --- /dev/null +++ b/internal/server.go @@ -0,0 +1,296 @@ +package server + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "log" + "m2pool-payment/internal/blockchain" + "m2pool-payment/internal/blockchain/eth" + "m2pool-payment/internal/crypto" + message "m2pool-payment/internal/msg" + rmq "m2pool-payment/internal/queue" + "os" + "os/signal" + "strings" + "syscall" +) + +const MSG_KEY string = "9f3c7a12" + +// 状态码常量 +const ( + STATUS_FAILED = 0 // 失败 + STATUS_SUCCESS = 1 // 成功 + STATUS_PENDING = 2 // 待确认 + STATUS_VERIFY_FAILED = 3 // 验证失败 +) + +type ServerCtx struct { + Config message.Config + blockChainServer *blockchain.BlockChainServer + rmqServer *rmq.RabbitMQServer +} + +var s_ctx ServerCtx + +// verifyMessage 验证消息签名 +func verifyMessage(timestamp uint64, sign string) bool { + hash_byte := crypto.Sha256Hash(fmt.Sprintf("%x", timestamp) + MSG_KEY) + hash := hex.EncodeToString(hash_byte) + return hash == sign +} + +func loadConfig() { + file, err := os.ReadFile("config.json") + if err != nil { + panic(fmt.Sprintf("读取配置文件失败: %v", err)) + } + + err = json.Unmarshal(file, &s_ctx.Config) + if err != nil { + panic(fmt.Sprintf("解析配置文件失败: %v", err)) + } + + log.Printf("✅ 配置加载成功: RPC=%s, WS=%s", + s_ctx.Config.ETHConfig.RpcURL, s_ctx.Config.ETHConfig.WsURL) +} + +func initBlockChainServer() { + // 初始化节点服务 + node_server := blockchain.NewBlockChainServer() + // 初始化ETH节点 + eth_node, err := eth.NewETHNode(s_ctx.Config.ETHConfig, "m2pool") + if err != nil { + log.Fatalf("ETH-Node Start error: %v", err) + } + // 注册ETH节点 + node_server.RegisterChain("ETH", eth_node) + // 将所有注册的blockChainServer绑定至server + s_ctx.blockChainServer = node_server + + log.Println("✅ 区块链服务初始化完成") +} + +func initRmqServer() { + // 初始化rmq服务 + rmq_server, err := rmq.NewRabbitMQServer(s_ctx.Config.RMQConfig) + if err != nil { + log.Fatalf("RabbitMQ Server Start error: %v", err) + } + // 将rmq服务绑定至server + s_ctx.rmqServer = rmq_server + + log.Printf("✅ RabbitMQ服务初始化完成: %s", s_ctx.Config.RMQConfig.SubAddr) +} + +func handleTopupMsg() { + s_ctx.rmqServer.OnTopupMsg = func(msg message.TopupMsg_req) { + msg.Address = strings.ToLower(msg.Address) + + // 验证签名 + if !verifyMessage(msg.Timestamp, msg.Sign) { + err := s_ctx.rmqServer.PublishTopupResp(message.TopupMsg_resp{ + Address: msg.Address, + Status: STATUS_VERIFY_FAILED, + Chain: msg.Chain, + Symbol: msg.Symbol, + Amount: 0, + TxHash: "", + }) + if err != nil { + log.Printf("❌ 发布充值失败响应失败: %v", err) + } + return + } + + // 添加监听地址 + s_ctx.blockChainServer.AddAddress(msg.Chain, msg.Address, msg) + } +} + +func handleWithdrawMsg() { + s_ctx.rmqServer.OnWithdrawMsg = func(msg message.WithdrawMsg_req) { + msg.FromAddress = strings.ToLower(msg.FromAddress) + msg.ToAddress = strings.ToLower(msg.ToAddress) + + // 验证签名 + if !verifyMessage(msg.Timestamp, msg.Sign) { + err := s_ctx.rmqServer.PublishWithdrawResp(message.WithdrawMsg_resp{ + QueueId: msg.QueueId, + Status: STATUS_VERIFY_FAILED, + Chain: msg.Chain, + Symbol: msg.Symbol, + Amount: 0, + TxHash: "", + }) + if err != nil { + log.Printf("❌ 发布提现失败响应失败: %v", err) + } + return + } + + // 执行转账 + err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg) + if err != nil { + log.Printf("❌ 提现转账失败: %v", err) + // 发送失败响应 + s_ctx.rmqServer.PublishWithdrawResp(message.WithdrawMsg_resp{ + QueueId: msg.QueueId, + Status: STATUS_FAILED, + Amount: msg.Amount, + Chain: msg.Chain, + Symbol: msg.Symbol, + TxHash: "", + }) + } + } +} + +func handlePayMsg() { + s_ctx.rmqServer.OnPayMsg = func(msg message.PayMsg_req) { + msg.FromAddress = strings.ToLower(msg.FromAddress) + msg.ToAddress = strings.ToLower(msg.ToAddress) + + // 验证签名 + if !verifyMessage(msg.Timestamp, msg.Sign) { + err := s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ + QueueId: msg.QueueId, + Status: STATUS_VERIFY_FAILED, + Amount: msg.Amount, + Chain: msg.Chain, + Symbol: msg.Symbol, + OrderId: msg.OrderId, + TxHash: "", + }) + if err != nil { + log.Printf("❌ 发布支付失败响应失败: %v", err) + } + return + } + + // 执行转账 + err := s_ctx.blockChainServer.Transfer(msg.Chain, msg.Symbol, msg) + if err != nil { + log.Printf("❌ 支付转账失败: %v", err) + // 发送失败响应 + s_ctx.rmqServer.PublishPayResp(message.PayMsg_resp{ + QueueId: msg.QueueId, + Status: STATUS_FAILED, + Amount: msg.Amount, + Chain: msg.Chain, + Symbol: msg.Symbol, + OrderId: msg.OrderId, + TxHash: "", + }) + } + } +} + +func initRmqListen() { + // ================== 设置 RabbitMQ 消息处理回调 ================== + // 先设置所有回调(同步执行,避免竞态) + handleTopupMsg() + handleWithdrawMsg() + handlePayMsg() + + // 回调设置完成后,再启动 RabbitMQ 监听 + if err := s_ctx.rmqServer.Start(); err != nil { + log.Fatalf("启动 RabbitMQ 监听失败: %v", err) + } + log.Println("✅ RabbitMQ 监听启动完成") +} + +func handleChainEvent(chainEventCh chan any) { + for event := range chainEventCh { + // 添加 panic 恢复,防止单个消息处理错误导致整个 goroutine 退出 + func() { + defer func() { + if r := recover(); r != nil { + log.Printf("❌ 处理链上事件 panic: %v, event: %+v", r, event) + } + }() + + // 根据消息类型发送不同的响应 + switch msg := event.(type) { + case message.TopupMsg_resp: + // 充值确认 + if msg.Status == STATUS_PENDING { + log.Printf("📨 [链上] 充值待确认: Address=%s, Amount=%.2f, TxHash=%s", + msg.Address, msg.Amount, msg.TxHash) + } else { + log.Printf("✅ [链上] 充值确认: Address=%s, Amount=%.2f, TxHash=%s, Status=%d", + msg.Address, msg.Amount, msg.TxHash, msg.Status) + } + err := s_ctx.rmqServer.PublishTopupResp(msg) + if err != nil { + log.Printf("❌ 发送充值响应失败: %v", err) + } + + case message.WithdrawMsg_resp: + // 提现确认 + log.Printf("✅ [链上] 提现确认: QueueId=%s, Amount=%.2f, TxHash=%s, Status=%d", + msg.QueueId, msg.Amount, msg.TxHash, msg.Status) + err := s_ctx.rmqServer.PublishWithdrawResp(msg) + if err != nil { + log.Printf("❌ 发送提现响应失败: %v", err) + } + + case message.PayMsg_resp: + // 支付确认 + log.Printf("✅ [链上] 支付确认: QueueId=%s, OrderId=%s, Amount=%.2f, TxHash=%s, Status=%d", + msg.QueueId, msg.OrderId, msg.Amount, msg.TxHash, msg.Status) + err := s_ctx.rmqServer.PublishPayResp(msg) + if err != nil { + log.Printf("❌ 发送支付响应失败: %v", err) + } + + default: + log.Printf("⚠️ 未知消息类型: %T", event) + } + }() + } +} + +func Start() { + log.Println("========================================") + log.Println("🚀 M2Pool Payment System Starting...") + log.Println("========================================") + + // 加载配置 + loadConfig() + + // ================== 初始化区块链节点 ================== + initBlockChainServer() + + // ================== 初始化 RabbitMQ 服务 ================== + initRmqServer() + + // ================== 启动链上事件监听通道 ================== + chainEventCh := make(chan any, 1000) // 增加缓冲区,避免高并发丢消息 + go s_ctx.blockChainServer.Listen("ETH", "USDT", chainEventCh) + + // ================== 启动 RabbitMQ 监听 ================== + initRmqListen() + + // ================== 处理链上确认事件 ================== + go handleChainEvent(chainEventCh) + + log.Println("========================================") + log.Println("🎉 所有服务启动完成!") + log.Println("========================================") + // ================== 等待退出信号 ================== + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + // 优雅关闭 + log.Println("========================================") + log.Println("🛑 收到退出信号,正在关闭服务...") + log.Println("========================================") + + s_ctx.blockChainServer.Stop("ETH") + s_ctx.rmqServer.Close() + + log.Println("👋 服务已全部关闭") +} diff --git a/internal/utils/utils.go b/internal/utils/utils.go new file mode 100644 index 0000000..b605257 --- /dev/null +++ b/internal/utils/utils.go @@ -0,0 +1,40 @@ +package utils + +import ( + "log" + "math" + "math/big" +) + +func BigIntUSDTToFloat64(value *big.Int) float64 { + f := new(big.Float).SetInt(value) + scale := new(big.Float).SetFloat64(1e6) // USDT 精度 6 位 + f.Quo(f, scale) + result, _ := f.Float64() + return result +} + +// USDT 一般精度是 6 位小数 +const USDTDecimals = 6 + +// Float64ToBigIntUSDT 将 float64 金额转换成 *big.Int +func Float64ToBigIntUSDT(amount float64) *big.Int { + // 乘上精度系数 + scale := math.Pow10(USDTDecimals) + bigAmount := new(big.Int) + bigAmount.SetInt64(int64(amount * scale)) + return bigAmount +} + +func Slice_delete(arr []any, index int) []any { + if index < 0 || index >= len(arr) { + // 处理越界 + log.Fatalf("slice arr error: index=%d, arr length=%d", index, len(arr)) + return nil + } + if index >= 0 && index < len(arr) { + copy(arr[index:], arr[index+1:]) // 后面的元素往前移动 + arr = arr[:len(arr)-1] // 去掉最后一个多余元素 + } + return arr +}