update 增加重试机制及死信队列

This commit is contained in:
yyb
2026-01-26 13:42:23 +08:00
parent 5a649619bc
commit 118083f46d
4 changed files with 130 additions and 64 deletions

View File

@@ -1,12 +1,14 @@
package com.m2pool.lease.config; package com.m2pool.lease.config;
import com.m2pool.lease.constant.RabbitmqConstant;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@@ -18,6 +20,8 @@ import static com.m2pool.lease.constant.RabbitmqConstant.*;
@Configuration @Configuration
public class RabbitMQConfig { public class RabbitMQConfig {
@Bean @Bean
public MessageConverter jackson2JsonMessageConverter() { public MessageConverter jackson2JsonMessageConverter() {
//自动生成消息唯一id //自动生成消息唯一id
@@ -54,13 +58,31 @@ public class RabbitMQConfig {
@Bean @Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
RabbitTemplate rabbitTemplate) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory); factory.setConnectionFactory(connectionFactory);
//消费者序列化 //消费者序列化
factory.setMessageConverter(jackson2JsonMessageConverter()); factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setConcurrentConsumers(3); // 设置初始消费者数量 factory.setConcurrentConsumers(3); // 设置初始消费者数量
factory.setMaxConcurrentConsumers(5); // 设置最大消费者数量 factory.setMaxConcurrentConsumers(5); // 设置最大消费者数量
// 配置重试机制 - 使用死信队列恢复器
factory.setAdviceChain(org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(3) // 最大重试次数
.backOffOptions(5000, 2.0, 60000) // 初始间隔5秒倍率2.0最大间隔60秒
.recoverer(new org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer(rabbitTemplate,
RabbitmqConstant.RETRY_DEAD_LETTER_EXCHANGE,
RabbitmqConstant.RETRY_DEAD_LETTER_ROUTING_KEY)) // 重试失败后发送到死信队列
.build());
// 使用自动确认模式,让重试机制正常工作
factory.setAcknowledgeMode(org.springframework.amqp.core.AcknowledgeMode.AUTO);
// 设置预取数量,防止消息积压
factory.setPrefetchCount(1);
return factory; return factory;
} }
@@ -140,6 +162,34 @@ public class RabbitMQConfig {
//----------------定义订单延迟队列------------------------ //----------------定义订单延迟队列------------------------
//----------------定义重试死信队列------------------------
/**
* 重试失败死信交换机
*/
@Bean
public DirectExchange retryDeadLetterExchange() {
return new DirectExchange(RabbitmqConstant.RETRY_DEAD_LETTER_EXCHANGE);
}
/**
* 重试失败死信队列
*/
@Bean
public Queue retryDeadLetterQueue() {
return new Queue(RabbitmqConstant.RETRY_DEAD_LETTER_QUEUE, true);
}
/**
* 重试失败死信队列绑定
*/
@Bean
public Binding retryDeadLetterBinding() {
return BindingBuilder.bind(retryDeadLetterQueue())
.to(retryDeadLetterExchange())
.with(RabbitmqConstant.RETRY_DEAD_LETTER_ROUTING_KEY);
}
//----------------定义支付相关队列------------------------ //----------------定义支付相关队列------------------------
/** /**
* 声明 Topic 类型的交换机 * 声明 Topic 类型的交换机

View File

@@ -56,6 +56,21 @@ public class RabbitmqConstant {
*/ */
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key"; public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
/**
* 重试失败死信交换机
*/
public static final String RETRY_DEAD_LETTER_EXCHANGE = "retry.dead.letter.exchange";
/**
* 重试失败死信队列
*/
public static final String RETRY_DEAD_LETTER_QUEUE = "retry.dead.letter.queue";
/**
* 重试失败死信路由键
*/
public static final String RETRY_DEAD_LETTER_ROUTING_KEY = "retry.dead.letter.routing.key";
//----------------定义支付相关队列------------------------ //----------------定义支付相关队列------------------------

View File

@@ -12,6 +12,8 @@ import com.m2pool.lease.mapper.*;
import com.m2pool.lease.mq.message.*; import com.m2pool.lease.mq.message.*;
import com.m2pool.lease.service.LeaseOrderItemService; import com.m2pool.lease.service.LeaseOrderItemService;
import com.m2pool.lease.utils.UuidGeneratorUtil; import com.m2pool.lease.utils.UuidGeneratorUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.Payload;
@@ -322,7 +324,7 @@ public class MessageReceiver {
} }
/** /**
* 余额提现 消费者 * 余额提现 消费者
* @param payWithdrawReturnMessage * @param payWithdrawReturnMessage
*/ */
@@ -372,7 +374,6 @@ public class MessageReceiver {
leasePayWithdrawMessageMapper.updateById(build); leasePayWithdrawMessageMapper.updateById(build);
} }
} }
} }
/** /**
@@ -555,59 +556,59 @@ public class MessageReceiver {
//} //}
// //
//测试 开发环境 充值测试 //测试 开发环境 充值测试
//@RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_QUEUE,containerFactory ="rabbitListenerContainerFactory") @RabbitListener(queues = RabbitmqConstant.PAY_RECHARGE_QUEUE,containerFactory ="rabbitListenerContainerFactory")
//public void listenerPayRechargeQueueMessage(@Payload RabbitmqPayRechargeMessage payAutoReturnMessage) { public void listenerPayRechargeQueueMessage(@Payload RabbitmqPayRechargeMessage payAutoReturnMessage) {
// String s = UuidGeneratorUtil.generateUuidWithoutHyphen(); String s = UuidGeneratorUtil.generateUuidWithoutHyphen();
// ////发送充值消息 ////发送充值消息
// //RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayRechargeReturnMessage.builder() //RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayRechargeReturnMessage.builder()
// // .queue_id(payAutoReturnMessage.getQueue_id()) // .queue_id(payAutoReturnMessage.getQueue_id())
// // .status(2) // .status(2)
// // .amount(BigDecimal.valueOf(20)) // .amount(BigDecimal.valueOf(20))
// // .chain(payAutoReturnMessage.getChain()) // .chain(payAutoReturnMessage.getChain())
// // .symbol(payAutoReturnMessage.getSymbol()) // .symbol(payAutoReturnMessage.getSymbol())
// // .address(payAutoReturnMessage.getAddress()) // .address(payAutoReturnMessage.getAddress())
// // .tx_hash(s) // .tx_hash(s)
// // .build(); // .build();
// //rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); //rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage);
//
//
// //发送充值消息
// RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage1 = RabbitmqPayRechargeReturnMessage.builder()
// .queue_id(payAutoReturnMessage.getQueue_id())
// .status(1)
// .amount(BigDecimal.valueOf(100))
// .chain(payAutoReturnMessage.getChain())
// .symbol(payAutoReturnMessage.getSymbol())
// .address(payAutoReturnMessage.getAddress())
// .user_email(payAutoReturnMessage.getUser_email())
// .fromAddress(payAutoReturnMessage.getAddress())
// .tx_hash(s)
// .build();
// rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage1);
//}
////提现 //发送充值消息
//@RabbitListener(queues = RabbitmqConstant.PAY_WITHDRAW_QUEUE,containerFactory ="rabbitListenerContainerFactory") RabbitmqPayRechargeReturnMessage rabbitmqPayRechargeReturnMessage1 = RabbitmqPayRechargeReturnMessage.builder()
//public void listenerWithdrawQueueMessage(@Payload RabbitmqPayWithdrawMessage payAutoReturnMessage) throws InterruptedException { .queue_id(payAutoReturnMessage.getQueue_id())
// Thread.sleep(10000); .status(1)
// RabbitmqPayWithdrawReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayWithdrawReturnMessage.builder() .amount(BigDecimal.valueOf(100))
// .queue_id(payAutoReturnMessage.getQueue_id()) .chain(payAutoReturnMessage.getChain())
// .status(1) .symbol(payAutoReturnMessage.getSymbol())
// .amount(payAutoReturnMessage.getAmount()) .address(payAutoReturnMessage.getAddress())
// .chain(payAutoReturnMessage.getChain()) .user_email(payAutoReturnMessage.getUser_email())
// .symbol(payAutoReturnMessage.getSymbol()) .fromAddress(payAutoReturnMessage.getAddress())
// .from_address(payAutoReturnMessage.getFrom_address()) .tx_hash(s)
// .tx_hash(UuidGeneratorUtil.generateUuidWithoutHyphen()) .build();
// .fee(payAutoReturnMessage.getFee()) rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_RECHARGE_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage1);
// .user_email(payAutoReturnMessage.getUser_email()) }
// .build();
// //提现100 提现失败
// if (payAutoReturnMessage.getAmount().equals(BigDecimal.valueOf(100))){ //提现
// rabbitmqPayRechargeReturnMessage.setStatus(0); @RabbitListener(queues = RabbitmqConstant.PAY_WITHDRAW_QUEUE,containerFactory ="rabbitListenerContainerFactory")
// } public void listenerWithdrawQueueMessage(@Payload RabbitmqPayWithdrawMessage payAutoReturnMessage) throws InterruptedException {
// rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_WITHDRAW_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage); Thread.sleep(10000);
//} RabbitmqPayWithdrawReturnMessage rabbitmqPayRechargeReturnMessage = RabbitmqPayWithdrawReturnMessage.builder()
.queue_id(payAutoReturnMessage.getQueue_id())
.status(1)
.amount(payAutoReturnMessage.getAmount())
.chain(payAutoReturnMessage.getChain())
.symbol(payAutoReturnMessage.getSymbol())
.from_address(payAutoReturnMessage.getFrom_address())
.tx_hash(UuidGeneratorUtil.generateUuidWithoutHyphen())
.fee(payAutoReturnMessage.getFee())
.user_email(payAutoReturnMessage.getUser_email())
.build();
//提现100 提现失败
if (payAutoReturnMessage.getAmount().equals(BigDecimal.valueOf(100))){
rabbitmqPayRechargeReturnMessage.setStatus(0);
}
rabbitTemplate.convertAndSend(RabbitmqConstant.PAY_WITHDRAW_RETURN_QUEUE,rabbitmqPayRechargeReturnMessage);
}
////测试 开发环境 删除钱包测试 ////测试 开发环境 删除钱包测试

View File

@@ -1087,16 +1087,16 @@ public class LeaseUserServiceImpl extends ServiceImpl<LeaseUserMapper, LeaseUser
}else{ }else{
for (UserWalletDataDto userWalletDataDto : userWalletDataDtoList) { for (UserWalletDataDto userWalletDataDto : userWalletDataDtoList) {
if (userWalletDataDto.getFromSymbol().equals(chainAndCoinVo.getCoin())){ if (userWalletDataDto.getFromSymbol().equals(chainAndCoinVo.getCoin())){
////开发环境 //开发环境
//sendMessage(UserWalletDataDto.builder() sendMessage(UserWalletDataDto.builder()
// .queueId(userWalletDataDto.getQueueId()) .queueId(userWalletDataDto.getQueueId())
// .fromAddress(userWalletDataDto.getFromAddress()) .fromAddress(userWalletDataDto.getFromAddress())
// .fromChain(chainAndCoinVo.getChain()) .fromChain(chainAndCoinVo.getChain())
// .fromSymbol(chainAndCoinVo.getCoin()) .fromSymbol(chainAndCoinVo.getCoin())
// .balance(BigDecimal.ZERO) .balance(BigDecimal.ZERO)
// .userId(userWalletDataDto.getUserId()) .userId(userWalletDataDto.getUserId())
// .qrcode(userWalletDataDto.getQrcode()) .qrcode(userWalletDataDto.getQrcode())
// .build(),username,authId); .build(),username,authId);
return Result.success(userWalletDataDto); return Result.success(userWalletDataDto);
} }
} }