diff --git a/m2pool-gateway/src/main/resources/bootstrap-test.yml b/m2pool-gateway/src/main/resources/bootstrap-test.yml index f6b083b..7573ec0 100644 --- a/m2pool-gateway/src/main/resources/bootstrap-test.yml +++ b/m2pool-gateway/src/main/resources/bootstrap-test.yml @@ -1,3 +1,4 @@ + server: port: 8101 # Spring diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java index 9d512c2..a8b4886 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java @@ -63,7 +63,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(MessageBrokerRegistry config) { - config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE) + config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE_USER,Destination.QUEUE_CUSTOMER) .setHeartbeatValue(new long[] {10000, 10000}) .setTaskScheduler(new DefaultManagedTaskScheduler()); @@ -82,12 +82,23 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { registration .setMessageSizeLimit(customWebSocketConfig.getMessageSizeLimit()) .setSendTimeLimit(customWebSocketConfig.getSendTimeLimit()) - .setSendBufferSizeLimit(customWebSocketConfig.getSendBufferSizeLimit()) + .setSendBufferSizeLimit(customWebSocketConfig.getSendBufferSizeLimit()); // 首次连接超时时间.正常情况下,前端订阅 和 心跳包的影响 不会超时断连 - .setTimeToFirstMessage(customWebSocketConfig.getTimeToFirstMessage()); +// .setTimeToFirstMessage(customWebSocketConfig.getTimeToFirstMessage()); } +// @Bean +// public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() { +// ServletServerContainerFactoryBean factoryBean = new ServletServerContainerFactoryBean(); +// factoryBean.setMaxTextMessageBufferSize(customWebSocketConfig.getMessageSizeLimit()); +// factoryBean.setMaxBinaryMessageBufferSize(customWebSocketConfig.getMessageSizeLimit()); +// factoryBean.setMaxSessionIdleTimeout(2048L * 2048L); +// factoryBean.setAsyncSendTimeout(2048L * 2048L); +// return factoryBean; +// } + + /** * 配置客户端出站通道拦截器(默认线程1) diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java index 8257184..b7e438e 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/Destination.java @@ -7,11 +7,21 @@ package com.m2pool.chat.constant; * @Date 2025/4/14 14:54 */ public class Destination { + /** * stomp 默认单对单 发送目的地.单对单消息默认 / */ public static final String QUEUE = "/queue"; + /** + * stomp 默认单对单 发送目的地.单对单消息默认 / + */ + public static final String QUEUE_USER = "/queue/user"; + + /** + * stomp 默认单对单 发送目的地.单对单消息默认 / + */ + public static final String QUEUE_CUSTOMER = "/queue/customer"; /** * 聊天室关闭 路径 */ @@ -23,9 +33,9 @@ public class Destination { public static final String TOPIC = "/topic"; /** - * 前端订阅消息所需前缀。 stomp 默认user前缀。 + * 前端订阅消息所需前缀point。 stomp 默认user前缀。 */ - public static final String USER_PREFIX = "/user"; + public static final String USER_PREFIX = "/sub"; /** * stomp 默认发送消息前缀。 diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/StompController.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/StompController.java index 66df71f..77a8c67 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/StompController.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/StompController.java @@ -27,13 +27,25 @@ public class StompController extends BaseController { private StompService stompService; /** - * 发送消息到对应的用户 + * 发送消息给普通用户和游客 * @param userMessageVo 消息体 * @return 返回值通过CommonMessageConvert消息转换器转换 */ - @MessageMapping("/send/message") - @SendToUser("/queue") + @MessageMapping("/send/message/to/user") + @SendToUser("/queue/user") public AjaxResult sendMessageToUser(StompPrincipal principal, @Payload UserMessageVo userMessageVo) { return stompService.sendMessageToUser(principal,userMessageVo); } + + + /** + * 发送消息到对应的客服 + * @param userMessageVo 消息体 + * @return 返回值通过CommonMessageConvert消息转换器转换 + */ + @MessageMapping("/send/message/to/customer") + @SendToUser("/queue/customer/") + public AjaxResult sendMessageToCustomer(StompPrincipal principal, @Payload UserMessageVo userMessageVo) { + return stompService.sendMessageToCustomer(principal,userMessageVo); + } } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/StompService.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/StompService.java index 82b16e2..d58e4e5 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/StompService.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/StompService.java @@ -13,4 +13,11 @@ public interface StompService { */ AjaxResult sendMessageToUser(StompPrincipal principal, UserMessageVo userMessageVo); + + /** + * 发送消息给客服 + * @param userMessageVo + * @return + */ + AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo); } \ No newline at end of file diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java index 5575e47..c4a62c0 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatMessageServiceImpl.java @@ -9,6 +9,7 @@ import com.m2pool.chat.mapper.ChatMessageMapper; import com.m2pool.chat.mapper.ChatRoomMapper; import com.m2pool.chat.service.ChatMessageService; import com.m2pool.common.core.Result.R; +import com.m2pool.common.core.utils.StringUtils; import com.m2pool.common.security.utils.SecurityUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -17,8 +18,7 @@ import javax.annotation.Resource; import java.util.List; import java.util.Objects; -import static com.m2pool.chat.constant.UserType.CUSTOMER; -import static com.m2pool.chat.constant.UserType.LOGIN_USER; +import static com.m2pool.chat.constant.UserType.*; @Service public class ChatMessageServiceImpl implements ChatMessageService { @@ -44,6 +44,9 @@ public class ChatMessageServiceImpl implements ChatMessageService { @Override public R> findRecentlyMessage(String email,Long id,Integer pageNum,Long roomId) { ChatMessage chatMessage; + if(StringUtils.isEmpty(email)){ + return R.fail("查询失败,用户标识或邮箱不能为空"); + } if(id != null && id != 0){ chatMessage = chatMessageMapper.selectById(id); }else{ @@ -81,7 +84,7 @@ public class ChatMessageServiceImpl implements ChatMessageService { .id(roomId) .build(); if (chatRoom != null){ - if (Objects.equals(type, LOGIN_USER)) { + if (Objects.equals(type, LOGIN_USER) || Objects.equals(type, TOURIST) ) { build.setClientReadNum(0); } if(Objects.equals(type, CUSTOMER)){ diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java index 09dce3e..c16a8b0 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java @@ -49,6 +49,44 @@ public class StompServiceImpl implements StompService { @Override public AjaxResult sendMessageToUser(StompPrincipal principal, UserMessageVo userMessageVo) { + WebsocketMessageDto build = buildDto(principal, userMessageVo); + //获取当前聊天室对象 + ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() + .eq(ChatRoom::getUserOneEmail, principal.getName()) + .eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail())); + build.setRoomId(String.valueOf(userMessageVo.getRoomId())); + int serviceReadNum = chatRoom != null ? chatRoom.getClientReadNum() : 0; + build.setClientReadNum( serviceReadNum+ 1); + //不在判断接收者是否在线 +// boolean bool = checkOnline(userMessageVo); + //在线用户才发送消息 + messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_USER + "/" + userMessageVo.getEmail(),build); + executeTran(principal, userMessageVo, chatRoom); + return AjaxResult.success("成功"); + } + + @Override + public AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo) { + WebsocketMessageDto build = buildDto(principal, userMessageVo); + + ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() + .eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail()) + .eq(ChatRoom::getUserTwoEmail, principal.getName())); + build.setRoomId(String.valueOf(userMessageVo.getRoomId())); + int serviceReadNum = chatRoom != null ? chatRoom.getServiceReadNum() : 0; + build.setClientReadNum(serviceReadNum + 1); + messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_CUSTOMER + "/" + userMessageVo.getEmail(),build); + executeTran(principal, userMessageVo, chatRoom); + return AjaxResult.success("成功"); + } + + /** + * 构建聊天实时返回信息 + * @param principal + * @param userMessageVo + * @return + */ + private WebsocketMessageDto buildDto(StompPrincipal principal, UserMessageVo userMessageVo) { Boolean isCreate = principal.getIsCreate(); WebsocketMessageDto build = WebsocketMessageDto.builder() .type(userMessageVo.getType()) @@ -62,28 +100,16 @@ public class StompServiceImpl implements StompService { if (isCreate){ principal.setIsCreate(false); } - //获取当前聊天室对象 - ChatRoom chatRoom; + return build; + } - if(!CUSTOMER.equals(principal.getType())){ - chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() - .eq(ChatRoom::getUserOneEmail, principal.getName()) - .eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail())); - build.setRoomId(String.valueOf(userMessageVo.getRoomId())); - build.setClientReadNum(chatRoom.getServiceReadNum() + 1); - }else { - chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() - .eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail()) - .eq(ChatRoom::getUserTwoEmail, principal.getName())); - build.setRoomId(String.valueOf(userMessageVo.getRoomId())); - build.setClientReadNum(chatRoom.getClientReadNum() + 1); - } - - boolean bool = checkOnline(userMessageVo); - //在线用户才发送消息 - if (bool){ - messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE + "/" + userMessageVo.getEmail(),build); - } + /** + * 执行消息表新增消息和聊天室已读未读消息数量更新事务 + * @param principal + * @param userMessageVo + * @param chatRoom + */ + private void executeTran(StompPrincipal principal, UserMessageVo userMessageVo, ChatRoom chatRoom){ // 消息存储 transactionTemplate.execute(new TransactionCallbackWithoutResult() { @Override @@ -92,7 +118,7 @@ public class StompServiceImpl implements StompService { // 插入消息并获取 ID insertMessage(principal,userMessageVo); //聊天室,已读未读数量,更新。 - updateRoom(chatRoom,principal.getType(),bool); + updateRoom(chatRoom,principal.getType()); } catch (Exception e) { // 回滚事务 status.setRollbackOnly(); @@ -100,8 +126,6 @@ public class StompServiceImpl implements StompService { } } }); - - return AjaxResult.success("成功"); } /** @@ -109,6 +133,7 @@ public class StompServiceImpl implements StompService { * @return */ private boolean checkOnline(UserMessageVo userMessageVo){ + return userRegistry.getUsers().stream() .anyMatch(user -> user.getName().equals(userMessageVo.getEmail())); } @@ -133,19 +158,21 @@ public class StompServiceImpl implements StompService { * 修改聊天信息 * @param chatRoom 本次聊天聊天室信息 * @param sendUserType 发送者类型 - * @param bool 发送者是否在线,不在线,需要更新未读消息数量 + * @param {bool 废弃 发送者是否在线,不在线,需要更新未读消息数量} */ - private void updateRoom(ChatRoom chatRoom,Integer sendUserType,Boolean bool){ - - ChatRoom room = ChatRoom.builder().id(chatRoom.getId()).build(); - if (CUSTOMER.equals(sendUserType)) { - room.setClientReadNum(chatRoom.getClientReadNum() + 1); - room.setLastCustomerSendTime(LocalDateTime.now()); - } else { - room.setServiceReadNum(chatRoom.getServiceReadNum() + 1); - room.setLastUserSendTime(LocalDateTime.now()); + private void updateRoom(ChatRoom chatRoom,Integer sendUserType){ + if(chatRoom != null){ + ChatRoom room = ChatRoom.builder().id(chatRoom.getId()).build(); + if (CUSTOMER.equals(sendUserType)) { + room.setClientReadNum(chatRoom.getClientReadNum() + 1); + room.setLastCustomerSendTime(LocalDateTime.now()); + } else { + room.setServiceReadNum(chatRoom.getServiceReadNum() + 1); + room.setLastUserSendTime(LocalDateTime.now()); + } + chatRoomMapper.updateById(room); } - chatRoomMapper.updateById(room); + } /** diff --git a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml index c8c31ac..bd98b78 100644 --- a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml +++ b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageHistoryMapper.xml @@ -20,7 +20,7 @@ room_id = #{roomId} - AND id #{id} + AND id #{id} ORDER BY id DESC diff --git a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageMapper.xml b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageMapper.xml index f69ae7b..61d8ab6 100644 --- a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageMapper.xml +++ b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatMessageMapper.xml @@ -17,7 +17,7 @@ room_id = #{roomId} - AND id #{id} + AND id #{id} ORDER BY id DESC diff --git a/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/task/DataTask.java b/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/task/DataTask.java index ef6cd66..cfff4c7 100644 --- a/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/task/DataTask.java +++ b/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/task/DataTask.java @@ -121,6 +121,10 @@ public class DataTask { } uTime ++; } + + + + }); Map> userMap = list.stream().collect(Collectors.groupingBy(MinerDataDto::getMiner));