update 增加重试机制及死信队列
This commit is contained in:
@@ -0,0 +1,73 @@
|
|||||||
|
package com.m2pool.lease.mq;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.m2pool.lease.constant.RabbitmqConstant;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.amqp.core.Message;
|
||||||
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 死信队列监听器 - 处理重试失败的消息
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class DeadLetterQueueListener {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueListener.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 监听重试失败死信队列
|
||||||
|
*/
|
||||||
|
@RabbitListener(queues = RabbitmqConstant.RETRY_DEAD_LETTER_QUEUE)
|
||||||
|
public void handleRetryFailedMessage(Message message) {
|
||||||
|
try {
|
||||||
|
String messageId = message.getMessageProperties().getMessageId();
|
||||||
|
String contentType = message.getMessageProperties().getContentType();
|
||||||
|
|
||||||
|
// 获取重试信息
|
||||||
|
Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count");
|
||||||
|
String reason = (String) message.getMessageProperties().getHeaders().get("x-reason");
|
||||||
|
Long failedTimestamp = (Long) message.getMessageProperties().getHeaders().get("x-failed-timestamp");
|
||||||
|
|
||||||
|
log.error("========================================");
|
||||||
|
log.error("消息重试失败,进入死信队列");
|
||||||
|
log.error("消息ID: {}", messageId);
|
||||||
|
log.error("重试次数: {}", retryCount);
|
||||||
|
log.error("失败原因: {}", reason);
|
||||||
|
log.error("失败时间: {}", failedTimestamp != null ? new java.util.Date(failedTimestamp) : "未知");
|
||||||
|
log.error("消息内容类型: {}", contentType);
|
||||||
|
|
||||||
|
// 打印消息内容(根据实际类型处理)
|
||||||
|
if (message.getBody() != null) {
|
||||||
|
String body = new String(message.getBody());
|
||||||
|
log.error("消息内容: {}", body);
|
||||||
|
|
||||||
|
//TODO 可以根据需要将消息保存到数据库,供后续人工处理
|
||||||
|
// saveFailedMessageToDb(messageId, body, reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.error("========================================");
|
||||||
|
|
||||||
|
//TODO 这里可以添加告警通知,如发送邮件、短信等
|
||||||
|
// sendAlert(messageId, reason);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理死信队列消息时发生异常", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 可选:将失败消息保存到数据库
|
||||||
|
*/
|
||||||
|
// private void saveFailedMessageToDb(String messageId, String messageBody, String reason) {
|
||||||
|
// // 实现保存逻辑
|
||||||
|
// }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 可选:发送告警通知
|
||||||
|
*/
|
||||||
|
// private void sendAlert(String messageId, String reason) {
|
||||||
|
// // 实现告警逻辑
|
||||||
|
// }
|
||||||
|
}
|
||||||
@@ -12,8 +12,6 @@ import com.m2pool.lease.mapper.*;
|
|||||||
import com.m2pool.lease.mq.message.*;
|
import com.m2pool.lease.mq.message.*;
|
||||||
import com.m2pool.lease.service.LeaseOrderItemService;
|
import com.m2pool.lease.service.LeaseOrderItemService;
|
||||||
import com.m2pool.lease.utils.UuidGeneratorUtil;
|
import com.m2pool.lease.utils.UuidGeneratorUtil;
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import org.springframework.amqp.core.Message;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
import org.springframework.messaging.handler.annotation.Payload;
|
import org.springframework.messaging.handler.annotation.Payload;
|
||||||
@@ -27,7 +25,7 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class MessageReceiver {
|
public class MessageReceiverListener {
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private LeasePayWithdrawMessageMapper leasePayWithdrawMessageMapper;
|
private LeasePayWithdrawMessageMapper leasePayWithdrawMessageMapper;
|
||||||
Reference in New Issue
Block a user