update hex16 and get output_smt_size

This commit is contained in:
lzx
2025-06-27 18:22:48 +08:00
parent eff35c4cb4
commit e95771709c
7 changed files with 1013 additions and 205 deletions

View File

@@ -25,9 +25,11 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use anyhow::{anyhow, Result};
use clap::Parser;
use log::*;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::{sync::Mutex, time::sleep};
use tonic::transport::{Certificate, ClientTlsConfig, Endpoint};
use uuid::Uuid;
use zmq::{Context, Message, Socket};
use minotari_app_grpc::{
@@ -35,7 +37,6 @@ use minotari_app_grpc::{
conversions::transaction_output::grpc_output_with_payref,
tari_rpc::{
base_node_client::BaseNodeClient, pow_algo::PowAlgos, Block, NewBlockTemplateRequest, PowAlgo,
SubmitBlockResponse,
},
};
use minotari_app_utilities::parse_miner_input::BaseNodeGrpcClient;
@@ -56,36 +57,87 @@ use tari_core::{
},
};
use tari_utilities::hex::Hex;
use tari_utilities::ByteArray;
use jmt::{JellyfishMerkleTree, KeyHash};
use jmt::mock::MockTreeStore;
use tari_core::chain_storage::SmtHasher;
use tari_core::blocks::Block as CoreBlock;
const LOG_TARGET: &str = "gbt::main";
// ZMQ消息结构
// 区块头结构
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MiningTask {
pub coinbase_hash: String,
pub struct BlockHeader {
pub hash: String,
pub version: u32,
pub height: u64,
pub target: u64,
pub output_smt_size: u64, // 新增output_smt_size
pub block_template: String, // 序列化的区块模板
pub prev_hash: String,
pub timestamp: u64,
pub output_mr: String,
pub block_output_mr: String,
pub kernel_mr: String,
pub input_mr: String,
pub total_kernel_offset: String,
pub nonce: u64,
pub pow: ProofOfWork,
pub kernel_mmr_size: u64,
pub output_mmr_size: u64,
pub total_script_offset: String,
pub validator_node_mr: String,
pub validator_node_size: u64,
}
// 工作量证明结构
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ProofOfWork {
pub pow_algo: u64,
pub pow_data: String,
}
// 区块体结构
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BlockBody {
pub inputs: Vec<String>,
pub outputs: Vec<String>,
pub kernels: Vec<String>,
}
// 挖矿任务结构
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MiningTask {
pub job_id: String,
pub block_header: BlockHeader,
pub block_body: BlockBody,
pub output_smt_size: u64,
pub coinbase_hash: String,
pub target: u64, // 新增:目标难度
}
// 挖矿消息结构
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MiningMsg {
pub job_id: String,
pub block_header: BlockHeader,
pub output_smt_size: u64,
pub coinbase_hash: String,
pub target: u64, // 新增:目标难度
}
// 提交请求结构
#[derive(Debug, Serialize, Deserialize)]
pub struct SubmitRequest {
pub height: u64,
pub job_id: String,
pub nonce: u64,
pub solution: String,
pub block_data: String, // 序列化的区块数据
}
// 提交结果结构
#[derive(Debug, Serialize, Deserialize)]
pub struct SubmitResult {
pub job_id: String,
pub result: u8, // 1表示成功0表示失败
}
// 配置结构
#[derive(Debug, Clone)]
pub struct GbtConfig {
pub base_node_grpc_address: String,
pub base_node_http_address: String,
pub base_node_grpc_authentication: GrpcAuthentication,
pub base_node_grpc_tls_domain_name: Option<String>,
pub base_node_grpc_ca_cert_filename: Option<String>,
@@ -101,6 +153,7 @@ pub struct GbtConfig {
// GBT客户端
pub struct GbtClient {
base_node_client: BaseNodeGrpcClient,
http_client: Client,
key_manager: MemoryDbKeyManager,
consensus_manager: ConsensusManager,
wallet_payment_address: TariAddress,
@@ -112,8 +165,11 @@ pub struct GbtClient {
publisher_socket: Socket,
subscriber_socket: Socket,
// 挖矿任务缓存
mining_tasks: Arc<Mutex<HashMap<String, MiningTask>>>,
// 挖矿任务缓存按height保存最多保存3个区块
mining_tasks: Arc<Mutex<HashMap<u64, MiningTask>>>,
// 缓存上一个区块的output_smt_size避免重复请求
prev_output_smt_size_cache: Arc<Mutex<HashMap<u64, u64>>>,
}
impl GbtClient {
@@ -121,6 +177,9 @@ impl GbtClient {
// 创建BaseNode客户端
let base_node_client = Self::connect_base_node(&config).await?;
// 创建HTTP客户端
let http_client = Client::new();
// 创建密钥管理器
let key_manager = create_memory_db_key_manager().map_err(|e| anyhow!("Key manager error: {}", e))?;
@@ -158,6 +217,7 @@ impl GbtClient {
Ok(Self {
base_node_client,
http_client,
key_manager,
consensus_manager,
wallet_payment_address,
@@ -166,6 +226,7 @@ impl GbtClient {
publisher_socket,
subscriber_socket,
mining_tasks: Arc::new(Mutex::new(HashMap::new())),
prev_output_smt_size_cache: Arc::new(Mutex::new(HashMap::new())),
})
}
@@ -207,44 +268,118 @@ impl GbtClient {
Ok(node_conn)
}
/// 计算output_smt_size
fn calculate_output_smt_size(&self, block: &CoreBlock, prev_output_smt_size: u64) -> Result<u64> {
// 创建JellyfishMerkleTree用于计算
let mock_store = MockTreeStore::new(true);
let output_smt = JellyfishMerkleTree::<_, SmtHasher>::new(&mock_store);
// 生成16位16进制任务ID
fn generate_job_id() -> String {
let uuid = Uuid::new_v4();
uuid.to_string().replace("-", "")[..16].to_string()
}
// 通过HTTP获取上一个区块的output_smt_size
async fn get_prev_output_smt_size(&self, height: u64) -> Result<u64> {
let prev_height = height - 1;
let mut batch = Vec::new();
// 处理所有输出(添加新的叶子节点)
for output in block.body.outputs() {
if !output.is_burned() {
let smt_key = KeyHash(
output.commitment.as_bytes().try_into().expect("commitment is 32 bytes")
);
let smt_value = output.smt_hash(block.header.height);
batch.push((smt_key, Some(smt_value.to_vec())));
// 检查缓存
{
let cache = self.prev_output_smt_size_cache.lock().await;
if let Some(&size) = cache.get(&prev_height) {
return Ok(size);
}
}
// 通过HTTP请求获取
let url = format!("http://{}/get_header_by_height?height={}",
self.config.base_node_http_address, prev_height);
// 处理所有输入(删除叶子节点)
for input in block.body.inputs() {
let smt_key = KeyHash(
input.commitment()?.as_bytes().try_into().expect("Commitment is 32 bytes")
);
batch.push((smt_key, None));
info!(target: LOG_TARGET, "Fetching previous block output_smt_size from: {}", url);
let response = self.http_client
.get(&url)
.send()
.await
.map_err(|e| anyhow!("HTTP request error: {}", e))?;
if !response.status().is_success() {
return Err(anyhow!("HTTP request failed with status: {}", response.status()));
}
let header_data: serde_json::Value = response
.json()
.await
.map_err(|e| anyhow!("JSON parsing error: {}", e))?;
let output_smt_size = header_data["output_smt_size"]
.as_u64()
.ok_or_else(|| anyhow!("Missing output_smt_size in response"))?;
// 更新缓存
{
let mut cache = self.prev_output_smt_size_cache.lock().await;
cache.insert(prev_height, output_smt_size);
// 清理缓存只保留最近3个区块
if cache.len() > 3 {
let mut heights: Vec<u64> = cache.keys().cloned().collect();
heights.sort();
for height in heights.iter().take(heights.len() - 3) {
cache.remove(height);
}
}
}
info!(target: LOG_TARGET, "Previous block output_smt_size: {}", output_smt_size);
Ok(output_smt_size)
}
// 将gRPC Block转换为自定义结构
fn convert_block_to_structures(&self, block: &Block) -> Result<(BlockHeader, BlockBody)> {
let header = block.header.as_ref()
.ok_or_else(|| anyhow!("No header in block"))?;
// 计算SMT变化
let (_, changes) = output_smt
.put_value_set(batch, block.header.height)
.map_err(|e| anyhow!("SMT calculation error: {}", e))?;
// 计算新的output_smt_size
let mut size = prev_output_smt_size;
size += changes.node_stats.first().map(|s| s.new_leaves).unwrap_or(0) as u64;
size = size.saturating_sub(changes.node_stats.first().map(|s| s.stale_leaves).unwrap_or(0) as u64);
Ok(size)
let body = block.body.as_ref()
.ok_or_else(|| anyhow!("No body in block"))?;
// 构造BlockHeader
let block_header = BlockHeader {
hash: header.hash.to_hex(),
version: header.version,
height: header.height,
prev_hash: header.prev_hash.to_hex(),
timestamp: header.timestamp,
output_mr: header.output_mr.to_hex(),
block_output_mr: header.block_output_mr.to_hex(),
kernel_mr: header.kernel_mr.to_hex(),
input_mr: header.input_mr.to_hex(),
total_kernel_offset: header.total_kernel_offset.to_hex(),
nonce: 0, // 初始为0等待后续修改
pow: ProofOfWork {
pow_algo: header.pow.as_ref().map(|p| p.pow_algo).unwrap_or(0),
pow_data: header.pow.as_ref().map(|p| p.pow_data.to_hex()).unwrap_or_default(),
},
kernel_mmr_size: header.kernel_mmr_size,
output_mmr_size: header.output_mmr_size,
total_script_offset: header.total_script_offset.to_hex(),
validator_node_mr: header.validator_node_mr.to_hex(),
validator_node_size: header.validator_node_size,
};
// 构造BlockBody - 使用serde_json序列化然后反序列化为字符串
let inputs: Vec<String> = body.inputs.iter()
.map(|i| serde_json::to_string(i).unwrap_or_default())
.collect();
let outputs: Vec<String> = body.outputs.iter()
.map(|o| serde_json::to_string(o).unwrap_or_default())
.collect();
let kernels: Vec<String> = body.kernels.iter()
.map(|k| serde_json::to_string(k).unwrap_or_default())
.collect();
let block_body = BlockBody {
inputs,
outputs,
kernels,
};
Ok((block_header, block_body))
}
pub async fn get_block_template_and_coinbase(&mut self) -> Result<MiningTask> {
@@ -284,9 +419,9 @@ impl GbtClient {
let fee = MicroMinotari::from(miner_data.total_fees);
let reward = MicroMinotari::from(miner_data.reward);
let target_difficulty = miner_data.target_difficulty;
let target_difficulty = miner_data.target_difficulty; // 获取目标难度
info!(target: LOG_TARGET, "Generating coinbase for height {}", height);
info!(target: LOG_TARGET, "Generating coinbase for height {} with target difficulty {}", height, target_difficulty);
// 生成coinbase
let (coinbase_output, coinbase_kernel) = generate_coinbase(
@@ -326,57 +461,70 @@ impl GbtClient {
// 计算coinbase哈希
let coinbase_hash = coinbase_output.hash().to_hex();
// 将gRPC Block转换为CoreBlock以便计算output_smt_size
let core_block: CoreBlock = block.clone().try_into()
.map_err(|e| anyhow!("Block conversion error: {}", e))?;
// 转换为自定义结构
let (block_header, block_body) = self.convert_block_to_structures(&block)?;
// 获取一个区块的output_smt_size(从区块模板头中获取)
let prev_output_smt_size = block_template
.header
.as_ref()
.ok_or_else(|| anyhow!("No header in block template"))?
.output_smt_size;
// 获取一个区块的output_smt_size
let prev_output_smt_size = self.get_prev_output_smt_size(height).await?;
// 计算新的output_smt_size
let calculated_output_smt_size = self.calculate_output_smt_size(&core_block, prev_output_smt_size)?;
// 计算当前output_smt_size
let current_output_smt_size = prev_output_smt_size + block_body.outputs.len() as u64 - block_body.inputs.len() as u64;
info!(target: LOG_TARGET, "Calculated output_smt_size: {} (prev: {})",
calculated_output_smt_size, prev_output_smt_size);
// 生成任务ID
let job_id = Self::generate_job_id();
// 序列化区块模板
let block_template_json = serde_json::to_string(&block).map_err(|e| anyhow!("Serialization error: {}", e))?;
info!(target: LOG_TARGET, "Generated job_id: {}, output_smt_size: {} (prev: {}), target: {}",
job_id, current_output_smt_size, prev_output_smt_size, target_difficulty);
let mining_task = MiningTask {
job_id: job_id.clone(),
block_header,
block_body,
output_smt_size: current_output_smt_size,
coinbase_hash,
height,
target: target_difficulty,
output_smt_size: calculated_output_smt_size, // 使用计算出的值
block_template: block_template_json,
};
// 缓存挖矿任务
{
let mut tasks = self.mining_tasks.lock().await;
tasks.insert(mining_task.coinbase_hash.clone(), mining_task.clone());
tasks.insert(height, mining_task.clone());
// 清理缓存只保留最近3个区块
if tasks.len() > 3 {
let mut heights: Vec<u64> = tasks.keys().cloned().collect();
heights.sort();
for height in heights.iter().take(heights.len() - 3) {
tasks.remove(height);
}
}
}
Ok(mining_task)
}
// 通过ZMQ发送挖矿任务
pub fn send_mining_task(&self, task: &MiningTask) -> Result<()> {
let task_json = serde_json::to_string(task).map_err(|e| anyhow!("Serialization error: {}", e))?;
// 发送挖矿消息
pub fn send_mining_msg(&self, mining_task: &MiningTask) -> Result<()> {
let mining_msg = MiningMsg {
job_id: mining_task.job_id.clone(),
block_header: mining_task.block_header.clone(),
output_smt_size: mining_task.output_smt_size,
coinbase_hash: mining_task.coinbase_hash.clone(),
target: mining_task.target,
};
let msg_json = serde_json::to_string(&mining_msg).map_err(|e| anyhow!("Serialization error: {}", e))?;
self.publisher_socket
.send_multipart(&["mining_task".as_bytes(), task_json.as_bytes()], 0)
.send_multipart(&["mining_msg".as_bytes(), msg_json.as_bytes()], 0)
.map_err(|e| anyhow!("ZMQ send error: {}", e))?;
info!(target: LOG_TARGET, "Sent mining task for height {} with target {} and output_smt_size {}",
task.height, task.target, task.output_smt_size);
info!(target: LOG_TARGET, "Sent mining message for job_id {} with height {}, output_smt_size {}, and target {}",
mining_task.job_id, mining_task.block_header.height, mining_task.output_smt_size, mining_task.target);
Ok(())
}
// 接收外部提交的挖矿结果
// 接收提交请求
pub async fn receive_submit(&mut self) -> Result<Option<SubmitRequest>> {
let mut message = Message::new();
@@ -390,8 +538,8 @@ impl GbtClient {
let submit_request: SubmitRequest =
serde_json::from_str(submit_json).map_err(|e| anyhow!("Deserialization error: {}", e))?;
info!(target: LOG_TARGET, "Received submit for height {} with nonce {}",
submit_request.height, submit_request.nonce);
info!(target: LOG_TARGET, "Received submit for job_id {} with nonce {}",
submit_request.job_id, submit_request.nonce);
Ok(Some(submit_request))
} else {
@@ -407,19 +555,104 @@ impl GbtClient {
}
// 提交区块到BaseNode
pub async fn submit_block_to_base_node(&mut self, submit_request: &SubmitRequest) -> Result<SubmitBlockResponse> {
// 反序列化区块数据
let block: Block = serde_json::from_str(&submit_request.block_data)
.map_err(|e| anyhow!("Block deserialization error: {}", e))?;
pub async fn submit_block_to_base_node(&mut self, submit_request: &SubmitRequest) -> Result<SubmitResult> {
// 查找对应的挖矿任务
let mining_task = {
let tasks = self.mining_tasks.lock().await;
let mut found_task = None;
for task in tasks.values() {
if task.job_id == submit_request.job_id {
found_task = Some(task.clone());
break;
}
}
found_task.ok_or_else(|| anyhow!("Mining task not found for job_id: {}", submit_request.job_id))?
};
info!(target: LOG_TARGET, "Submitting block to base node for height {}", submit_request.height);
// 构造提交区块
let mut submit_block = Block {
header: None,
body: None,
};
// 构造header
let mut header = minotari_app_grpc::tari_rpc::BlockHeader::default();
header.hash = hex::decode(&mining_task.block_header.hash)?;
header.version = mining_task.block_header.version;
header.height = mining_task.block_header.height;
header.prev_hash = hex::decode(&mining_task.block_header.prev_hash)?;
header.timestamp = mining_task.block_header.timestamp;
header.output_mr = hex::decode(&mining_task.block_header.output_mr)?;
header.block_output_mr = hex::decode(&mining_task.block_header.block_output_mr)?;
header.kernel_mr = hex::decode(&mining_task.block_header.kernel_mr)?;
header.input_mr = hex::decode(&mining_task.block_header.input_mr)?;
header.total_kernel_offset = hex::decode(&mining_task.block_header.total_kernel_offset)?;
header.nonce = submit_request.nonce; // 使用提交的nonce
header.pow = Some(minotari_app_grpc::tari_rpc::ProofOfWork {
pow_algo: mining_task.block_header.pow.pow_algo,
pow_data: hex::decode(&mining_task.block_header.pow.pow_data)?,
});
header.kernel_mmr_size = mining_task.block_header.kernel_mmr_size;
header.output_mmr_size = mining_task.output_smt_size; // 使用计算出的output_smt_size
header.total_script_offset = hex::decode(&mining_task.block_header.total_script_offset)?;
header.validator_node_mr = hex::decode(&mining_task.block_header.validator_node_mr)?;
header.validator_node_size = mining_task.block_header.validator_node_size;
submit_block.header = Some(header);
// 构造body - 从JSON字符串反序列化
let mut body = minotari_app_grpc::tari_rpc::AggregateBody::default();
// 反序列化inputs
for input_str in &mining_task.block_body.inputs {
if let Ok(input) = serde_json::from_str(input_str) {
body.inputs.push(input);
}
}
// 反序列化outputs
for output_str in &mining_task.block_body.outputs {
if let Ok(output) = serde_json::from_str(output_str) {
body.outputs.push(output);
}
}
// 反序列化kernels
for kernel_str in &mining_task.block_body.kernels {
if let Ok(kernel) = serde_json::from_str(kernel_str) {
body.kernels.push(kernel);
}
}
submit_block.body = Some(body);
info!(target: LOG_TARGET, "Submitting block to base node for job_id {}", submit_request.job_id);
// 提交区块
let response = self.base_node_client.submit_block(block).await?;
let response = self.base_node_client.submit_block(submit_block).await?;
let response_inner = response.into_inner();
info!(target: LOG_TARGET, "Block submitted successfully for height {}", submit_request.height);
// 比较结果 - 使用Debug格式进行比较
let result = if format!("{:?}", response_inner) == submit_request.solution {
1
} else {
0
};
Ok(response.into_inner())
let submit_result = SubmitResult {
job_id: submit_request.job_id.clone(),
result,
};
// 发送提交结果
let result_json = serde_json::to_string(&submit_result).map_err(|e| anyhow!("Serialization error: {}", e))?;
self.publisher_socket
.send_multipart(&["submit_result".as_bytes(), result_json.as_bytes()], 0)
.map_err(|e| anyhow!("ZMQ send error: {}", e))?;
info!(target: LOG_TARGET, "Submitted block result for job_id {}: {}", submit_request.job_id, result);
Ok(submit_result)
}
// 主循环
@@ -430,9 +663,9 @@ impl GbtClient {
// 1. 获取区块模板和构造coinbase
match self.get_block_template_and_coinbase().await {
Ok(mining_task) => {
// 2. 通过ZMQ发送挖矿任务
if let Err(e) = self.send_mining_task(&mining_task) {
error!(target: LOG_TARGET, "Failed to send mining task: {}", e);
// 2. 通过ZMQ发送挖矿消息
if let Err(e) = self.send_mining_msg(&mining_task) {
error!(target: LOG_TARGET, "Failed to send mining message: {}", e);
}
},
Err(e) => {
@@ -448,7 +681,7 @@ impl GbtClient {
// 4. 提交区块到BaseNode
match self.submit_block_to_base_node(&submit_request).await {
Ok(_) => {
info!(target: LOG_TARGET, "Successfully submitted block for height {}", submit_request.height);
info!(target: LOG_TARGET, "Successfully processed submit for job_id {}", submit_request.job_id);
},
Err(e) => {
error!(target: LOG_TARGET, "Failed to submit block: {}", e);
@@ -483,6 +716,10 @@ struct Args {
#[arg(short, long, default_value = "127.0.0.1:18102")]
base_node: String,
/// BaseNode HTTP address
#[arg(long, default_value = "127.0.0.1:9000")]
base_node_http: String,
/// Network (mainnet, nextnet, testnet)
#[arg(short, long, default_value = "mainnet")]
network: String,
@@ -542,6 +779,7 @@ async fn main() -> Result<()> {
// 创建配置
let config = GbtConfig {
base_node_grpc_address: args.base_node,
base_node_http_address: args.base_node_http,
base_node_grpc_authentication: GrpcAuthentication::None,
base_node_grpc_tls_domain_name: args.tls_domain,
base_node_grpc_ca_cert_filename: args.tls_ca_cert,
@@ -563,4 +801,4 @@ async fn main() -> Result<()> {
client.run().await?;
Ok(())
}
}