From 13b5cbcf6d1feb681c3f42ba0f7a005158b5cec2 Mon Sep 17 00:00:00 2001 From: yyb <1416014977@qq.com> Date: Mon, 26 Jan 2026 13:43:13 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E5=A2=9E=E5=8A=A0=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=E5=8F=8A=E6=AD=BB=E4=BF=A1=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lease/mq/DeadLetterQueueListener.java | 73 +++++++++++++++++++ ...iver.java => MessageReceiverListener.java} | 4 +- 2 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java rename src/main/java/com/m2pool/lease/mq/{MessageReceiver.java => MessageReceiverListener.java} (99%) diff --git a/src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java b/src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java new file mode 100644 index 0000000..cf4ca5a --- /dev/null +++ b/src/main/java/com/m2pool/lease/mq/DeadLetterQueueListener.java @@ -0,0 +1,73 @@ +package com.m2pool.lease.mq; + +import com.alibaba.fastjson.JSON; +import com.m2pool.lease.constant.RabbitmqConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +/** + * 死信队列监听器 - 处理重试失败的消息 + */ +@Component +public class DeadLetterQueueListener { + + private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueListener.class); + + /** + * 监听重试失败死信队列 + */ + @RabbitListener(queues = RabbitmqConstant.RETRY_DEAD_LETTER_QUEUE) + public void handleRetryFailedMessage(Message message) { + try { + String messageId = message.getMessageProperties().getMessageId(); + String contentType = message.getMessageProperties().getContentType(); + + // 获取重试信息 + Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count"); + String reason = (String) message.getMessageProperties().getHeaders().get("x-reason"); + Long failedTimestamp = (Long) message.getMessageProperties().getHeaders().get("x-failed-timestamp"); + + log.error("========================================"); + log.error("消息重试失败,进入死信队列"); + log.error("消息ID: {}", messageId); + log.error("重试次数: {}", retryCount); + log.error("失败原因: {}", reason); + log.error("失败时间: {}", failedTimestamp != null ? new java.util.Date(failedTimestamp) : "未知"); + log.error("消息内容类型: {}", contentType); + + // 打印消息内容(根据实际类型处理) + if (message.getBody() != null) { + String body = new String(message.getBody()); + log.error("消息内容: {}", body); + + //TODO 可以根据需要将消息保存到数据库,供后续人工处理 + // saveFailedMessageToDb(messageId, body, reason); + } + + log.error("========================================"); + + //TODO 这里可以添加告警通知,如发送邮件、短信等 + // sendAlert(messageId, reason); + + } catch (Exception e) { + log.error("处理死信队列消息时发生异常", e); + } + } + + /** + * 可选:将失败消息保存到数据库 + */ + // private void saveFailedMessageToDb(String messageId, String messageBody, String reason) { + // // 实现保存逻辑 + // } + + /** + * 可选:发送告警通知 + */ + // private void sendAlert(String messageId, String reason) { + // // 实现告警逻辑 + // } +} diff --git a/src/main/java/com/m2pool/lease/mq/MessageReceiver.java b/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java similarity index 99% rename from src/main/java/com/m2pool/lease/mq/MessageReceiver.java rename to src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java index 8e94834..30f449e 100644 --- a/src/main/java/com/m2pool/lease/mq/MessageReceiver.java +++ b/src/main/java/com/m2pool/lease/mq/MessageReceiverListener.java @@ -12,8 +12,6 @@ import com.m2pool.lease.mapper.*; import com.m2pool.lease.mq.message.*; import com.m2pool.lease.service.LeaseOrderItemService; import com.m2pool.lease.utils.UuidGeneratorUtil; -import com.rabbitmq.client.Channel; -import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.messaging.handler.annotation.Payload; @@ -27,7 +25,7 @@ import java.util.List; import java.util.stream.Collectors; @Service -public class MessageReceiver { +public class MessageReceiverListener { @Resource private LeasePayWithdrawMessageMapper leasePayWithdrawMessageMapper;