add support tari-randomx

This commit is contained in:
lzx
2025-09-09 14:54:04 +08:00
parent 8cc56a489e
commit 917d7118e6
26 changed files with 14297 additions and 111 deletions

View File

@@ -41,10 +41,15 @@ pub struct Cli {
pub non_interactive_mode: bool,
#[clap(long, alias = "zmq-pub-port", default_value = "11113")]
zmq_pub_port: u16,
#[clap(long, alias = "zmq-sub-port", default_value = "11112")]
zmq_sub_port: u16,
/// Select ZMQ job algorithm: sha3x | randomxt. If omitted, defaults from config pow algo.
#[clap(long)]
pub zmq_algo: Option<String>,
/// Optional override for ZMQ publisher port (otherwise defaults by algorithm)
#[clap(long, alias = "zmq-pub-port")]
pub zmq_pub_port: Option<u16>,
/// Optional override for ZMQ subscriber port (otherwise defaults by algorithm)
#[clap(long, alias = "zmq-sub-port")]
pub zmq_sub_port: Option<u16>,
}
impl ConfigOverrideProvider for Cli {

View File

@@ -27,7 +27,7 @@ use std::{
time::{Duration, Instant},
};
use chrono::Utc;
// use chrono::Utc;
use crossbeam::channel::{bounded, Select, Sender, TrySendError};
use futures::Stream;
use log::*;
@@ -37,6 +37,7 @@ use tari_core::proof_of_work::randomx_factory::RandomXFactory;
use thread::JoinHandle;
use super::difficulty::BlockHeaderSha3;
use tari_utilities::hex::Hex;
use zmq::{Socket};
@@ -87,6 +88,7 @@ pub struct Miner {
publisher_socket: Arc<Mutex<Socket>>,
subscriber_socket: Arc<Mutex<Socket>>,
base_node_client: BaseNodeGrpcClient,
zmq_algorithm: String,
}
impl Miner {
@@ -100,6 +102,7 @@ impl Miner {
rx_factory: Option<RandomXFactory>,
publisher_socket: Arc<Mutex<Socket>>,
subscriber_socket: Arc<Mutex<Socket>>,
zmq_algorithm: String,
) -> Self {
Self {
threads: vec![],
@@ -113,6 +116,7 @@ impl Miner {
publisher_socket,
subscriber_socket,
base_node_client,
zmq_algorithm,
}
}
@@ -144,8 +148,9 @@ impl Miner {
let pub_socket = Arc::clone(&self.publisher_socket);
let sub_socket = Arc::clone(&self.subscriber_socket);
let base_node_client_clone = self.base_node_client.clone();
let zmq_algo = self.zmq_algorithm.clone();
let handle = thread
.spawn(move || mining_task(base_node_client_clone, header, difficulty, tx, waker, i, share_mode, vm_key, rx_factory, pub_socket, sub_socket))
.spawn(move || mining_task(base_node_client_clone, header, difficulty, tx, waker, i, share_mode, vm_key, rx_factory, pub_socket, sub_socket, zmq_algo))
.expect("Failed to create mining thread");
(handle, rx)
});
@@ -214,6 +219,8 @@ pub struct GbtMsg {
pub height: u64,
pub header: String,
pub u64target: u64,
/// RandomXT需要的vm_keyhex字符串Sha3X可为空字符串
pub vmkey: String,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -230,18 +237,26 @@ pub struct SubmitRequest {
pub submitidx: u64,
}
fn send_gbt_message(socket: &Arc<Mutex<Socket>>, msg: &GbtMsg) -> Result<(), Box<dyn std::error::Error>> {
fn send_gbt_message(
socket: &Arc<Mutex<Socket>>,
msg: &GbtMsg,
algorithm: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let msg_json = serde_json::to_string(msg)?;
let topic = match algorithm {
"randomxt" | "TariRandomXT" => "jobrandomxt",
_ => "jobsha3x",
};
if let Ok(pub_socket) = socket.try_lock() {
pub_socket.send_multipart(&["jobsha3x".as_bytes(), msg_json.as_bytes()], 0)?;
println!("send_gbt_message");
pub_socket.send_multipart(&[topic.as_bytes(), msg_json.as_bytes()], 0)?;
println!("send_gbt_message for {}", algorithm);
Ok(())
} else {
Err("Socket lock busy".into())
}
}
fn try_receive_nonce(socket: &Arc<Mutex<Socket>>) -> Option<u64> {
fn try_receive_nonce(socket: &Arc<Mutex<Socket>>, algorithm: &str) -> Option<u64> {
let sub_socket = match socket.try_lock() {
Ok(sub_socket) => sub_socket,
Err(_) => return None,
@@ -267,7 +282,11 @@ fn try_receive_nonce(socket: &Arc<Mutex<Socket>>) -> Option<u64> {
//println!("First frame: {:?}", frames.first().unwrap().as_slice());
//println!("Nonce frame: {:?}", &frames[1]);
if frames.first().map(Vec::as_slice) != Some(b"blksha3x") {
let expected_topic = match algorithm {
"randomxt" | "TariRandomXT" => b"blkrandomxt".as_slice(),
_ => b"blksha3x".as_slice(),
};
if frames.first().map(Vec::as_slice) != Some(expected_topic) {
return None;
}
@@ -296,6 +315,7 @@ pub fn mining_task(
rx_factory: Option<RandomXFactory>,
publisher_socket: Arc<Mutex<Socket>>,
subscriber_socket: Arc<Mutex<Socket>>,
zmq_algorithm: String,
) {
let mining_algorithm = if rx_factory.is_some() { "RandomXT" } else { "Sha3X" };
let start = Instant::now();
@@ -316,10 +336,15 @@ pub fn mining_task(
height: hasher.header.height,
header: hasher.header.mining_hash().to_string(),
u64target: target_difficulty,
vmkey: if zmq_algorithm == "randomxt" || zmq_algorithm == "TariRandomXT" {
hasher.vm_key.to_hex()
} else {
String::new()
},
};
if let Err(e) = send_gbt_message(&publisher_socket, &gbt_msg) {
if let Err(e) = send_gbt_message(&publisher_socket, &gbt_msg, &zmq_algorithm) {
error!(
target: LOG_TARGET,
"Miner {} failed to send GBT message: {}", miner, e
@@ -334,7 +359,7 @@ pub fn mining_task(
let mut check_count:u64 = 0;
// Mining work
loop {
if let Some(nonce) = try_receive_nonce(&subscriber_socket) {
if let Some(nonce) = try_receive_nonce(&subscriber_socket, &zmq_algorithm) {
check_count = 0;
hasher.header.nonce = nonce;
println!("nonce {} {}", nonce, hasher.header.nonce);

View File

@@ -185,15 +185,28 @@ pub async fn start_miner(cli: Cli) -> Result<(), ExitError> {
return Err(ExitError::new(ExitCode::GrpcError, e.to_string()));
}
}
config.zmq_publisher_port = 11113;
config.zmq_subscriber_port = 11112;
println!("pub port:{}, sub port {}", config.zmq_publisher_port, config.zmq_subscriber_port);
// Decide ZMQ algorithm from CLI flag or pow algo
let zmq_algo = cli
.zmq_algo
.as_ref()
.map(|s| s.as_str())
.unwrap_or_else(|| if config.proof_of_work_algo == PowAlgorithm::RandomXT { "randomxt" } else { "sha3x" });
// Default ports per algorithm, allow CLI override (do not rely on config fields)
let (default_pub, default_sub) = if zmq_algo == "randomxt" {
(11114u16, 11115u16)
} else {
(11112u16, 11113u16)
};
let publisher_port = cli.zmq_pub_port.unwrap_or(default_pub);
let subscriber_port = cli.zmq_sub_port.unwrap_or(default_sub);
println!("Using ZMQ ({}) ports: pub port:{}, sub port {}", zmq_algo, publisher_port, subscriber_port);
let zmq_context = ZmqContext::new();
let publisher_socket = zmq_context
.socket(zmq::PUB)
.map_err(|e| anyhow!("ZMQ publisher error: {}", e))?;
let publisher_addr = format!("tcp://0.0.0.0:{}", config.zmq_publisher_port);
let publisher_addr = format!("tcp://0.0.0.0:{}", publisher_port);
println!("publisher_addr:{}",publisher_addr);
publisher_socket
.bind(&publisher_addr)
@@ -202,7 +215,7 @@ pub async fn start_miner(cli: Cli) -> Result<(), ExitError> {
let subscriber_socket = zmq_context
.socket(SUB)
.context("Failed to create SUB socket")?;
let subscriber_addr = format!("tcp://172.17.0.1:{}", config.zmq_subscriber_port);
let subscriber_addr = format!("tcp://127.0.0.1:{}", subscriber_port);
println!("subscriber_addr:{}",subscriber_addr);
subscriber_socket
.connect(&subscriber_addr)
@@ -602,6 +615,13 @@ async fn mining_cycle(
debug!(target: LOG_TARGET, "Initializing miner");
// Determine ZMQ algorithm for miner streams (sha3x | randomxt)
let zmq_algo = cli
.zmq_algo
.as_ref()
.map(|s| s.as_str())
.unwrap_or_else(|| if config.proof_of_work_algo == PowAlgorithm::RandomXT { "randomxt" } else { "sha3x" });
let rx_factory = if config.proof_of_work_algo == PowAlgorithm::RandomXT {
Some(RandomXFactory::new(config.num_mining_threads))
} else {
@@ -618,6 +638,7 @@ async fn mining_cycle(
rx_factory,
publisher_socket,
subscriber_socket,
zmq_algo.to_string(),
);
let mut reporting_timeout = Instant::now();
let mut block_submitted = false;