@@ -3,10 +3,14 @@ package com.m2pool.lease.task;
import com.baomidou.dynamic.datasource.annotation.DSTransactional ;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper ;
import com.m2pool.lease.dto.ProductMachineDto ;
import com.m2pool.lease.dto.v2.RealHashrateInfoDto ;
import com.m2pool.lease.dto.v2.RealTimeHashrateQueryDto ;
import com.m2pool.lease.entity.LeaseOrderItem ;
import com.m2pool.lease.entity.LeaseProductMachine ;
import com.m2pool.lease.mapper.LeaseOrderItemMapper ;
import com.m2pool.lease.mapper.LeaseOrderMiningMapper ;
import com.m2pool.lease.mapper.LeaseProductMachineMapper ;
import org.springframework.context.annotation.Configuration ;
import org.springframework.scheduling.annotation.Async ;
import org.springframework.scheduling.annotation.EnableAsync ;
import org.springframework.scheduling.annotation.EnableScheduling ;
@@ -15,13 +19,17 @@ import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource ;
import java.math.BigDecimal ;
import java.math.RoundingMode ;
import java.time.LocalDateTime ;
import java.time.format.DateTimeFormatter ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.stream.Collectors ;
/**
* @Description 矿机实时算力入库
@@ -36,6 +44,12 @@ public class RealPowerInsetTask {
@Resource
private LeaseProductMachineMapper leaseProductMachineMapper ;
@Resource
private LeaseOrderItemMapper leaseOrderItemMapper ;
@Resource
private LeaseOrderMiningMapper leaseOrderMiningMapper ;
private static final int BATCH_SIZE = 1000 ;
private static final int THREAD_POOL_SIZE = Runtime . getRuntime ( ) . availableProcessors ( ) ;
private final ExecutorService executorService = Executors . newFixedThreadPool ( THREAD_POOL_SIZE ) ;
@@ -90,7 +104,7 @@ public class RealPowerInsetTask {
}
@Scheduled( cron = " 0 0/5 * * * ? " )
// @Scheduled(cron = " 0 0/5 * * * ? " )
@Async ( " scheduledTaskExecutor " )
@DSTransactional
public void nexaRealPowerInset ( ) {
@@ -99,7 +113,7 @@ public class RealPowerInsetTask {
}
@Scheduled( cron = " 0 0/5 * * * ? " )
// @Scheduled(cron = " 0 0/5 * * * ? " )
@Async ( " scheduledTaskExecutor " )
@DSTransactional
public void monaRealPowerInset ( ) {
@@ -107,7 +121,7 @@ public class RealPowerInsetTask {
batchInsert ( " mona " , monaPower ) ;
}
@Scheduled( cron = " 0 0/5 * * * ? " )
// @Scheduled(cron = " 0 0/5 * * * ? " )
@Async ( " scheduledTaskExecutor " )
@DSTransactional
public void rxdRealPowerInset ( ) {
@@ -115,7 +129,7 @@ public class RealPowerInsetTask {
batchInsert ( " rxd " , rxdPower ) ;
}
@Scheduled( cron = " 0 0/5 * * * ? " )
// @Scheduled(cron = " 0 0/5 * * * ? " )
@Async ( " scheduledTaskExecutor " )
@DSTransactional
public void grsRealPowerInset ( ) {
@@ -123,4 +137,95 @@ public class RealPowerInsetTask {
batchInsert ( " grs " , rxdPower ) ;
}
/*---------------------v2版本实时算力------------------------------**/
/**
* @Description: 2miners实时算力更新任务
*/
@Scheduled ( cron = " 50 0/5 * * * ? " )
@Async ( " scheduledTaskExecutor " )
@DSTransactional
public void RealPower2minersTask ( ) {
try {
System . out . println ( " GPU实时算力更新任务开始执行: " + LocalDateTime . now ( ) ) ;
//1.查找lease_order_item中所有状态status为1的订单详情
List < LeaseOrderItem > activeOrderItems = leaseOrderItemMapper . getActiveOrderItems ( ) ;
if ( activeOrderItems = = null | | activeOrderItems . isEmpty ( ) ) {
System . out . println ( " 没有需要更新实时算力的订单详情 " ) ;
return ;
}
System . out . println ( " 查询到 " + activeOrderItems . size ( ) + " 条需要更新实时算力的订单详情 " ) ;
//2. 根据订单详情信息构建查询条件,去@MiningDB库中的矿池实时算力表查询数据
// 计算当前时间的前5分钟的整点时间( 如: 2026-01-28 01:05:50 -> 2026-01-28 01:05:00)
LocalDateTime now = LocalDateTime . now ( ) ;
LocalDateTime queryTime = now . minusMinutes ( 5 ) . withSecond ( 0 ) . withNano ( 0 ) ;
String queryDateTime = queryTime . format ( DateTimeFormatter . ofPattern ( " yyyy-MM-dd HH:mm:ss " ) ) ;
System . out . println ( " 查询时间点: " + queryDateTime ) ;
// 按矿池分组查询
Map < String , List < LeaseOrderItem > > poolGroups = activeOrderItems . stream ( )
. collect ( Collectors . groupingBy ( LeaseOrderItem : : getPool ) ) ;
List < LeaseOrderItem > updatedItems = new ArrayList < > ( ) ;
for ( Map . Entry < String , List < LeaseOrderItem > > entry : poolGroups . entrySet ( ) ) {
String pool = entry . getKey ( ) ;
List < LeaseOrderItem > items = entry . getValue ( ) ;
// 构建查询条件列表
List < RealTimeHashrateQueryDto > queryList = items . stream ( )
. map ( item - > RealTimeHashrateQueryDto . builder ( )
. orderItemId ( item . getId ( ) )
. walletAddress ( item . getUser ( ) )
. miner ( item . getMiner ( ) )
. coin ( item . getCoin ( ) )
. algorithm ( item . getAlgorithm ( ) )
. pool ( item . getPool ( ) )
. build ( ) )
. collect ( Collectors . toList ( ) ) ;
String tableName = " 2miners " ; //表名
// 查询实时算力
List < RealHashrateInfoDto > hashrateList = leaseOrderMiningMapper . getRealTimeHashrate (
tableName , queryDateTime , queryList ) ;
System . out . println ( " 矿池 " + pool + " 查询到 " + hashrateList . size ( ) + " 条算力数据 " ) ;
// 构建算力数据映射表: key = walletAddress + "_" + miner + "_" + coin + "_" + algorithm
Map < String , BigDecimal > hashrateMap = hashrateList . stream ( )
. collect ( Collectors . toMap (
dto - > dto . getWalletAddress ( ) + " _ " + dto . getMiner ( ) + " _ " + dto . getCoin ( ) + " _ " + dto . getAlgorithm ( ) ,
RealHashrateInfoDto : : getPower ,
( v1 , v2 ) - > v1 // 如果有重复,取第一个
) ) ;
// 更新订单详情的实时算力
for ( LeaseOrderItem item : items ) {
String key = item . getUser ( ) + " _ " + item . getMiner ( ) + " _ " + item . getCoin ( ) + " _ " + item . getAlgorithm ( ) ;
BigDecimal power = hashrateMap . get ( key ) ;
// 如果查询到算力数据则使用, 否则设置为0
item . setPracticalPower ( power ! = null ? power : BigDecimal . ZERO ) ;
updatedItems . add ( item ) ;
}
}
//3. 批量更新lease_order_item的practical_power实时算力字段
if ( ! updatedItems . isEmpty ( ) ) {
int updateCount = leaseOrderItemMapper . updatePracticalPowerBatch ( updatedItems ) ;
System . out . println ( " 成功更新 " + updateCount + " 条订单详情的实时算力 " ) ;
}
System . out . println ( " GPU实时算力更新任务执行完成: " + LocalDateTime . now ( ) ) ;
} catch ( Exception e ) {
System . err . println ( " GPU实时算力更新任务执行失败: " + e . getMessage ( ) ) ;
throw new RuntimeException ( " GPU实时算力更新任务执行失败 " , e ) ;
}
}
/*---------------------v2版本实时算力------------------------------**/
}