tari-rust/test_gbt.py

152 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
GBT测试脚本
用于测试GBT客户端的ZMQ通信功能
"""
import zmq
import json
import time
import threading
from typing import Dict, Any
class GbtTester:
def __init__(self, pub_port: int = 5555, sub_port: int = 5556):
self.pub_port = pub_port
self.sub_port = sub_port
self.context = zmq.Context()
# 订阅者套接字(接收挖矿任务)
self.subscriber = self.context.socket(zmq.SUB)
self.subscriber.connect(f"tcp://localhost:{self.sub_port}")
self.subscriber.setsockopt_string(zmq.SUBSCRIBE, "mining_msg")
# 发布者套接字(发送提交结果)
self.publisher = self.context.socket(zmq.PUB)
self.publisher.bind(f"tcp://*:{self.pub_port}")
self.running = False
self.received_tasks: Dict[str, Dict[str, Any]] = {}
def start(self):
"""启动测试器"""
self.running = True
# 启动接收线程
self.receive_thread = threading.Thread(target=self._receive_loop)
self.receive_thread.daemon = True
self.receive_thread.start()
print(f"GBT测试器已启动")
print(f"订阅端口: {self.sub_port}")
print(f"发布端口: {self.pub_port}")
def stop(self):
"""停止测试器"""
self.running = False
self.subscriber.close()
self.publisher.close()
self.context.term()
def _receive_loop(self):
"""接收消息循环"""
while self.running:
try:
# 非阻塞接收
message = self.subscriber.recv_multipart(flags=zmq.NOBLOCK)
if message:
topic = message[0].decode('utf-8')
data = message[1].decode('utf-8')
if topic == "mining_msg":
self._handle_mining_msg(data)
except zmq.Again:
# 没有消息,继续循环
time.sleep(0.1)
continue
except Exception as e:
print(f"接收消息错误: {e}")
time.sleep(1)
def _handle_mining_msg(self, data: str):
"""处理挖矿消息"""
try:
mining_msg = json.loads(data)
job_id = mining_msg.get('job_id')
print(f"\n收到挖矿任务: {job_id}")
print(f"区块高度: {mining_msg.get('block_header', {}).get('height')}")
print(f"output_smt_size: {mining_msg.get('output_smt_size')}")
print(f"target: {mining_msg.get('target')}")
print(f"coinbase_hash: {mining_msg.get('coinbase_hash')[:16]}...")
# 保存任务
self.received_tasks[job_id] = mining_msg
# 模拟挖矿延迟1秒后提交
threading.Timer(1.0, self._submit_result, args=[job_id]).start()
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}")
except Exception as e:
print(f"处理挖矿消息错误: {e}")
def _submit_result(self, job_id: str):
"""提交挖矿结果"""
if job_id not in self.received_tasks:
print(f"任务 {job_id} 不存在")
return
# 构造提交请求
submit_request = {
"job_id": job_id,
"nonce": 12345, # 模拟nonce
"solution": "test_solution_hash_12345" # 模拟solution
}
try:
# 发送提交请求
message = f"submit {json.dumps(submit_request)}"
self.publisher.send_string(message)
print(f"已提交挖矿结果: {job_id}")
print(f"nonce: {submit_request['nonce']}")
print(f"solution: {submit_request['solution']}")
except Exception as e:
print(f"提交结果错误: {e}")
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
"received_tasks": len(self.received_tasks),
"task_ids": list(self.received_tasks.keys())
}
def main():
"""主函数"""
print("GBT ZMQ通信测试器")
print("=" * 50)
# 创建测试器
tester = GbtTester()
try:
# 启动测试器
tester.start()
# 主循环
while True:
time.sleep(5)
stats = tester.get_stats()
print(f"\n统计信息: 已接收 {stats['received_tasks']} 个任务")
except KeyboardInterrupt:
print("\n正在停止测试器...")
finally:
tester.stop()
print("测试器已停止")
if __name__ == "__main__":
main()