diff --git a/src/main/java/com/m2pool/lease/config/RabbitMQConfig.java b/src/main/java/com/m2pool/lease/config/RabbitMQConfig.java index 80c1812..f03d9d7 100644 --- a/src/main/java/com/m2pool/lease/config/RabbitMQConfig.java +++ b/src/main/java/com/m2pool/lease/config/RabbitMQConfig.java @@ -1,12 +1,14 @@ package com.m2pool.lease.config; +import com.m2pool.lease.constant.RabbitmqConstant; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -18,6 +20,8 @@ import static com.m2pool.lease.constant.RabbitmqConstant.*; @Configuration public class RabbitMQConfig { + + @Bean public MessageConverter jackson2JsonMessageConverter() { //自动生成消息唯一id @@ -54,13 +58,31 @@ public class RabbitMQConfig { @Bean - public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { + public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, + RabbitTemplate rabbitTemplate) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //消费者序列化 factory.setMessageConverter(jackson2JsonMessageConverter()); factory.setConcurrentConsumers(3); // 设置初始消费者数量 factory.setMaxConcurrentConsumers(5); // 设置最大消费者数量 + + // 配置重试机制 - 使用死信队列恢复器 + factory.setAdviceChain(org.springframework.amqp.rabbit.config.RetryInterceptorBuilder + .stateless() + .maxAttempts(3) // 最大重试次数 + .backOffOptions(5000, 2.0, 60000) // 初始间隔5秒,倍率2.0,最大间隔60秒 + .recoverer(new org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer(rabbitTemplate, + RabbitmqConstant.RETRY_DEAD_LETTER_EXCHANGE, + RabbitmqConstant.RETRY_DEAD_LETTER_ROUTING_KEY)) // 重试失败后发送到死信队列 + .build()); + + // 使用自动确认模式,让重试机制正常工作 + factory.setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode.AUTO); + + // 设置预取数量,防止消息积压 + factory.setPrefetchCount(1); + return factory; } @@ -140,6 +162,34 @@ public class RabbitMQConfig { //----------------定义订单延迟队列------------------------ + //----------------定义重试死信队列------------------------ + + /** + * 重试失败死信交换机 + */ + @Bean + public DirectExchange retryDeadLetterExchange() { + return new DirectExchange(RabbitmqConstant.RETRY_DEAD_LETTER_EXCHANGE); + } + + /** + * 重试失败死信队列 + */ + @Bean + public Queue retryDeadLetterQueue() { + return new Queue(RabbitmqConstant.RETRY_DEAD_LETTER_QUEUE, true); + } + + /** + * 重试失败死信队列绑定 + */ + @Bean + public Binding retryDeadLetterBinding() { + return BindingBuilder.bind(retryDeadLetterQueue()) + .to(retryDeadLetterExchange()) + .with(RabbitmqConstant.RETRY_DEAD_LETTER_ROUTING_KEY); + } + //----------------定义支付相关队列------------------------ /** * 声明 Topic 类型的交换机 diff --git a/src/main/java/com/m2pool/lease/constant/RabbitmqConstant.java b/src/main/java/com/m2pool/lease/constant/RabbitmqConstant.java index 2992f62..d3d6ead 100644 --- a/src/main/java/com/m2pool/lease/constant/RabbitmqConstant.java +++ b/src/main/java/com/m2pool/lease/constant/RabbitmqConstant.java @@ -56,6 +56,21 @@ public class RabbitmqConstant { */ public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key"; + /** + * 重试失败死信交换机 + */ + public static final String RETRY_DEAD_LETTER_EXCHANGE = "retry.dead.letter.exchange"; + + /** + * 重试失败死信队列 + */ + public static final String RETRY_DEAD_LETTER_QUEUE = "retry.dead.letter.queue"; + + /** + * 重试失败死信路由键 + */ + public static final String RETRY_DEAD_LETTER_ROUTING_KEY = "retry.dead.letter.routing.key"; + //----------------定义支付相关队列------------------------ diff --git a/src/main/java/com/m2pool/lease/mq/MessageReceiver.java b/src/main/java/com/m2pool/lease/mq/MessageReceiver.java index 5438462..8e94834 100644 --- a/src/main/java/com/m2pool/lease/mq/MessageReceiver.java +++ b/src/main/java/com/m2pool/lease/mq/MessageReceiver.java @@ -12,6 +12,8 @@ import com.m2pool.lease.mapper.*; import com.m2pool.lease.mq.message.*; import com.m2pool.lease.service.LeaseOrderItemService; import com.m2pool.lease.utils.UuidGeneratorUtil; +import com.rabbitmq.client.Channel; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.messaging.handler.annotation.Payload; @@ -322,7 +324,7 @@ public class MessageReceiver { } - /** +/** * 余额提现 消费者 * @param payWithdrawReturnMessage */ @@ -372,7 +374,6 @@ public class MessageReceiver { leasePayWithdrawMessageMapper.updateById(build); } } - } /** @@ -555,59 +556,59 @@ public class MessageReceiver { //} // //测试 开发环境 充值测试 - //@RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_QUEUE,containerFactory ="rabbitListenerContainerFactory") - //public void listenerPayRechargeQueueMessage(@Payload RabbitmqPayRechargeMessage payAutoReturnMessage) { - // String s = UuidGeneratorUtil.generateUuidWithoutHyphen(); - // ////发送充值消息 - // //RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayRechargeReturnMessage.builder() - // // .queue_id(payAutoReturnMessage.getQueue_id()) - // // .status(2) - // // .amount(BigDecimal.valueOf(20)) - // // .chain(payAutoReturnMessage.getChain()) - // // .symbol(payAutoReturnMessage.getSymbol()) - // // .address(payAutoReturnMessage.getAddress()) - // // .tx_hash(s) - // // .build(); - // //rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); - // - // - // //发送充值消息 - // RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage1 = RabbitmqPayRechargeReturnMessage.builder() - // .queue_id(payAutoReturnMessage.getQueue_id()) - // .status(1) - // .amount(BigDecimal.valueOf(100)) - // .chain(payAutoReturnMessage.getChain()) - // .symbol(payAutoReturnMessage.getSymbol()) - // .address(payAutoReturnMessage.getAddress()) - // .user_email(payAutoReturnMessage.getUser_email()) - // .fromAddress(payAutoReturnMessage.getAddress()) - // .tx_hash(s) - // .build(); - // rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage1); - //} + @RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_QUEUE,containerFactory ="rabbitListenerContainerFactory") + public void listenerPayRechargeQueueMessage(@Payload RabbitmqPayRechargeMessage payAutoReturnMessage) { + String s = UuidGeneratorUtil.generateUuidWithoutHyphen(); + ////发送充值消息 + //RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayRechargeReturnMessage.builder() + // .queue_id(payAutoReturnMessage.getQueue_id()) + // .status(2) + // .amount(BigDecimal.valueOf(20)) + // .chain(payAutoReturnMessage.getChain()) + // .symbol(payAutoReturnMessage.getSymbol()) + // .address(payAutoReturnMessage.getAddress()) + // .tx_hash(s) + // .build(); + //rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); - ////提现 - //@RabbitListener(queues = RabbitmqConstant.PAY_WITHDRAW_QUEUE,containerFactory ="rabbitListenerContainerFactory") - //public void listenerWithdrawQueueMessage(@Payload RabbitmqPayWithdrawMessage payAutoReturnMessage) throws InterruptedException { - // Thread.sleep(10000); - // RabbitmqPayWithdrawReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayWithdrawReturnMessage.builder() - // .queue_id(payAutoReturnMessage.getQueue_id()) - // .status(1) - // .amount(payAutoReturnMessage.getAmount()) - // .chain(payAutoReturnMessage.getChain()) - // .symbol(payAutoReturnMessage.getSymbol()) - // .from_address(payAutoReturnMessage.getFrom_address()) - // .tx_hash(UuidGeneratorUtil.generateUuidWithoutHyphen()) - // .fee(payAutoReturnMessage.getFee()) - // .user_email(payAutoReturnMessage.getUser_email()) - // .build(); - // //提现100 提现失败 - // if (payAutoReturnMessage.getAmount().equals(BigDecimal.valueOf(100))){ - // rabbitmqPayRechargeReturnMessage.setStatus(0); - // } - // rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_WITHDRAW_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); - //} + //发送充值消息 + RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage1 = RabbitmqPayRechargeReturnMessage.builder() + .queue_id(payAutoReturnMessage.getQueue_id()) + .status(1) + .amount(BigDecimal.valueOf(100)) + .chain(payAutoReturnMessage.getChain()) + .symbol(payAutoReturnMessage.getSymbol()) + .address(payAutoReturnMessage.getAddress()) + .user_email(payAutoReturnMessage.getUser_email()) + .fromAddress(payAutoReturnMessage.getAddress()) + .tx_hash(s) + .build(); + rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage1); + } + + + //提现 + @RabbitListener(queues = RabbitmqConstant.PAY_WITHDRAW_QUEUE,containerFactory ="rabbitListenerContainerFactory") + public void listenerWithdrawQueueMessage(@Payload RabbitmqPayWithdrawMessage payAutoReturnMessage) throws InterruptedException { + Thread.sleep(10000); + RabbitmqPayWithdrawReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayWithdrawReturnMessage.builder() + .queue_id(payAutoReturnMessage.getQueue_id()) + .status(1) + .amount(payAutoReturnMessage.getAmount()) + .chain(payAutoReturnMessage.getChain()) + .symbol(payAutoReturnMessage.getSymbol()) + .from_address(payAutoReturnMessage.getFrom_address()) + .tx_hash(UuidGeneratorUtil.generateUuidWithoutHyphen()) + .fee(payAutoReturnMessage.getFee()) + .user_email(payAutoReturnMessage.getUser_email()) + .build(); + //提现100 提现失败 + if (payAutoReturnMessage.getAmount().equals(BigDecimal.valueOf(100))){ + rabbitmqPayRechargeReturnMessage.setStatus(0); + } + rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_WITHDRAW_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); + } ////测试 开发环境 删除钱包测试 diff --git a/src/main/java/com/m2pool/lease/service/impl/LeaseUserServiceImpl.java b/src/main/java/com/m2pool/lease/service/impl/LeaseUserServiceImpl.java index 0f2e3c6..b5ccc97 100644 --- a/src/main/java/com/m2pool/lease/service/impl/LeaseUserServiceImpl.java +++ b/src/main/java/com/m2pool/lease/service/impl/LeaseUserServiceImpl.java @@ -1087,16 +1087,16 @@ public class LeaseUserServiceImpl extends ServiceImpl