From 934be40c8bbaef84e63a1232c08dc896e8f106a5 Mon Sep 17 00:00:00 2001 From: yyb <1416014977@qq.com> Date: Wed, 30 Apr 2025 14:02:32 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E5=BA=9F=E5=BC=83=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=9F=A5=E8=AF=A2=E6=8E=A5=E5=8F=A3=EF=BC=8C?= =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89ws=E9=93=BE=E6=8E=A5=E5=92=8Cip?= =?UTF-8?q?=E9=99=90=E5=88=B6=E6=95=B0=E7=9B=91=E5=90=AC=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chat/config/CustomWebSocketConfig.java | 5 +- .../chat/config/WebSocketBrokerConfig.java | 5 +- .../com/m2pool/chat/constant/Destination.java | 7 ++- .../controller/ChatMessageController.java | 4 +- .../chat/coverter/CommonMessageConvert.java | 1 + .../java/com/m2pool/chat/dto/ChatRoomDto.java | 2 +- .../m2pool/chat/dto/WebsocketMessageDto.java | 2 +- .../chat/entity/ChatMessageHistory.java | 2 + .../WebsocketChannelInterceptor.java | 41 ++++--------- .../m2pool/chat/listener/IpLimitEvent.java | 21 +++++++ .../m2pool/chat/listener/IpLimitListener.java | 22 +++++++ .../chat/listener/WebSocketEventListener.java | 41 +++++++++++++ .../chat/mapper/ChatMessageHistoryMapper.java | 6 +- .../service/ChatMessageHistoryService.java | 8 +++ .../m2pool/chat/service/ChatRoomService.java | 2 +- .../impl/ChatMessageHistoryServiceImpl.java | 11 ++++ .../service/impl/ChatMessageServiceImpl.java | 26 ++++++-- .../service/impl/ChatRoomServiceImpl.java | 17 ++++-- .../chat/service/impl/StompServiceImpl.java | 36 ++++++++--- .../java/com/m2pool/chat/task/ChatTask.java | 60 +++++++++++++++++++ .../com/m2pool/chat/utils/StompUtils.java | 30 ++++++++++ .../java/com/m2pool/chat/vo/CharRoomVo.java | 4 +- .../com/m2pool/chat/vo/MessagePageVo.java | 4 +- .../com/m2pool/chat/vo/UserMessageVo.java | 2 +- .../mapper/chat/ChatMessageHistoryMapper.xml | 6 +- .../resources/mapper/chat/ChatRoomMapper.xml | 7 ++- 26 files changed, 304 insertions(+), 68 deletions(-) create mode 100644 m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitEvent.java create mode 100644 m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java create mode 100644 m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java create mode 100644 m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatMessageHistoryService.java create mode 100644 m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageHistoryServiceImpl.java create mode 100644 m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/utils/StompUtils.java diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/CustomWebSocketConfig.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/CustomWebSocketConfig.java index fc4660a..b2062f7 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/CustomWebSocketConfig.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/CustomWebSocketConfig.java @@ -2,15 +2,18 @@ package com.m2pool.chat.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Configuration; @Data @Configuration +@RefreshScope @ConfigurationProperties(prefix = "websocket.transport") public class CustomWebSocketConfig { private int messageSizeLimit; private int sendTimeLimit; private int sendBufferSizeLimit; private int timeToFirstMessage; - private int maxConnections; + private int maxConnections; + private String defaultCustomerEmail; } \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java index 4c88112..bfc2bbd 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java @@ -4,6 +4,7 @@ import com.m2pool.chat.constant.Destination; import com.m2pool.chat.coverter.CommonMessageConvert; import com.m2pool.chat.interceptor.WebsocketChannelInterceptor; import com.m2pool.chat.interceptor.WebsocketHandshakeInterceptor; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.simp.config.ChannelRegistration; @@ -31,6 +32,8 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { @Resource private CustomWebSocketConfig customWebSocketConfig; + @Resource + private ApplicationEventPublisher applicationEventPublisher; /** * 注册 Stomp的端点 可以注册多个端点 * addEndpoint:添加STOMP协议的端点。客户端访问地址 @@ -96,7 +99,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { .maxPoolSize(20) .keepAliveSeconds(60); // 拦截器配置 - registration.interceptors(new WebsocketChannelInterceptor(customWebSocketConfig)); + registration.interceptors(new WebsocketChannelInterceptor(customWebSocketConfig,applicationEventPublisher)); } /** diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java index 6cb7179..8257184 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java @@ -8,10 +8,15 @@ package com.m2pool.chat.constant; */ public class Destination { /** - * stomp 默认单对单 发送目的地 + * stomp 默认单对单 发送目的地.单对单消息默认 / */ public static final String QUEUE = "/queue"; + /** + * 聊天室关闭 路径 + */ + public static final String QUEUE_CLOSE_ROOM = "/close/room/"; + /** * stomp 默认群发 目的地 */ diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatMessageController.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatMessageController.java index e42ae4e..2361895 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatMessageController.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatMessageController.java @@ -26,7 +26,7 @@ public class ChatMessageController { * @return */ @PostMapping("/find/history/message") - @ApiOperation(value = "查询7天前的历史记录", notes = "根据ID查询7天前的历史记录") + @ApiOperation(value = "查询7天前的历史记录(废弃,请使用/find/recently/message)", notes = "根据ID查询7天前的历史记录") @RequiresLogin public R> findHistoryMessage(@RequestBody MessagePageVo pageVo) { return chatMessageService.findHistoryMessage(pageVo.getId(), pageVo.getPageNum(), pageVo.getRoomId()); @@ -40,7 +40,7 @@ public class ChatMessageController { } @GetMapping("/delete/message") - @ApiOperation(value = "消息撤回或删除") + @ApiOperation(value = "消息撤回或删除(废弃)") @RequiresLogin public R deleteMessage( @ApiParam(value = "消息id", required = true, example = "1") diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java index b323115..637483a 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java @@ -47,6 +47,7 @@ public class CommonMessageConvert implements MessageConverter { public Message toMessage(Object payload, MessageHeaders headers) { String str = JsonUtil.toJson(payload); byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + System.out.println("发送给前端的消息"+new GenericMessage<>(bytes, headers)); return new GenericMessage<>(bytes, headers); } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java index 391f05e..1d20acb 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java @@ -26,7 +26,7 @@ public class ChatRoomDto { * 聊天室id */ @ApiModelProperty(value = "聊天室id", example = "1") - private Long id; + private String id; /** * 聊天对象id */ diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java index 19ecc1a..8b179dd 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java @@ -52,7 +52,7 @@ public class WebsocketMessageDto { /** * 聊天室id */ - private Long roomId; + private String roomId; /** * 是否创建新聊天室 diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/entity/ChatMessageHistory.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/entity/ChatMessageHistory.java index 2cfc417..b06f06c 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/entity/ChatMessageHistory.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/entity/ChatMessageHistory.java @@ -3,11 +3,13 @@ package com.m2pool.chat.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.time.LocalDateTime; +@Builder @Data @AllArgsConstructor @NoArgsConstructor diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java index feb8086..f868d55 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java @@ -7,6 +7,7 @@ import com.m2pool.chat.entity.StompPrincipal; import lombok.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -17,7 +18,6 @@ import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; -import java.util.ArrayList; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.m2pool.chat.constant.CustomHeader.*; import static com.m2pool.chat.constant.UserType.LOGIN_USER; import static com.m2pool.chat.constant.UserType.TOURIST; +import static com.m2pool.chat.utils.StompUtils.getConnectCustomHeaders; /** @@ -53,8 +54,11 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { */ private CustomWebSocketConfig customWebSocketConfig; - public WebsocketChannelInterceptor(CustomWebSocketConfig customWebSocketConfig) { + private ApplicationEventPublisher applicationEventPublisher; + + public WebsocketChannelInterceptor(CustomWebSocketConfig customWebSocketConfig, ApplicationEventPublisher applicationEventPublisher) { this.customWebSocketConfig = customWebSocketConfig; + this.applicationEventPublisher = applicationEventPublisher; } /** @@ -75,20 +79,14 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); int type = Integer.parseInt(getConnectCustomHeaders(raw, TYPE)); String email = getConnectCustomHeaders(raw, EMAIL); - //最大链接数限制 maxConnectionsLimit(); //根据客服端ip + 用户类型限制连接数 ipLimit(accessor,type,email); - //链接请求头中用户信息存入stomp中 mha.setUser(new StompPrincipal(email,type,true)); } - if (accessor.getCommand() == StompCommand.SEND) { - LOGGER.info("------------websocket send message"); - } - if (accessor.getCommand() == StompCommand.SUBSCRIBE) { LOGGER.info("------------websocket subscribe message"); } @@ -122,7 +120,6 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { Map sessionAttributes = accessor.getSessionAttributes(); if (sessionAttributes != null) { String ipAddr = (String) sessionAttributes.get(IPADDR); - System.out.println(accessor.getSessionId()); String hasConnectionEmail = ipConnectionCountMap.get(ipAddr); //两种ip限制情况 //本次链接为游客 且ip上已经有人链接直接拒绝本次链接 @@ -132,8 +129,11 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { //本次链接为登录用户,并且已经链接.直接拒绝本次链接(多用户登录) if ( type == LOGIN_USER ) { if(email.equals(hasConnectionEmail) || ipConnectionCountMap.containsValue(email)){ +// StompPrincipal principal = new StompPrincipal(email, type, true); +// applicationEventPublisher.publishEvent(new IpLimitEvent(this, principal)); throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); } + } ipConnectionCountMap.put(ipAddr,email); @@ -142,22 +142,7 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { } } - /** - * 获取stomp自定义请求头 - * @param raw - * @param headerKey - * @return - */ - private String getConnectCustomHeaders(Object raw,String headerKey){ - String headerValue = ""; - if (raw instanceof Map) { - Object value = ((Map) raw).get(headerKey); - if(value instanceof ArrayList){ - headerValue = ((ArrayList) value).get(0).toString(); - } - } - return headerValue; - } + /** * 断开链接处理 @@ -177,28 +162,24 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { @Override public void postSend(Message message, MessageChannel channel, boolean sent) { - LOGGER.info("------------WebsocketChannelInterceptor-postSend"); + } @Override public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, @Nullable Exception ex) { - LOGGER.info("-----------WebsocketChannelInterceptor-afterSendCompletion"); } @Override public boolean preReceive(MessageChannel channel) { - LOGGER.info("----------WebsocketChannelInterceptor-preReceive"); return true; } @Override public Message postReceive(Message message, MessageChannel channel) { - LOGGER.info("----------WebsocketChannelInterceptor-postReceive"); return message; } @Override public void afterReceiveCompletion(@Nullable Message message, MessageChannel channel, @Nullable Exception ex) { - LOGGER.info("----------WebsocketChannelInterceptor-afterReceiveCompletion"); } } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitEvent.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitEvent.java new file mode 100644 index 0000000..adbe470 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitEvent.java @@ -0,0 +1,21 @@ +package com.m2pool.chat.listener; + +import com.m2pool.chat.entity.StompPrincipal; +import org.springframework.context.ApplicationEvent; + +/** + * 登录限制事件 + */ +public class IpLimitEvent extends ApplicationEvent { + + private final StompPrincipal principal; + + public IpLimitEvent(Object source, StompPrincipal principal) { + super(source); + this.principal = principal; + } + + public StompPrincipal getPrincipal() { + return principal; + } +} \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java new file mode 100644 index 0000000..3087d3f --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java @@ -0,0 +1,22 @@ +package com.m2pool.chat.listener; + +import com.m2pool.chat.entity.StompPrincipal; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +@Component +public class IpLimitListener implements ApplicationListener { + + @Autowired + private SimpMessagingTemplate messagingTemplate; + + @Override + public void onApplicationEvent(IpLimitEvent event) { + StompPrincipal principal =event.getPrincipal(); +// messagingTemplate.convertAndSendToUser(principal.getName(), +// Destination.QUEUE + "/" + principal.getName() +// , "ip链接数限制"); + } +} \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java new file mode 100644 index 0000000..86bca19 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java @@ -0,0 +1,41 @@ +package com.m2pool.chat.listener; + +import com.m2pool.chat.entity.StompPrincipal; +import com.m2pool.chat.service.impl.StompServiceImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationListener; +import org.springframework.messaging.Message; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.messaging.SessionDisconnectEvent; + +import javax.annotation.Resource; + +import static com.m2pool.chat.constant.UserType.TOURIST; + +/** + * websocket通道关闭监听器 + */ +@Component +public class WebSocketEventListener implements ApplicationListener{ + + private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketEventListener.class); + + @Resource + private StompServiceImpl stompService; + + @Override + public void onApplicationEvent(SessionDisconnectEvent event) { + StompPrincipal user = (StompPrincipal) event.getUser(); + Message message = event.getMessage(); + StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); + if (user != null && accessor.getCommand() == StompCommand.DISCONNECT && TOURIST.equals(user.getType())) { + //游客断开链接,通知客服删除游客聊天室 + stompService.customerCloseRoom(user.getName()); + // 删除数据库中游客数据 + + } + } +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/mapper/ChatMessageHistoryMapper.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/mapper/ChatMessageHistoryMapper.java index b23635d..f1e5abb 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/mapper/ChatMessageHistoryMapper.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/mapper/ChatMessageHistoryMapper.java @@ -1,14 +1,16 @@ package com.m2pool.chat.mapper; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.m2pool.chat.dto.ChatMessageDto; +import com.m2pool.chat.entity.ChatMessageHistory; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import java.util.List; @Mapper -public interface ChatMessageHistoryMapper { +public interface ChatMessageHistoryMapper extends BaseMapper { /** * 根据游标查询十条数据 @@ -17,6 +19,6 @@ public interface ChatMessageHistoryMapper { * @param roomId * @return */ - List findHistoryMessage(@Param("id") Long id,@Param("pageNum") Integer pageNum,@Param("roomId") Long roomId); + List findHistoryMessage(@Param("email") String email,@Param("id") Long id,@Param("pageNum") Integer pageNum,@Param("roomId") Long roomId); } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatMessageHistoryService.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatMessageHistoryService.java new file mode 100644 index 0000000..2813aa2 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatMessageHistoryService.java @@ -0,0 +1,8 @@ +package com.m2pool.chat.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import com.m2pool.chat.entity.ChatMessageHistory; + +public interface ChatMessageHistoryService extends IService { + +} \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatRoomService.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatRoomService.java index b2f6546..791b0d2 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatRoomService.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/ChatRoomService.java @@ -15,7 +15,7 @@ public interface ChatRoomService extends IService { * @param roomPageVo * @return */ - TableDataInfo findRoomList(RoomPageVo roomPageVo); + TableDataInfo findRoomList(RoomPageVo roomPageVo); /** * 根据当前用户邮箱查询聊天室 diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageHistoryServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageHistoryServiceImpl.java new file mode 100644 index 0000000..10d46f5 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageHistoryServiceImpl.java @@ -0,0 +1,11 @@ +package com.m2pool.chat.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.m2pool.chat.entity.ChatMessageHistory; +import com.m2pool.chat.mapper.ChatMessageHistoryMapper; +import com.m2pool.chat.service.ChatMessageHistoryService; +import org.springframework.stereotype.Service; + +@Service +public class ChatMessageHistoryServiceImpl extends ServiceImpl implements ChatMessageHistoryService { +} \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java index 7c97985..519b589 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java @@ -1,6 +1,7 @@ package com.m2pool.chat.service.impl; import com.m2pool.chat.dto.ChatMessageDto; +import com.m2pool.chat.entity.ChatMessage; import com.m2pool.chat.entity.ChatRoom; import com.m2pool.chat.mapper.ChatMessageHistoryMapper; import com.m2pool.chat.mapper.ChatMessageMapper; @@ -31,15 +32,28 @@ public class ChatMessageServiceImpl implements ChatMessageService { private ChatRoomMapper chatRoomMapper; @Override - public R> findHistoryMessage(Long id,Integer pageNum,Long roomId) { - List historyMessage = chatMessageHistoryMapper.findHistoryMessage(id, pageNum, roomId); - return R.success(historyMessage); + public R> + findHistoryMessage(Long id,Integer pageNum,Long roomId) { + String email = SecurityUtils.getUsername(); + List messages; + messages = chatMessageMapper.findRecentlyMessage(email,id, pageNum, roomId); + return R.success(messages); } @Override public R> findRecentlyMessage(Long id,Integer pageNum,Long roomId) { String email = SecurityUtils.getUsername(); - List recentlyMessage = chatMessageMapper.findRecentlyMessage(email,id, pageNum, roomId); + ChatMessage chatMessage = chatMessageMapper.selectById(id); + List recentlyMessage; + if(chatMessage != null){ + recentlyMessage = chatMessageMapper.findRecentlyMessage(email,id, pageNum, roomId); + if(recentlyMessage.size() != pageNum){ + recentlyMessage.addAll(chatMessageHistoryMapper.findHistoryMessage(email, id, pageNum - recentlyMessage.size(), roomId)); + } + }else{ + recentlyMessage = chatMessageHistoryMapper.findHistoryMessage(email, id, pageNum, roomId); + } + return R.success(recentlyMessage); } @@ -62,10 +76,10 @@ public class ChatMessageServiceImpl implements ChatMessageService { .build(); if (chatRoom != null){ if (Objects.equals(type, LOGIN_USER)) { - build.setServiceReadNum(0); + build.setClientReadNum(0); } if(Objects.equals(type, CUSTOMER)){ - build.setClientReadNum(0); + build.setServiceReadNum(0); } int i = chatRoomMapper.updateById(build); return i >= 0 ? R.success("已读") : R.fail("消息读取失败"); diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java index c65459a..9189390 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java @@ -44,22 +44,29 @@ public class ChatRoomServiceImpl extends ServiceImpl i @Override - public TableDataInfo findRoomList(RoomPageVo roomPageVo) { + public TableDataInfo findRoomList(RoomPageVo roomPageVo) { String userEmail = SecurityUtils.getUsername(); PageHelper.startPage(1, 20); //1.查找当前客服对应的聊天室 List roomList = chatRoomMapper.findRoomList(userEmail,roomPageVo.getSendDateTime()); + PageUtils.clearPage(); +// if (roomList.isEmpty()){ +// TableDataInfo tableDataInfo = new TableDataInfo(); +// tableDataInfo.setCode(HttpStatus.ERROR); +// tableDataInfo.setMsg("当前客服暂无聊天室"); +// return tableDataInfo; +// } return getDataTable(roomList); } - private TableDataInfo getDataTable(List list) + private TableDataInfo getDataTable(List list) { - TableDataInfo rspData = new TableDataInfo(); + TableDataInfo rspData = new TableDataInfo(); rspData.setCode(HttpStatus.SUCCESS); rspData.setRows(list); rspData.setMsg("查询成功"); - PageInfo pageInfo = new PageInfo(list); + PageInfo pageInfo = new PageInfo(list); rspData.setTotal(pageInfo.getTotal()); rspData.setTotalPage(pageInfo.getPages()); return rspData; @@ -86,7 +93,7 @@ public class ChatRoomServiceImpl extends ServiceImpl i ChatRoom build = ChatRoom.builder().userOneEmail(userEmail).userTwoEmail(sysUser.getEmail()).build(); int insert = chatRoomMapper.insert(build); if (insert > 0){ - return R.success(ChatRoomDto.builder().id(build.getId()).userEmail(build.getUserTwoEmail()).build()); + return R.success(ChatRoomDto.builder().id(String.valueOf(build.getId())).userEmail(build.getUserTwoEmail()).build()); } return R.fail("聊天室不存在,并且创建聊天室失败"); } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java index 783d8d9..2ab51c4 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java @@ -1,5 +1,6 @@ package com.m2pool.chat.service.impl; +import com.m2pool.chat.config.CustomWebSocketConfig; import com.m2pool.chat.constant.Destination; import com.m2pool.chat.dto.WebsocketMessageDto; import com.m2pool.chat.entity.ChatMessage; @@ -9,6 +10,7 @@ import com.m2pool.chat.mapper.ChatMessageMapper; import com.m2pool.chat.mapper.ChatRoomMapper; import com.m2pool.chat.service.StompService; import com.m2pool.chat.vo.UserMessageVo; +import com.m2pool.common.core.utils.StringUtils; import com.m2pool.common.core.web.Result.AjaxResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; @@ -42,8 +44,15 @@ public class StompServiceImpl implements StompService { @Resource private TransactionTemplate transactionTemplate; + @Resource + private CustomWebSocketConfig webSocketConfig; + @Override public AjaxResult sendMessageToUser(StompPrincipal principal, UserMessageVo userMessageVo) { + if(StringUtils.isEmpty(userMessageVo.getEmail())){ + userMessageVo.setEmail(webSocketConfig.getDefaultCustomerEmail()); + } + userRegistry.getUsers().forEach(user -> System.out.println(user.getName())); Boolean isCreate = principal.getIsCreate(); WebsocketMessageDto build = WebsocketMessageDto.builder() .type(userMessageVo.getType()) @@ -52,33 +61,32 @@ public class StompServiceImpl implements StompService { .sendUserType(principal.getType()) .sendTime(new Date()) .isCreate(isCreate) + .clientReadNum(0) .build(); if (isCreate){ principal.setIsCreate(false); } - boolean bool = checkOnline(userMessageVo); - //获取当前聊天室对象 ChatRoom chatRoom = chatRoomMapper.selectById(userMessageVo.getRoomId()); - if (chatRoom != null ){ + build.setRoomId(String.valueOf(userMessageVo.getRoomId())); if(LOGIN_USER.equals(principal.getType())){ build.setClientReadNum(chatRoom.getServiceReadNum() + 1); }else { build.setClientReadNum(chatRoom.getClientReadNum() + 1); } + }else{ + build.setRoomId(TOURIST.equals(userMessageVo.getType()) ? userMessageVo.getEmail() : principal.getName()); } - - - //当前用户发送其他人, 发送给指定用户. 否则默认发送给当前发送者 + boolean bool = checkOnline(userMessageVo); + //在线用户才发送消息 if (bool){ messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE + "/" + userMessageVo.getEmail(),build); } // 接收者和发送者 都不是游客才能存储消息 和修改聊天室信息。后续可改为消息中间件解耦形式。 if (!TOURIST.equals(userMessageVo.getReceiveUserType()) && !TOURIST.equals(principal.getType()) && chatRoom != null){ - build.setRoomId(userMessageVo.getRoomId()); transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { @@ -118,7 +126,7 @@ public class StompServiceImpl implements StompService { .sendEmail(principal.getName()) .content(userMessageVo.getContent()) .type(userMessageVo.getType()) - .roomId(userMessageVo.getRoomId()) + .roomId(Long.parseLong(userMessageVo.getRoomId())) .build(); chatMessageMapper.insert(message); return message.getId(); @@ -142,4 +150,16 @@ public class StompServiceImpl implements StompService { } chatRoomMapper.updateById(room); } + + /** + * 用于客服关闭断线客服端聊天室接口 + * @param roomId 游客与客服聊天室id, 实际为游客邮箱 + * @return + */ + public void customerCloseRoom(String roomId){ + messagingTemplate.convertAndSendToUser( + webSocketConfig.getDefaultCustomerEmail(), + Destination.QUEUE + Destination.QUEUE_CLOSE_ROOM + webSocketConfig.getDefaultCustomerEmail(),roomId); + } + } \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java index 13f3e91..0df2bd4 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java @@ -1,12 +1,72 @@ package com.m2pool.chat.task; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.m2pool.chat.entity.ChatMessage; +import com.m2pool.chat.entity.ChatMessageHistory; +import com.m2pool.chat.mapper.ChatMessageMapper; +import com.m2pool.chat.service.ChatMessageHistoryService; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.List; +import java.util.stream.Collectors; + /** * @ClassName ChatTask * @Description chat 聊天室和消息定时任务 * @Author yyb * @Date 2025/4/15 11:51 */ +@Configuration +@EnableScheduling public class ChatTask { + @Resource + private ChatMessageMapper chatMessageMapper; + @Resource + private ChatMessageHistoryService chatMessageHistoryService; + + // @Scheduled(cron = "0 0/1 * * * ?") + @Scheduled(cron = "0 15 1 * * ?") + public void chatMessageDataSeparatedForHotAndCold(){ + int pageNum = 1; + int pageSize = 1000; // 每批处理数量 + List chatMessages; + + do { + Page page = new Page<>(pageNum, pageSize); + chatMessageMapper.selectPage(page, new LambdaQueryWrapper() + .le(ChatMessage::getCreateTime, LocalDateTime.now().minusDays(7))); + + chatMessages = page.getRecords(); + + if (!chatMessages.isEmpty()) { + List collect = chatMessages.stream() + .map(chatMessage -> + ChatMessageHistory.builder() + .id(chatMessage.getId()) + .type(chatMessage.getType()) + .sendEmail(chatMessage.getSendEmail()) + .content(chatMessage.getContent()) + .roomId(chatMessage.getRoomId()) + .createTime(chatMessage.getCreateTime()) + .updateTime(chatMessage.getUpdateTime()) + .build() + ) + .collect(Collectors.toList()); + + // 批量插入 + chatMessageHistoryService.saveBatch(collect); + chatMessageMapper.deleteBatchIds(chatMessages.stream().map(ChatMessage::getId).collect(Collectors.toList())); + + } + + pageNum++; + } while (!chatMessages.isEmpty()); + } } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/utils/StompUtils.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/utils/StompUtils.java new file mode 100644 index 0000000..d027fb8 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/utils/StompUtils.java @@ -0,0 +1,30 @@ +package com.m2pool.chat.utils; + +import java.util.ArrayList; +import java.util.Map; + +/** + * @ClassName StompUtils + * @Description stomp工具类 + * @Author yyb + * @Date 2025/4/29 11:35 + */ +public class StompUtils { + /** + * 获取stomp自定义请求头 + * @param raw + * @param headerKey + * @return + */ + public static String getConnectCustomHeaders(Object raw,String headerKey){ + String headerValue = ""; + if (raw instanceof Map) { + Object value = ((Map) raw).get(headerKey); + if(value instanceof ArrayList){ + headerValue = ((ArrayList) value).get(0).toString(); + } + } + return headerValue; + } + +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/CharRoomVo.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/CharRoomVo.java index ef0c345..a703ff7 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/CharRoomVo.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/CharRoomVo.java @@ -13,8 +13,8 @@ import lombok.Data; @Data @ApiModel(description = "聊天室请求对象") public class CharRoomVo { - @ApiModelProperty(value = "聊天室id", example = "1") + @ApiModelProperty(value = "聊天室id", example = "1",required = true) private Long id; - @ApiModelProperty(value = "聊天室重要程度 0不重要 1重要", example = "1") + @ApiModelProperty(value = "聊天室重要程度 0不重要 1重要", example = "1",required = true) private Integer flag; } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/MessagePageVo.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/MessagePageVo.java index 6065c51..5493ce2 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/MessagePageVo.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/MessagePageVo.java @@ -20,10 +20,10 @@ public class MessagePageVo { @ApiModelProperty(value = "页数量", example = "1") private Integer pageNum; - @ApiModelProperty(value = "聊天室id", example = "1") + @ApiModelProperty(value = "聊天室id", example = "1",required = true) private Long roomId; - @ApiModelProperty(value = "用户类型 0 游客 1 登录用户 2 客服", example = "1") + @ApiModelProperty(value = "用户类型 0 游客 1 登录用户 2 客服", example = "1",required = true) private Integer userType; public MessagePageVo() { this.pageNum = 20; diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java index 0ca3201..a144507 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java @@ -42,5 +42,5 @@ public class UserMessageVo { * 聊天室id */ @ApiModelProperty(value = "聊天室id", example = "1") - private Long roomId; + private String roomId; } diff --git a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml index 7dab262..c8c31ac 100644 --- a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml +++ b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml @@ -11,7 +11,11 @@ type, send_email as sendEmail, content, - create_time as createTime + create_time as createTime, + room_id as roomId, + case when send_email = #{email} then 1 + else 0 + end as isSelf FROM chat_message_history room_id = #{roomId} diff --git a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml index 0b4be39..e86c5f0 100644 --- a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml +++ b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml @@ -9,11 +9,11 @@ flag, last_user_send_time as lastUserSendTime, last_customer_send_time as lastCustomerSendTime, - client_read_num AS clientReadNum + service_read_num AS clientReadNum FROM chat_room - user_two_email = #{userEmail} AND del = FALSE + user_two_email = #{userEmail} AND del = false AND last_user_send_time #{sendDateTime} @@ -22,6 +22,7 @@ AND last_user_send_time NOW() + ORDER BY flag DESC, @@ -29,7 +30,7 @@