update 新增数据补全,删除接口

This commit is contained in:
yyb 2025-09-12 15:30:50 +08:00
parent f0a2309b42
commit 6bd204dc4b
14 changed files with 192 additions and 25 deletions

View File

@ -358,7 +358,7 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils
public static Date getOneMonthAgo(Date date) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.MONTH, -1);
calendar.add(Calendar.DAY_OF_MONTH, -30);
return calendar.getTime();
}

View File

@ -5,6 +5,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;

View File

@ -43,4 +43,12 @@ public interface ManageWalletOutInMapper extends BaseMapper<ManageWalletOutIn> {
@DistributionDB
List<SummaryOfPendingPaymentsDto> summaryOfPendingPayments();
/**
* 获取最大日期
* @param coin
* @return
*/
LocalDateTime getMaxDateData();
}

View File

@ -9,6 +9,7 @@ import com.m2pool.manage.entity.ManageWalletOutIn;
import com.m2pool.manage.vo.ManageBaseVo;
import com.m2pool.manage.vo.ManageBroadcastVo;
import com.m2pool.manage.vo.PageVo;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;

View File

@ -252,7 +252,7 @@ public class ManageUserServiceImpl implements ManageUserService {
//查询起付额
List<SummaryOfPendingPaymentsDto> startPayments = manageBroadcastMapper.getStartPayments(summaryOfPendingPaymentsList);
// 创建一个 Map 用于存储第二个集合中元素的 user coin 组合对应的 startPayAmount
// 创建一个 Map 用于存储第二个集合中元素的 user coin 组合对应的 startPayAmount . 这里可以优化一下防止多key报错不过一般不出这个问题
Map<String, BigDecimal> startPayAmountMap = startPayments.stream()
.collect(Collectors.toMap(
dto -> dto.getUser() + "_" + dto.getCoin(),

View File

@ -42,20 +42,18 @@ public class ManageTask {
/**
* 存储交易记录定时任务
*/
@Scheduled(cron = "22 35 0/1 * * ?")
//@Scheduled(cron = "0 0/1 * * * ?")
//@Scheduled(cron = "22 58 0/1 * * ?")
@Scheduled(cron = "0 0/1 * * * ?")
public void insertDataToWalletOutInDb(){
ManageWalletOutIn manageWalletOutIn = manageWalletOutInMapper.selectOne(new LambdaQueryWrapper<ManageWalletOutIn>()
.orderByDesc(ManageWalletOutIn::getDate
).last("limit 1"));
LocalDateTime maxDateData = manageWalletOutInMapper.getMaxDateData();
LocalDateTime startDate = null;
List<ManageWalletOutIn> manageWalletOutIns = new ArrayList<>();
if (manageWalletOutIn != null){
startDate = manageWalletOutIn.getDate();
if (maxDateData != null){
startDate = LocalDateTime.now().toLocalDate().atStartOfDay();
//获取startDate 后的数据,用于比对
manageWalletOutIns = manageWalletOutInMapper.selectList(new LambdaQueryWrapper<ManageWalletOutIn>()
.ge(ManageWalletOutIn::getDate, startDate.toLocalDate().atStartOfDay()));
.ge(ManageWalletOutIn::getDate, startDate));
}
List<ManageWalletOutIn> walletIn = manageWalletOutInMapper.getWalletIn(startDate);
@ -99,7 +97,12 @@ public class ManageTask {
walletInfo.addAll(updatedInList);
} else {
walletInfo.addAll(outList);
List<ManageWalletOutIn> collect = outList.stream().peek(outItem -> {
if (outItem.getShouldOutDate() == null) {
outItem.setShouldOutDate(outItem.getDate().toLocalDate().atStartOfDay());
}
}).collect(Collectors.toList());
walletInfo.addAll(collect);
}
}
//外层为in内层为out -----新增当天只有walletIn,没有walletOut的数据

View File

@ -36,7 +36,7 @@
`user`,
address,
`date`,
DATE(`date`) as shouldOutDate,
COALESCE(DATE(`should_out_date`),DATE(`date`)) as shouldOutDate,
max_height,
allocation_amount as allocationAmount,
transfer_amount as transferAmount
@ -53,7 +53,7 @@
</otherwise>
</choose>
</where>
order by `date` desc
order by `shouldOutDate` desc
</select>
<select id="getHistoryBalance" resultType="com.m2pool.manage.dto.HistoryBalanceDto">
SELECT
@ -167,9 +167,10 @@
user_account_balance uab
JOIN user_miner_account uma ON uma.id = uab.ma_id
WHERE
uab.status = 0 AND (
<foreach collection="list" item="item" separator="OR">
(`user` = #{item.user} AND coin = #{item.coin})
</foreach>
(`miner_user` = #{item.user} AND coin = #{item.coin})
</foreach>)
</select>
</mapper>

View File

@ -9,19 +9,19 @@
SELECT
wi.coin,
wi.`user`,
wi.should_out_date AS `date`,
COALESCE(wi.`should_out_date`, wi.`create_date`) as `date`,
wi.amount AS allocationAmount,
wi.`create_date` as `shouldOutDate`,
wi.should_out_date AS `shouldOutDate`,
wi.max_height AS maxHeight
FROM
wallet_in wi
<where>
<choose>
<when test="startDate != null">
wi.`should_out_date` >= DATE(#{startDate})
wi.`create_date` >= DATE(#{startDate})
</when>
<otherwise>
wi.`should_out_date` <![CDATA[ <= ]]> NOW()
wi.`create_date` <![CDATA[ <= ]]> NOW()
</otherwise>
</choose>
</where>
@ -47,4 +47,7 @@
<select id="summaryOfPendingPayments" resultType="com.m2pool.manage.dto.SummaryOfPendingPaymentsDto">
select coin, max(max_height) as maxHeight, max(should_out_date) AS shouldOutDate, `user`, sum(amount) as needPayAmount from wallet_in where state = 2 group by coin,`user`
</select>
<select id="getMaxDateData" resultType="java.time.LocalDateTime">
select `date` from manage_wallet_out_in order by `date` desc limit 1
</select>
</mapper>

View File

@ -205,7 +205,23 @@ public class PoolController extends BaseController {
return poolService.test1();
}
@PostMapping("/dataSupplement")
public String dataSupplement(@RequestBody DataSupplementVo vo){
if (vo.getKey().equals("dsadasdwfddasdfsafadsfs")){
return poolService.dataSupplement(vo);
}else{
return "失败:key传递错误";
}
}
@PostMapping("/dataDelete")
public String dataDelete(@RequestBody DataSupplementVo vo){
if (vo.getKey().equals("dsadasdwfddasdfsafadsfs")){
return poolService.dataDelete(vo);
}else{
return "失败:key传递错误";
}
}
}

View File

@ -270,4 +270,13 @@ public interface PoolMapper {
* @return
*/
BlockInfo getMoneroBlockInfo();
int deleteMhs30m(@Param("coin") String coin,@Param("date") String date);
int deletePool30m(@Param("coin") String coin,@Param("date") String date);
int deleteUsers30m(@Param("coin") String coin,@Param("date") String date);
}

View File

@ -46,4 +46,18 @@ public interface PoolService {
* @return
*/
AjaxResult getMinerPowerFor30min( MinerMhsVo minerMhsVo);
/**
* 数据补全
* @param vo
*/
String dataSupplement( DataSupplementVo vo);
/**
* 数据删除
* @param vo
* @return
*/
String dataDelete(DataSupplementVo vo);
}

View File

@ -12,17 +12,16 @@ import com.m2pool.common.datasource.annotation.UserDB;
import com.m2pool.common.redis.service.RedisService;
import com.m2pool.pool.dto.*;
import com.m2pool.pool.entity.BlockInfo;
import com.m2pool.pool.entity.PoolPower;
import com.m2pool.pool.enums.*;
import com.m2pool.pool.mapper.PoolMapper;
import com.m2pool.pool.mapper.UserAccountMapper;
import com.m2pool.pool.service.PoolService;
import com.m2pool.pool.task.DataTask;
import com.m2pool.pool.utils.EnumUtils;
import com.m2pool.pool.utils.NodeRpc;
import com.m2pool.pool.utils.PowerUnitUtils;
import com.m2pool.pool.vo.BalanceListGetVo;
import com.m2pool.pool.vo.CoinVo;
import com.m2pool.pool.vo.MinerAccountAddVo;
import com.m2pool.pool.vo.MinerMhsVo;
import com.m2pool.pool.vo.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -33,6 +32,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
@ -719,4 +719,111 @@ public class PoolServiceImpl implements PoolService {
return AjaxResult.success(minerPowerFor30min);
}
@Override
@DSTransactional
public String dataSupplement(DataSupplementVo vo) {
String nowStr = vo.getDate();
//检查二级表是否有当前时间数据
//int count = poolMapper.getLastDataTime(vo.getCoin() + "_mhs30m", nowStr);
List<MinerDataDto> list = poolMapper.getHourMinerDataList(vo.getCoin()+"_mhsv2",nowStr);
if(list.size() > 0){
List<MinerDataDto> offlineList = new ArrayList<>();
list.stream().forEach(e -> {
if(StringUtils.isNotNull(e.getMhs())){
e.setMhs(e.getMhs() * Pools.NEXA.getFac());
}
if("offline".equals(e.getState())){
offlineList.add(e);
}
});
//list = DataTask.filterList(list,now);
boolean result = poolMapper.batchInsertMhsDataToDB(vo.getCoin() + "_mhs30m", DataTask.getToHourDBListByMinerDataList(list));
Map<Date, List<MinerDataDto>> map = list.stream().collect(Collectors.groupingBy(MinerDataDto::getDate));
map.forEach((date,dList) -> {
PoolPower poolPower = new PoolPower();
//poolPower.setDate(date);
poolPower.setDateStr(DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS,date));
Map<String, Long> stateCount = dList.stream().collect(Collectors.groupingBy(MinerDataDto::getState, Collectors.counting()));
poolPower.setMhs(dList.stream().map(e -> BigDecimal.valueOf(e.getMhs())).reduce(BigDecimal::add).get().stripTrailingZeros());
poolPower.setMiners(dList.size());
poolPower.setOnline(Convert.toInt(stateCount.get("online"),0));
poolPower.setOffline(Convert.toInt(stateCount.get("offline"),0));
boolean poolResult = poolMapper.insertPoolPower(vo.getCoin() + "_pool_30m", poolPower);
int time =0;
while (!poolResult){
poolResult = poolMapper.insertPoolPower(vo.getCoin() + "_pool_30m", poolPower);
if (time > 5) {
break;
}
time ++;
}
System.out.println("构造写入user算力表数据");
Map<String, List<MinerDataDto>> userMap = dList.stream().collect(Collectors.groupingBy(MinerDataDto::getUser));
List<MinerDataInsertDBDto> userList = new ArrayList<>();
userMap.forEach((user,uList)->{
MinerDataInsertDBDto uDto = new MinerDataInsertDBDto();
uDto.setDate(DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS,date));
uDto.setUser(user);
uDto.setMhs(uList.stream().map(e->BigDecimal.valueOf(e.getMhs())).reduce(BigDecimal::add).get().doubleValue());
userList.add(uDto);
} );
System.out.println("user算力表入库数据条数"+userList.size());
boolean userResult = poolMapper.batchInsertUserMhsDateToDB(vo.getCoin() + "_users_30m", userList);
int uTime =0;
while (!userResult){
userResult = poolMapper.batchInsertUserMhsDateToDB(vo.getCoin() + "_users_30m", userList);
if (uTime > 5) {
break;
}
uTime ++;
}
});
Map<String, List<MinerDataDto>> userMap = list.stream().collect(Collectors.groupingBy(MinerDataDto::getMiner));
if (result) {
System.out.println(DateUtils.dateTimeNow() + "nexa小时数据存入数据成功");
} else {
System.out.println(DateUtils.dateTimeNow() + "nexa小时数据存入数据失败");
}
if(offlineList.size() >0){
CompletableFuture.runAsync(()->{
//根据挖矿账户分组统计离线矿机名单离线矿机数
//对比redis中该挖矿账户上一次离线矿机数
//有redis数据 需要判断 上一次离线矿机数<本次矿机离线数 才执行通知 否则跳过
//无redis数据 说明是第一次 执行通知
});
}
return "成功";
}
return "hashrate库"+vo.getCoin()+"_mhsv2表未查到数据,补充数据失败";
}
@Override
public String dataDelete(DataSupplementVo vo) {
int i = poolMapper.deleteUsers30m(vo.getCoin(), vo.getDate());
int i1 = poolMapper.deletePool30m(vo.getCoin(), vo.getDate());
int i2 = poolMapper.deleteMhs30m(vo.getCoin(), vo.getDate());
return "删除"+vo.getCoin()+"_mhs30m表"+i+"条数据,删除"+vo.getCoin()+"_pool_30m表"+i1+"条数据,删除"+vo.getCoin()+"_users_30m表"+i2+"条数据";
}
}

View File

@ -23,6 +23,8 @@ import org.springframework.scheduling.annotation.Scheduled;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

View File

@ -4,6 +4,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.m2pool.pool.mapper.PoolMapper">
<select id="getPoolInfoByCoin" resultType="com.m2pool.pool.dto.PoolInfoDto">
select
count(*) poolMc
@ -985,8 +986,9 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
select height,difficulty,power,reward,fees,profit from monero_block_info order by id desc limit 1;
</select>
<delete id="deleteMhs30m">delete from ${coin}_mhs30m where `date` = #{date} </delete>
<delete id="deletePool30m">delete from ${coin}_pool_30m where `date` = #{date}</delete>
<delete id="deleteUsers30m">delete from ${coin}_users_30m where `date` = #{date}</delete>
</mapper>