From 35c4bd99997a7666b7ae04b0b68fc61538bfdc01 Mon Sep 17 00:00:00 2001 From: yyb <1416014977@qq.com> Date: Mon, 26 Jan 2026 16:42:04 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=8F=90=E7=8E=B0=E5=92=8C=E5=85=85?= =?UTF-8?q?=E5=80=BC=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lease/mq/MessageReceiverListener.java | 158 +++++++++--------- .../RabbitmqPayWithdrawReturnMessage.java | 3 +- .../lease/service/WalletCallbackService.java | 48 ++++++ .../service/impl/LeaseShopServiceImpl.java | 1 + .../service/impl/LeaseUserServiceImpl.java | 63 ++++--- 5 files changed, 172 insertions(+), 101 deletions(-) create mode 100644 src/main/java/com/m2pool/lease/service/WalletCallbackService.java diff --git a/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java b/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java index 30f449e..ed8c02a 100644 --- a/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java +++ b/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java @@ -11,7 +11,7 @@ import com.m2pool.lease.exception.PayRechargeException; import com.m2pool.lease.mapper.*; import com.m2pool.lease.mq.message.*; import com.m2pool.lease.service.LeaseOrderItemService; -import com.m2pool.lease.utils.UuidGeneratorUtil; +import com.m2pool.lease.service.WalletCallbackService; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.messaging.handler.annotation.Payload; @@ -237,33 +237,37 @@ public class MessageReceiverListener { //} - - /** + /** * 余额充值 消费者 (正常流程一笔充值会有两个消息 2 充值中 1充值成功) + * * @param payRechargeReturnMessage */ @LedgerLog(value = "余额充值") - @RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,containerFactory ="rabbitListenerContainerFactory") + @RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE, containerFactory = "rabbitListenerContainerFactory") @Transactional(rollbackFor = Exception.class) public void listenerPayRechargeStatusQueueMessage(@Payload RabbitmqPayRechargeReturnMessage payRechargeReturnMessage) { - System.out.println("充值消费者---"+JSONUtil.toJsonPrettyStr(payRechargeReturnMessage)); - //获取到需要操作的钱包 + System.out.println("充值消费者---" + JSONUtil.toJsonPrettyStr(payRechargeReturnMessage)); + if(payRechargeReturnMessage.getStatus() == 4){ + // 触发钱包监听回调,表示监听成功 + WalletCallbackService.triggerCallback(payRechargeReturnMessage.getQueue_id(), true); + } + // 获取到需要操作的钱包 List leaseUserWalletDataList = leaseUserWalletDataMapper.selectList(new LambdaQueryWrapper() .eq(LeaseUserWalletData::getFromAddress, payRechargeReturnMessage.getAddress()) .eq(LeaseUserWalletData::getDel, false) ); - if (leaseUserWalletDataList.isEmpty()){ + if (leaseUserWalletDataList.isEmpty()) { return; } LeaseUserWalletData leaseUserWalletData = null; for (LeaseUserWalletData item : leaseUserWalletDataList) { - if (item.getFromSymbol().equals(payRechargeReturnMessage.getSymbol())){ + if (item.getFromSymbol().equals(payRechargeReturnMessage.getSymbol())) { leaseUserWalletData = item; break; } } - //需要新增该币种的钱包 - if (leaseUserWalletData == null){ + // 需要新增该币种的钱包 + if (leaseUserWalletData == null) { LeaseUserWalletData leaseUserWalletData1 = leaseUserWalletDataList.get(0); leaseUserWalletData = LeaseUserWalletData.builder() .queueId(payRechargeReturnMessage.getQueue_id()) @@ -280,12 +284,11 @@ public class MessageReceiverListener { LeasePayRechargeMessage leasePayRechargeMessage = leasePayRechargeMessageMapper.selectOne(new LambdaQueryWrapper() .eq(LeasePayRechargeMessage::getTxHash, payRechargeReturnMessage.getTx_hash())); BigDecimal balance = leaseUserWalletData.getBalance(); - //处理支付中消息 - if(payRechargeReturnMessage.getStatus() == 2 || leasePayRechargeMessage == null){ - if (leasePayRechargeMessage != null){ + // 处理支付中消息 + if (payRechargeReturnMessage.getStatus() == 2 || leasePayRechargeMessage == null || leasePayRechargeMessage.getStatus() == 3) { + if (leasePayRechargeMessage != null) { return; } - LeasePayRechargeMessage build = LeasePayRechargeMessage.builder() .queueId(payRechargeReturnMessage.getQueue_id()) .chain(payRechargeReturnMessage.getChain()) @@ -298,24 +301,24 @@ public class MessageReceiverListener { .status(payRechargeReturnMessage.getStatus()) .build(); leasePayRechargeMessageMapper.saveOrUpdateByIndex(build); - }else { - //已存在已完成充值的消息 - if (leasePayRechargeMessage.getStatus() == 1){ + } else { + // 已存在已完成充值的消息 + if (leasePayRechargeMessage.getStatus() == 1) { return; } leasePayRechargeMessage.setStatus(payRechargeReturnMessage.getStatus()); leasePayRechargeMessage.setUpdateTime(LocalDateTime.now()); leasePayRechargeMessageMapper.updateById(leasePayRechargeMessage); } - //处理支付成功消息 - if (payRechargeReturnMessage.getStatus() == 1){ + // 处理支付成功消息 + if (payRechargeReturnMessage.getStatus() == 1) { leaseUserWalletData.setBalance(balance.add(payRechargeReturnMessage.getAmount())); - //使用乐观锁防止余额少加 + // 使用乐观锁防止余额少加 int update = leaseUserWalletDataMapper.update(leaseUserWalletData, new LambdaQueryWrapper() .eq(LeaseUserWalletData::getId, leaseUserWalletData.getId()) .eq(LeaseUserWalletData::getBalance, balance) ); - if (update < 1){ + if (update < 1) { throw new PayRechargeException("余额充值失败,余额已被修改"); } } @@ -484,7 +487,7 @@ public class MessageReceiverListener { if (status == 1){ leaseUserWalletData.setBalance(balance); leaseUserWalletData.setBlockedBalance(blockBalance); - }else if (status == 0 ){ + }else if (status == 0 || status == 3){ //余额提现失败 冻结金额 需要减去提现手续费和提现金额 leaseUserWalletData.setBlockedBalance(blockBalance); } @@ -554,59 +557,64 @@ public class MessageReceiverListener { //} // //测试 开发环境 充值测试 - @RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_QUEUE,containerFactory ="rabbitListenerContainerFactory") - public void listenerPayRechargeQueueMessage(@Payload RabbitmqPayRechargeMessage payAutoReturnMessage) { - String s = UuidGeneratorUtil.generateUuidWithoutHyphen(); - ////发送充值消息 - //RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayRechargeReturnMessage.builder() - // .queue_id(payAutoReturnMessage.getQueue_id()) - // .status(2) - // .amount(BigDecimal.valueOf(20)) - // .chain(payAutoReturnMessage.getChain()) - // .symbol(payAutoReturnMessage.getSymbol()) - // .address(payAutoReturnMessage.getAddress()) - // .tx_hash(s) - // .build(); - //rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); - - - //发送充值消息 - RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage1 = RabbitmqPayRechargeReturnMessage.builder() - .queue_id(payAutoReturnMessage.getQueue_id()) - .status(1) - .amount(BigDecimal.valueOf(100)) - .chain(payAutoReturnMessage.getChain()) - .symbol(payAutoReturnMessage.getSymbol()) - .address(payAutoReturnMessage.getAddress()) - .user_email(payAutoReturnMessage.getUser_email()) - .fromAddress(payAutoReturnMessage.getAddress()) - .tx_hash(s) - .build(); - rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage1); - } - - - //提现 - @RabbitListener(queues = RabbitmqConstant.PAY_WITHDRAW_QUEUE,containerFactory ="rabbitListenerContainerFactory") - public void listenerWithdrawQueueMessage(@Payload RabbitmqPayWithdrawMessage payAutoReturnMessage) throws InterruptedException { - Thread.sleep(10000); - RabbitmqPayWithdrawReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayWithdrawReturnMessage.builder() - .queue_id(payAutoReturnMessage.getQueue_id()) - .status(1) - .amount(payAutoReturnMessage.getAmount()) - .chain(payAutoReturnMessage.getChain()) - .symbol(payAutoReturnMessage.getSymbol()) - .from_address(payAutoReturnMessage.getFrom_address()) - .tx_hash(UuidGeneratorUtil.generateUuidWithoutHyphen()) - .fee(payAutoReturnMessage.getFee()) - .user_email(payAutoReturnMessage.getUser_email()) - .build(); - //提现100 提现失败 - if (payAutoReturnMessage.getAmount().equals(BigDecimal.valueOf(100))){ - rabbitmqPayRechargeReturnMessage.setStatus(0); - } - rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_WITHDRAW_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); - } + //@RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_QUEUE,containerFactory ="rabbitListenerContainerFactory") + //public void listenerPayRechargeQueueMessage(@Payload RabbitmqPayRechargeMessage payAutoReturnMessage, + // com.rabbitmq.client.Channel channel, + // org.springframework.amqp.core.Message message) { + // try { + // System.out.println("收到钱包监听请求: " + JSONUtil.toJsonPrettyStr(payAutoReturnMessage)); + // + // // 这里可以添加实际的监听逻辑 + // // 例如:调用外部 API 启动钱包监听 + // + // // 模拟监听成功 + // String response = "钱包监听启动成功: " + payAutoReturnMessage.getAddress(); + // + // // 发送响应给调用者 + // channel.basicPublish( + // "", + // message.getMessageProperties().getReplyTo(), + // null, + // response.getBytes() + // ); + // + // // 手动确认消息 + // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + // + // System.out.println("钱包监听响应已发送: " + response); + // } catch (Exception e) { + // System.out.println("处理钱包监听请求失败: " + e.getMessage()); + // try { + // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); + // } catch (Exception ex) { + // ex.printStackTrace(); + // } + // } + //} + //} + // + // + ////提现 + //@RabbitListener(queues = RabbitmqConstant.PAY_WITHDRAW_QUEUE,containerFactory ="rabbitListenerContainerFactory") + //public void listenerWithdrawQueueMessage(@Payload RabbitmqPayWithdrawMessage payAutoReturnMessage) throws InterruptedException { + // Thread.sleep(10000); + // RabbitmqPayWithdrawReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayWithdrawReturnMessage.builder() + // .queue_id(payAutoReturnMessage.getQueue_id()) + // .status(1) + // .amount(payAutoReturnMessage.getAmount()) + // .chain(payAutoReturnMessage.getChain()) + // .symbol(payAutoReturnMessage.getSymbol()) + // .from_address(payAutoReturnMessage.getFrom_address()) + // .tx_hash(UuidGeneratorUtil.generateUuidWithoutHyphen()) + // .fee(payAutoReturnMessage.getFee()) + // .user_email(payAutoReturnMessage.getUser_email()) + // .build(); + // //提现100 提现失败 + // if (payAutoReturnMessage.getAmount().equals(BigDecimal.valueOf(100))){ + // rabbitmqPayRechargeReturnMessage.setStatus(0); + // } + // rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_WITHDRAW_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); + //} ////测试 开发环境 删除钱包测试 diff --git a/src/main/java/com/m2pool/lease/mq/message/RabbitmqPayWithdrawReturnMessage.java b/src/main/java/com/m2pool/lease/mq/message/RabbitmqPayWithdrawReturnMessage.java index 33496ff..f7bf2e7 100644 --- a/src/main/java/com/m2pool/lease/mq/message/RabbitmqPayWithdrawReturnMessage.java +++ b/src/main/java/com/m2pool/lease/mq/message/RabbitmqPayWithdrawReturnMessage.java @@ -46,9 +46,8 @@ public class RabbitmqPayWithdrawReturnMessage { /** - * 提现金额 + * 提现金额 这个金额是减去手续费的 */ - @LedgerLogParam(value = "提现金额") private BigDecimal amount; /** diff --git a/src/main/java/com/m2pool/lease/service/WalletCallbackService.java b/src/main/java/com/m2pool/lease/service/WalletCallbackService.java new file mode 100644 index 0000000..f1aac63 --- /dev/null +++ b/src/main/java/com/m2pool/lease/service/WalletCallbackService.java @@ -0,0 +1,48 @@ +package com.m2pool.lease.service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 钱包监听回调服务 + * 用于存储和管理钱包监听的回调结果 + */ +public class WalletCallbackService { + + // 用于存储钱包监听回调结果,key: queueId, value: 回调结果 + private static final Map walletListenCallbackMap = new ConcurrentHashMap<>(); + + /** + * 注册回调(清除之前的回调结果) + * @param queueId 队列ID + */ + public static void registerCallback(String queueId) { + walletListenCallbackMap.remove(queueId); + } + + /** + * 触发回调 + * @param queueId 队列ID + * @param success 是否成功 + */ + public static void triggerCallback(String queueId, boolean success) { + walletListenCallbackMap.put(queueId, success); + } + + /** + * 获取回调结果 + * @param queueId 队列ID + * @return 回调结果,如果不存在则返回 null + */ + public static Boolean getCallbackResult(String queueId) { + return walletListenCallbackMap.get(queueId); + } + + /** + * 移除回调 + * @param queueId 队列ID + */ + public static void removeCallback(String queueId) { + walletListenCallbackMap.remove(queueId); + } +} \ No newline at end of file diff --git a/src/main/java/com/m2pool/lease/service/impl/LeaseShopServiceImpl.java b/src/main/java/com/m2pool/lease/service/impl/LeaseShopServiceImpl.java index f0aac75..435180b 100644 --- a/src/main/java/com/m2pool/lease/service/impl/LeaseShopServiceImpl.java +++ b/src/main/java/com/m2pool/lease/service/impl/LeaseShopServiceImpl.java @@ -727,6 +727,7 @@ public class LeaseShopServiceImpl extends ServiceImpl bindWallet(ChainAndCoinVo chainAndCoinVo) { Long authId = SecurityUtils.getUserId(); String username = SecurityUtils.getUsername(); //同一用户同一个链只会存在一个钱包 - List userWalletDataDtoList = leaseUserWalletDataMapper.getWalletInfoByChain(chainAndCoinVo.getChain(),authId); + List userWalletDataDtoList = leaseUserWalletDataMapper.getWalletInfoByChain(chainAndCoinVo.getChain(), authId); UserWalletDataDto result; //如果某个链 未查找到钱包地址 去新绑定一个 - if (userWalletDataDtoList.isEmpty()){ + if (userWalletDataDtoList.isEmpty()) { LeaseAutoAddress bindAddress = getBindAddress(chainAndCoinVo.getChain(), chainAndCoinVo.getCoin()); - if (bindAddress != null){ + if (bindAddress != null) { String address = bindAddress.getAddress(); String qrcode = QrCodeUtils.creatRrCode(bindAddress.getAddress(), 200, 200); String queueId = UuidGeneratorUtil.generateUuidWithoutHyphen(); @@ -1078,26 +1081,38 @@ public class LeaseUserServiceImpl extends ServiceImpl