diff --git a/src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java b/src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java new file mode 100644 index 0000000..cf4ca5a --- /dev/null +++ b/src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java @@ -0,0 +1,73 @@ +package com.m2pool.lease.mq; + +import com.alibaba.fastjson.JSON; +import com.m2pool.lease.constant.RabbitmqConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * 死信队列监听器 - 处理重试失败的消息 + */ +@Component +public class DeadLetterQueueListener { + + private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueListener.class); + + /** + * 监听重试失败死信队列 + */ + @RabbitListener(queues = RabbitmqConstant.RETRY_DEAD_LETTER_QUEUE) + public void handleRetryFailedMessage(Message message) { + try { + String messageId = message.getMessageProperties().getMessageId(); + String contentType = message.getMessageProperties().getContentType(); + + // 获取重试信息 + Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count"); + String reason = (String) message.getMessageProperties().getHeaders().get("x-reason"); + Long failedTimestamp = (Long) message.getMessageProperties().getHeaders().get("x-failed-timestamp"); + + log.error("========================================"); + log.error("消息重试失败,进入死信队列"); + log.error("消息ID: {}", messageId); + log.error("重试次数: {}", retryCount); + log.error("失败原因: {}", reason); + log.error("失败时间: {}", failedTimestamp != null ? new java.util.Date(failedTimestamp) : "未知"); + log.error("消息内容类型: {}", contentType); + + // 打印消息内容(根据实际类型处理) + if (message.getBody() != null) { + String body = new String(message.getBody()); + log.error("消息内容: {}", body); + + //TODO 可以根据需要将消息保存到数据库,供后续人工处理 + // saveFailedMessageToDb(messageId, body, reason); + } + + log.error("========================================"); + + //TODO 这里可以添加告警通知,如发送邮件、短信等 + // sendAlert(messageId, reason); + + } catch (Exception e) { + log.error("处理死信队列消息时发生异常", e); + } + } + + /** + * 可选:将失败消息保存到数据库 + */ + // private void saveFailedMessageToDb(String messageId, String messageBody, String reason) { + // // 实现保存逻辑 + // } + + /** + * 可选:发送告警通知 + */ + // private void sendAlert(String messageId, String reason) { + // // 实现告警逻辑 + // } +} diff --git a/src/main/java/com/m2pool/lease/mq/MessageReceiver.java b/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java similarity index 99% rename from src/main/java/com/m2pool/lease/mq/MessageReceiver.java rename to src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java index 8e94834..30f449e 100644 --- a/src/main/java/com/m2pool/lease/mq/MessageReceiver.java +++ b/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java @@ -12,8 +12,6 @@ import com.m2pool.lease.mapper.*; import com.m2pool.lease.mq.message.*; import com.m2pool.lease.service.LeaseOrderItemService; import com.m2pool.lease.utils.UuidGeneratorUtil; -import com.rabbitmq.client.Channel; -import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.messaging.handler.annotation.Payload; @@ -27,7 +25,7 @@ import java.util.List; import java.util.stream.Collectors; @Service -public class MessageReceiver { +public class MessageReceiverListener { @Resource private LeasePayWithdrawMessageMapper leasePayWithdrawMessageMapper;