update 定时任务中算力相关计算封装为策略+工厂模式

This commit is contained in:
yyb
2026-01-28 16:21:22 +08:00
parent 96a49c5813
commit 6c1d521605
12 changed files with 482 additions and 147 deletions

View File

@@ -58,13 +58,13 @@ public class PurchasedMachineListDto {
//@ApiModelProperty(value = "0 租约已到期 1挖矿中 2卖家矿机启动中")
//private Integer status;
//
//
//@ApiModelProperty(value = "挖矿开始时间")
//private LocalDateTime startTime;
//
//
//@ApiModelProperty(value = "挖矿结束时间")
//private LocalDateTime endTime;
@ApiModelProperty(value = "挖矿开始时间")
private LocalDateTime startTime;
@ApiModelProperty(value = "挖矿结束时间")
private LocalDateTime endTime;
private Long orderItemId;

View File

@@ -70,7 +70,7 @@ public interface LeaseOrderMiningMapper extends BaseMapper<LeaseOrderMining> {
* @return
*/
@MiningDB
List<RealHashrateInfoDto> getRecently24HourHashrate(@Param("list") List<PurchasedMachineListDto> list,@Param("tableName") String tableName,@Param("now") Date now);
List<RealHashrateInfoDto> getRecently24HourHashrate(@Param("list") List<PurchasedMachineListDto> list,@Param("tableName") String tableName);
/**
@@ -96,24 +96,4 @@ public interface LeaseOrderMiningMapper extends BaseMapper<LeaseOrderMining> {
*/
List<PurchasedMachineDto> getPurchasedMachineInfo(@Param("info") MiningConfigInfoDto info,@Param("authId") Long authId);
/**
* 获取指定时间点的实时算力
* @param tableName 表名2miners
* @param datetime 时间点
* @param list 查询条件列表
* @return 实时算力信息列表
*/
@MiningDB
List<RealHashrateInfoDto> getRealTimeHashrate(@Param("tableName") String tableName, @Param("datetime") String datetime, @Param("list") List<RealTimeHashrateQueryDto> list);
/**
* 获取指定时间范围内的算力数据
* @param tableName 表名2miners
* @param startTime 开始时间
* @param endTime 结束时间
* @param list 查询条件列表
* @return 实时算力信息列表
*/
@MiningDB
List<RealHashrateInfoDto> getHashrateInRange(@Param("tableName") String tableName, @Param("startTime") String startTime, @Param("endTime") String endTime, @Param("list") List<RealTimeHashrateQueryDto> list);
}

View File

@@ -2,6 +2,7 @@ package com.m2pool.lease.redis.service;//package com.m2pool.lease.redis.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.BoundSetOperations;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
@@ -314,8 +315,8 @@ public class RedisService {
// 先判断 key 是否存在
if (redisTemplate7.hasKey(key)) {
// 检查 key 的类型是否为 Hash
org.springframework.data.redis.core.DataType dataType = redisTemplate7.type(key);
if (dataType == org.springframework.data.redis.core.DataType.HASH) {
DataType dataType = redisTemplate7.type(key);
if (dataType == DataType.HASH) {
// 再判断 hKey 是否存在
if (redisTemplate7.opsForHash().hasKey(key, hKey)) {
redisTemplate7.opsForHash().delete(key, hKey);

View File

@@ -0,0 +1,128 @@
package com.m2pool.lease.strategy.hashrate;
import com.m2pool.lease.dto.v2.PurchasedMachineListDto;
import com.m2pool.lease.dto.v2.RealHashrateInfoDto;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 算力获取策略接口
* 用于处理不同矿池的数据采集频率差异
*
* @author yyb
* @since 2025-01-28
*/
public interface HashrateFetchStrategy {
/**
* 获取策略名称(矿池名称)
*
* @return 矿池名称
*/
String getPoolName();
/**
* 获取数据采集间隔(分钟)
*
* @return 采集间隔
*/
int getDataIntervalMinutes();
/**
* 获取24小时内应该采集的数据点数量
* 根据数据采集间隔计算24小时 * 60分钟 / 采集间隔
*
* @return 数据点数量
*/
default int getDataPoints() {
return 24 * 60 / getDataIntervalMinutes();
}
/**
* 获取指定时间范围内的算力数据
* 从订单的 startTime 开始,到 endTime + 30分钟结束
*
* @param list 机器信息列表
* @return 实时算力信息列表
*/
List<RealHashrateInfoDto> fetchRecently24HourHashrate(List<PurchasedMachineListDto> list);
/**
* 将算力数据按矿池、钱包地址、矿工号、币种、算法进行分组
*
* @param realPowerList 算力数据列表
* @return 分组后的数据映射
*/
default Map<String, List<RealHashrateInfoDto>> groupHashrateData(List<RealHashrateInfoDto> realPowerList) {
return realPowerList.stream()
.collect(Collectors.groupingBy(
dto -> dto.getPool() + "_" + dto.getWalletAddress() + "_" +
dto.getMiner() + "_" + dto.getCoin() + "_" + dto.getAlgorithm()
));
}
/**
* 计算订单项的平均算力
* 根据该策略对应的数据点数量计算平均值
*
* @param hashrateList 算力数据列表
* @return 平均算力
*/
default BigDecimal calculateAverageHashrate(List<RealHashrateInfoDto> hashrateList) {
if (hashrateList == null || hashrateList.isEmpty()) {
return BigDecimal.ZERO;
}
BigDecimal totalPracticalPower = BigDecimal.ZERO;
for (RealHashrateInfoDto realHashrateInfoDto : hashrateList) {
totalPracticalPower = totalPracticalPower.add(realHashrateInfoDto.getPower());
}
return totalPracticalPower.divide(BigDecimal.valueOf(getDataPoints()), 4, RoundingMode.HALF_UP);
}
/**
* 计算时间范围内的平均算力(处理补零逻辑)
*
* @param hashrateList 算力数据列表
* @param expectedDataPoints 期望的数据点数量
* @return 平均算力
*/
default BigDecimal calculateAverageHashrateInRange(List<RealHashrateInfoDto> hashrateList, int expectedDataPoints) {
if (hashrateList == null || hashrateList.isEmpty()) {
return BigDecimal.ZERO;
}
// 计算实际数据点总数
int actualDataPoints = hashrateList.size();
// 计算实际数据的总算力
BigDecimal totalPracticalPower = BigDecimal.ZERO;
for (RealHashrateInfoDto realHashrateInfoDto : hashrateList) {
totalPracticalPower = totalPracticalPower.add(realHashrateInfoDto.getPower());
}
// 补零缺失的数据点算力为0
int missingDataPoints = expectedDataPoints - actualDataPoints;
if (missingDataPoints > 0) {
System.out.println(String.format("缺失 %d 个数据点,将补零计算平均算力", missingDataPoints));
}
// 使用期望的数据点数量计算平均值(已自动补零)
return totalPracticalPower.divide(BigDecimal.valueOf(expectedDataPoints), 4, RoundingMode.HALF_UP);
}
/**
* 是否支持该矿池
*
* @param poolName 矿池名称
* @return 是否支持
*/
default boolean supports(String poolName) {
return getPoolName().equalsIgnoreCase(poolName);
}
}

View File

@@ -0,0 +1,82 @@
package com.m2pool.lease.strategy.hashrate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 算力获取策略工厂
* 根据矿池名称获取对应的策略
*
* @author yyb
* @since 2025-01-28
*/
@Component
public class HashrateStrategyFactory {
@Autowired
private List<HashrateFetchStrategy> strategies;
private Map<String, HashrateFetchStrategy> strategyMap = new HashMap<>();
@PostConstruct
public void init() {
if (strategies != null) {
for (HashrateFetchStrategy strategy : strategies) {
// 将非默认策略放入map
if (!"default".equalsIgnoreCase(strategy.getPoolName())) {
strategyMap.put(strategy.getPoolName().toLowerCase(), strategy);
}
}
}
}
/**
* 根据矿池名称获取对应的策略
*
* @param poolName 矿池名称
* @return 算力获取策略
*/
public HashrateFetchStrategy getStrategy(String poolName) {
if (poolName == null || poolName.isEmpty()) {
return getDefaultStrategy();
}
HashrateFetchStrategy strategy = strategyMap.get(poolName.toLowerCase());
if (strategy != null) {
return strategy;
}
// 如果没有找到具体策略,返回默认策略
return getDefaultStrategy();
}
/**
* 获取默认策略
*
* @return 默认算力获取策略
*/
private HashrateFetchStrategy getDefaultStrategy() {
return strategies.stream()
.filter(s -> "default".equalsIgnoreCase(s.getPoolName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("未找到默认的算力获取策略"));
}
/**
* 获取所有已注册的矿池名称
*
* @return 矿池名称集合
*/
public Map<String, Integer> getAllPools() {
Map<String, Integer> poolInfoMap = new HashMap<>();
for (HashrateFetchStrategy strategy : strategies) {
poolInfoMap.put(strategy.getPoolName(), strategy.getDataIntervalMinutes());
}
return poolInfoMap;
}
}

View File

@@ -0,0 +1,60 @@
package com.m2pool.lease.strategy.hashrate.impl;
import com.m2pool.lease.dto.v2.PurchasedMachineListDto;
import com.m2pool.lease.dto.v2.RealHashrateInfoDto;
import com.m2pool.lease.mapper.LeaseOrderMiningMapper;
import com.m2pool.lease.strategy.hashrate.HashrateFetchStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* 默认算力获取策略
* 用于处理未配置具体策略的矿池
* 数据采集频率默认每30分钟
*
* @author yyb
* @since 2025-01-28
*/
@Component
public class DefaultHashrateStrategy implements HashrateFetchStrategy {
@Autowired
private LeaseOrderMiningMapper leaseOrderMiningMapper;
private static final String POOL_NAME = "default";
private static final int DATA_INTERVAL_MINUTES = 30;
@Override
public String getPoolName() {
return POOL_NAME;
}
@Override
public int getDataIntervalMinutes() {
return DATA_INTERVAL_MINUTES;
}
@Override
public List<RealHashrateInfoDto> fetchRecently24HourHashrate(List<PurchasedMachineListDto> list) {
if (list == null || list.isEmpty()) {
return new ArrayList<>();
}
// 对 list 中的每个元素构建查询条件
for (PurchasedMachineListDto dto : list) {
LocalDateTime queryEndTime = dto.getEndTime().plusMinutes(30);
dto.setEndTime(queryEndTime);
}
return leaseOrderMiningMapper.getRecently24HourHashrate(list, POOL_NAME);
}
@Override
public boolean supports(String poolName) {
// 默认策略支持所有矿池
return true;
}
}

View File

@@ -0,0 +1,53 @@
package com.m2pool.lease.strategy.hashrate.impl;
import com.m2pool.lease.dto.v2.PurchasedMachineListDto;
import com.m2pool.lease.dto.v2.RealHashrateInfoDto;
import com.m2pool.lease.mapper.LeaseOrderMiningMapper;
import com.m2pool.lease.strategy.hashrate.HashrateFetchStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* m2pool矿池算力获取策略
* 数据采集频率每30分钟
*
* @author yyb
* @since 2025-01-28
*/
@Component
public class M2poolHashrateStrategy implements HashrateFetchStrategy {
@Autowired
private LeaseOrderMiningMapper leaseOrderMiningMapper;
private static final String POOL_NAME = "m2pool";
private static final int DATA_INTERVAL_MINUTES = 30;
@Override
public String getPoolName() {
return POOL_NAME;
}
@Override
public int getDataIntervalMinutes() {
return DATA_INTERVAL_MINUTES;
}
@Override
public List<RealHashrateInfoDto> fetchRecently24HourHashrate(List<PurchasedMachineListDto> list) {
if (list == null || list.isEmpty()) {
return new ArrayList<>();
}
// 对 list 中的每个元素构建查询条件
for (PurchasedMachineListDto dto : list) {
LocalDateTime queryEndTime = dto.getEndTime().plusMinutes(30);
dto.setEndTime(queryEndTime);
}
return leaseOrderMiningMapper.getRecently24HourHashrate(list, POOL_NAME);
}
}

View File

@@ -0,0 +1,54 @@
package com.m2pool.lease.strategy.hashrate.impl;
import com.m2pool.lease.dto.v2.PurchasedMachineListDto;
import com.m2pool.lease.dto.v2.RealHashrateInfoDto;
import com.m2pool.lease.mapper.LeaseOrderMiningMapper;
import com.m2pool.lease.strategy.hashrate.HashrateFetchStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* 2miners矿池算力获取策略
* 数据采集频率每5分钟
*
* @author yyb
* @since 2025-01-28
*/
@Component
public class TwoMinersHashrateStrategy implements HashrateFetchStrategy {
@Autowired
private LeaseOrderMiningMapper leaseOrderMiningMapper;
private static final String POOL_NAME = "2miners";
private static final int DATA_INTERVAL_MINUTES = 5;
@Override
public String getPoolName() {
return POOL_NAME;
}
@Override
public int getDataIntervalMinutes() {
return DATA_INTERVAL_MINUTES;
}
@Override
public List<RealHashrateInfoDto> fetchRecently24HourHashrate(List<PurchasedMachineListDto> list) {
if (list == null || list.isEmpty()) {
return new ArrayList<>();
}
// 对 list 中的每个元素构建查询条件
for (PurchasedMachineListDto dto : list) {
LocalDateTime queryEndTime = dto.getEndTime().plusMinutes(30);
dto.setEndTime(queryEndTime);
}
return leaseOrderMiningMapper.getRecently24HourHashrate(list, POOL_NAME);
}
}

View File

@@ -19,12 +19,11 @@ import com.m2pool.lease.service.LeaseOrderItemService;
import com.m2pool.lease.service.LeasePayRecordMessageInfoService;
import com.m2pool.lease.service.LeasePayRecordMessageService;
import com.m2pool.lease.service.LeaseUserOwnedProductService;
import com.m2pool.lease.strategy.hashrate.HashrateFetchStrategy;
import com.m2pool.lease.utils.DateUtils;
import com.m2pool.lease.utils.HashUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -97,6 +96,8 @@ public class OrderAndPayTask {
private LedgerLogService ledgerLogService;
@Resource
private RedisService redisService;
@Resource
private com.m2pool.lease.strategy.hashrate.HashrateStrategyFactory hashrateStrategyFactory;
/**
* 检查钱包半年内是否有 支付,充值,提现操作
@@ -146,15 +147,13 @@ public class OrderAndPayTask {
}
@Scheduled(cron = "0 0/1 * * * ? ")
//@Scheduled(cron = "0 0/30 * * * ? ")
@Scheduled(cron = "0 0/5 * * * ? ")
@Async("scheduledTaskExecutor")
@DSTransactional
public void paymentTaskV2(){
System.out.println("支付开始"+LocalDateTime.now());
Date now = DateUtils.getPreviousHalfHourOrFullHour(new Date());
LocalDateTime now1 = LocalDateTime.now();
LocalDateTime now = LocalDateTime.now();
//1.查找在进行中的订单
List<LeaseOrderInfo> needPayOrderList = leaseOrderInfoMapper.getNeedPayOrderList();
if (needPayOrderList.isEmpty()){
@@ -183,7 +182,7 @@ public class OrderAndPayTask {
List<Long> needPayIds = new ArrayList<>();
//本次需支付订单详情 集合
List<LeaseOrderItem> needPayInfos = new ArrayList<>();
getCompleteOrderAndNeedPayOrder(now1,orderComplete,completeMachines,itemCompleteIds,needPayIds,needPayInfos,leaseOrderItems);
getCompleteOrderAndNeedPayOrder(now,orderComplete,completeMachines,itemCompleteIds,needPayIds,needPayInfos,leaseOrderItems);
//订单进行过程中,卖方可能修改收款地址。所以每天支付前,需要修改旧收款地址为新地址
needPayInfos = updateOrderItemSellerWalletAddress(needPayInfos);
@@ -203,21 +202,8 @@ public class OrderAndPayTask {
(existing, replacement) -> replacement
));
List<RealHashrateInfoDto> realPowerList = new ArrayList<>();
collect.forEach((pool, list) -> {
realPowerList.addAll(leaseOrderMiningMapper.getRecently24HourHashrate(list, pool, now));
});
//挖矿信息 与 实时算力 映射
Map<String, List<RealHashrateInfoDto>> groupedData = realPowerList.stream()
.collect(Collectors.groupingBy(
dto -> dto.getPool() + "_" + dto.getWalletAddress() + "_" +
dto.getMiner() + "_" + dto.getCoin() + "_" + dto.getAlgorithm()
));
System.out.println("yyb-实时算力列表"+JSONUtil.toJsonPrettyStr(groupedData));
//订单id 与 当天平均算力 映射
Map<Long, BigDecimal> orderIdToHashrateMap = new HashMap<>();
getOrderItemIdToHashrateMap(orderIdToHashrateMap,miningAnditemIdMap,groupedData);
// 使用策略模式获取不同矿池的算力数据并计算平均算力
Map<Long, BigDecimal> orderIdToHashrateMap = fetchAndCalculateAverageHashRate(miningInfoByOrderId, collect, miningAnditemIdMap);
//根据实时算力波动情况 求得订单实际支付金额
Map<Long, LeaseOrderItem> orderIdToPriceMap = needPayInfos.stream().collect(Collectors.toMap(LeaseOrderItem::getId,Function.identity()));
@@ -231,9 +217,7 @@ public class OrderAndPayTask {
userMapItem.forEach((orderId, items) -> {
for (LeaseOrderItem item : items) {
BigDecimal realAmount = orderItemToPriceMap.get(item.getId());
BigDecimal practicalPower = orderIdToHashrateMap.get(item.getId());
BigDecimal realPayAmount = realAmount== null ? BigDecimal.ZERO : realAmount;
practicalPower = practicalPower == null ? BigDecimal.ZERO : practicalPower;
//已支付金额 + 实际待支付金额 + 实际平均算力
item.setAlreadyPayAmount(item.getAlreadyPayAmount().add(item.getPrice().multiply(BigDecimal.valueOf(item.getNumbers()))));
item.setSettlePayRealAmount(item.getSettlePayRealAmount().add(realPayAmount));
@@ -348,27 +332,56 @@ public class OrderAndPayTask {
}
/**
* 获取订单项id 与实时算力映射
**/
public void getOrderItemIdToHashrateMap(Map<Long, BigDecimal> orderIdToHashrateMap,Map<String, PurchasedMachineListDto> miningAnditemIdMap,Map<String, List<RealHashrateInfoDto>> groupedData){
for (Map.Entry<String, List<RealHashrateInfoDto>> entry : groupedData.entrySet()) {
* 使用策略模式获取不同矿池的算力数据并计算平均算力
*
* @param miningInfoByOrderId 挖矿信息列表
* @param collect 按矿池分组后的挖矿信息
* @param miningAnditemIdMap 挖矿信息与订单详情id映射
* @return 订单项id与平均算力的映射
*/
public Map<Long, BigDecimal> fetchAndCalculateAverageHashRate(
List<PurchasedMachineListDto> miningInfoByOrderId,
Map<String, List<PurchasedMachineListDto>> collect,
Map<String, PurchasedMachineListDto> miningAnditemIdMap) {
// 使用策略模式获取不同矿池的算力数据
Map<String, List<RealHashrateInfoDto>> groupedDataByPool = new HashMap<>();
collect.forEach((pool, list) -> {
HashrateFetchStrategy strategy = hashrateStrategyFactory.getStrategy(pool);
List<RealHashrateInfoDto> hashRateData = strategy.fetchRecently24HourHashrate(list);
System.out.printf("矿池 %s 使用策略 %s数据采集间隔 %d 分钟,数据点数 %d获取到 %d 条算力数据%n",
pool, strategy.getPoolName(), strategy.getDataIntervalMinutes(), strategy.getDataPoints(), hashRateData.size());
// 使用策略进行数据分组
Map<String, List<RealHashrateInfoDto>> poolGroupedData = strategy.groupHashrateData(hashRateData);
groupedDataByPool.putAll(poolGroupedData);
});
System.out.println("yyb-实时算力列表"+JSONUtil.toJsonPrettyStr(groupedDataByPool));
//订单id 与 当天平均算力 映射(使用策略计算平均算力)
Map<Long, BigDecimal> orderIdToHashrateMap = new HashMap<>();
Map<Long, String> orderItemIdToPoolMap = new HashMap<>();
// 首先建立订单项到矿池的映射
for (PurchasedMachineListDto dto : miningInfoByOrderId) {
orderItemIdToPoolMap.put(dto.getOrderItemId(), dto.getPool());
}
// 使用策略计算平均算力
for (Map.Entry<String, List<RealHashrateInfoDto>> entry : groupedDataByPool.entrySet()) {
String key = entry.getKey();
List<RealHashrateInfoDto> hashrateList = entry.getValue();
// 从第一个映射中获取对应的订单信息
// 从映射中获取对应的订单信息
PurchasedMachineListDto orderInfo = miningAnditemIdMap.get(key);
//计算平均算力
if (orderInfo != null) {
BigDecimal totalPracticalPower = BigDecimal.ZERO;
for (RealHashrateInfoDto realHashrateInfoDto : hashrateList) {
totalPracticalPower = totalPracticalPower.add(realHashrateInfoDto.getPower());
}
totalPracticalPower = totalPracticalPower.divide(BigDecimal.valueOf(dataPoints), 4, RoundingMode.HALF_UP);
orderIdToHashrateMap.put(orderInfo.getOrderItemId(), totalPracticalPower);
// 根据订单项对应的矿池获取策略
String poolName = orderItemIdToPoolMap.get(orderInfo.getOrderItemId());
HashrateFetchStrategy strategy = hashrateStrategyFactory.getStrategy(poolName);
// 使用策略计算平均算力
BigDecimal averageHashrate = strategy.calculateAverageHashrate(hashrateList);
orderIdToHashrateMap.put(orderInfo.getOrderItemId(), averageHashrate);
}
}
return orderIdToHashrateMap;
}
/**

View File

@@ -3,13 +3,15 @@ package com.m2pool.lease.task;
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.m2pool.lease.dto.ProductMachineDto;
import com.m2pool.lease.dto.v2.PurchasedMachineListDto;
import com.m2pool.lease.dto.v2.RealHashrateInfoDto;
import com.m2pool.lease.dto.v2.RealTimeHashrateQueryDto;
import com.m2pool.lease.entity.LeaseOrderItem;
import com.m2pool.lease.entity.LeaseProductMachine;
import com.m2pool.lease.mapper.LeaseOrderItemMapper;
import com.m2pool.lease.mapper.LeaseOrderMiningMapper;
import com.m2pool.lease.mapper.LeaseProductMachineMapper;
import com.m2pool.lease.strategy.hashrate.HashrateFetchStrategy;
import com.m2pool.lease.strategy.hashrate.HashrateStrategyFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
@@ -20,7 +22,6 @@ import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -48,7 +49,7 @@ public class RealPowerInsetTask {
private LeaseOrderItemMapper leaseOrderItemMapper;
@Resource
private LeaseOrderMiningMapper leaseOrderMiningMapper;
private HashrateStrategyFactory hashrateStrategyFactory;
private static final int BATCH_SIZE = 1000;
private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();
@@ -142,9 +143,10 @@ public class RealPowerInsetTask {
/**
* @Description: 2miners实时算力更新任务
* @Description: 2miners 实时平均算力(使用策略模式)
* 获取从订单创建时间到当前时间的所有算力数据,根据策略模式对应的数据采集间隔计算平均值,没有数据的时刻补零
*/
@Scheduled(cron = "50 0/5 * * * ? ")
@Scheduled(cron = "50 0/30 * * * ? ")
@Async("scheduledTaskExecutor")
@DSTransactional
public void RealPower2minersTask(){
@@ -157,12 +159,7 @@ public class RealPowerInsetTask {
return;
}
System.out.println("查询到 " + activeOrderItems.size() + " 条需要更新实时算力的订单详情");
//2. 根据订单详情信息构建查询条件,去@MiningDB库中的矿池实时算力表查询数据
// 计算当前时间的前5分钟的整点时间2026-01-28 01:05:50 -> 2026-01-28 01:05:00
LocalDateTime now = LocalDateTime.now();
LocalDateTime queryTime = now.minusMinutes(5).withSecond(0).withNano(0);
String queryDateTime = queryTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
System.out.println("查询时间点:" + queryDateTime);
// 按矿池分组查询
Map<String, List<LeaseOrderItem>> poolGroups = activeOrderItems.stream()
@@ -174,41 +171,40 @@ public class RealPowerInsetTask {
String pool = entry.getKey();
List<LeaseOrderItem> items = entry.getValue();
// 构建查询条件列表
List<RealTimeHashrateQueryDto> queryList = items.stream()
.map(item -> RealTimeHashrateQueryDto.builder()
// 获取对应的策略
HashrateFetchStrategy strategy = hashrateStrategyFactory.getStrategy(pool);
System.out.printf("矿池 %s 使用策略 %s数据采集间隔 %d 分钟%n",
pool, strategy.getPoolName(), strategy.getDataIntervalMinutes());
// 为每个订单项构建查询条件并获取算力数据
for (LeaseOrderItem item : items) {
// 构建查询条件
List<PurchasedMachineListDto> queryList = new ArrayList<>();
queryList.add(PurchasedMachineListDto.builder()
.orderItemId(item.getId())
.walletAddress(item.getUser())
.miner(item.getMiner())
.workerId(item.getMiner())
.coin(item.getCoin())
.algorithm(item.getAlgorithm())
.pool(item.getPool())
.build())
.collect(Collectors.toList());
.startTime(item.getCreateTime())
.endTime(item.getCreateTime().plusMinutes(item.getLeaseTime()))
.build());
String tableName = "2miners"; //表名
// 查询实时算力
List<RealHashrateInfoDto> hashrateList = leaseOrderMiningMapper.getRealTimeHashrate(
tableName, queryDateTime, queryList);
// 使用策略获取时间范围内的算力数据
List<RealHashrateInfoDto> hashrateList = strategy.fetchRecently24HourHashrate(queryList);
System.out.println("矿池 " + pool + " 查询到 " + hashrateList.size() + " 条算力数据");
System.out.printf("订单项 %s 查询到 %d 条算力数据%n", item.getId(), hashrateList.size());
// 构建算力数据映射表key = walletAddress + "_" + miner + "_" + coin + "_" + algorithm
Map<String, BigDecimal> hashrateMap = hashrateList.stream()
.collect(Collectors.toMap(
dto -> dto.getWalletAddress() + "_" + dto.getMiner() + "_" + dto.getCoin() + "_" + dto.getAlgorithm(),
RealHashrateInfoDto::getPower,
(v1, v2) -> v1 // 如果有重复,取第一个
));
// 使用策略计算平均算力(无需补零)
BigDecimal averagePower = strategy.calculateAverageHashrate(hashrateList);
// 更新订单详情的实时算力
for (LeaseOrderItem item : items) {
String key = item.getUser() + "_" + item.getMiner() + "_" + item.getCoin() + "_" + item.getAlgorithm();
BigDecimal power = hashrateMap.get(key);
// 如果查询到算力数据则使用否则设置为0
item.setPracticalPower(power != null ? power : BigDecimal.ZERO);
item.setPracticalPower(averagePower);
updatedItems.add(item);
System.out.printf("订单项 %s 平均算力: %s%n", item.getId(), averagePower);
}
}
@@ -221,6 +217,7 @@ public class RealPowerInsetTask {
System.out.println("GPU实时算力更新任务执行完成" + LocalDateTime.now());
} catch (Exception e) {
System.err.println("GPU实时算力更新任务执行失败" + e.getMessage());
e.printStackTrace();
throw new RuntimeException("GPU实时算力更新任务执行失败", e);
}
}

View File

@@ -114,7 +114,9 @@
coin,
algorithm,
pool,
practical_power as practicalPower
practical_power as practicalPower,
create_time as createTime,
lease_time as leaseTime
FROM lease_order_item
WHERE status = 1 AND del = false
</select>

View File

@@ -131,51 +131,16 @@
FROM
`${tableName}`
WHERE
datetime >= DATE_SUB(#{now}, INTERVAL 24 HOUR) AND (wallet, coin, miner, algorithm) IN (
<foreach collection="list" item="item" separator=",">
(#{item.walletAddress}, #{item.coin}, #{item.workerId}, #{item.algorithm})
<foreach collection="list" item="item" separator=" OR ">
(wallet = #{item.walletAddress}
AND coin = #{item.coin}
AND miner = #{item.workerId}
AND algorithm = #{item.algorithm}
AND datetime >= #{item.startTime}
AND datetime &lt;= #{item.endTime})
</foreach>
)
</select>
<select id="getRealTimeHashrate" resultType="com.m2pool.lease.dto.v2.RealHashrateInfoDto">
SELECT
hashrate AS power,
pool_name AS pool,
wallet AS walletAddress,
miner AS miner,
algorithm,
coin
FROM
`${tableName}`
WHERE
datetime = #{datetime}
AND (wallet, coin, miner, algorithm) IN (
<foreach collection="list" item="item" separator=",">
(#{item.walletAddress}, #{item.coin}, #{item.miner}, #{item.algorithm})
</foreach>
)
</select>
<select id="getHashrateInRange" resultType="com.m2pool.lease.dto.v2.RealHashrateInfoDto">
SELECT
hashrate AS power,
pool_name AS pool,
wallet AS walletAddress,
miner AS miner,
algorithm,
coin,
datetime
FROM
`${tableName}`
WHERE
datetime >= #{startTime}
AND datetime &lt;= #{endTime}
AND (wallet, coin, miner, algorithm) IN (
<foreach collection="list" item="item" separator=",">
(#{item.walletAddress}, #{item.coin}, #{item.miner}, #{item.algorithm})
</foreach>
)
ORDER BY datetime ASC
</select>
<select id="getMiningInfoByOrderId" resultType="com.m2pool.lease.dto.v2.PurchasedMachineListDto">
select
id ,