diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java index d107d9d..a865c64 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java @@ -58,6 +58,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { .setAllowedOrigins("*"); } + /** * 配置消息代理 * 客户端订阅消息的请求前缀,topic用于广播推送,queue用于点对点推送 @@ -67,7 +68,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE_USER,Destination.QUEUE_CUSTOMER,Destination.QUEUE_CLOSE_ROOM) - .setHeartbeatValue(new long[] {20000, 20000}) + .setHeartbeatValue(new long[] {30000, 30000}) .setTaskScheduler(new DefaultManagedTaskScheduler()); //发送消息前缀 diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java index 1d56cfa..4de349b 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java @@ -10,7 +10,7 @@ import lombok.Getter; */ @Getter public enum ExceptionEnum { - IP_LIMIT_CONNECT(1020, "本机连接数已达上限,请先关闭已有链接"), + IP_LIMIT_CONNECT(1020, "本机连接数已达上限9,请先关闭已有链接,再重新链接"), MAX_LIMIT_CONNECT(1021, "服务器websocket连接数已达上限,服务器拒绝链接"), SET_PRINCIPAL_FAIL(1022, "websocket链接异常,用户身份设置失败"), GET_PRINCIPAL_FAIL(1023, "websocket链接异常,用户信息邮箱获取失败"), diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java index 92e96ba..247a426 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/ChatRoomDto.java @@ -26,7 +26,7 @@ public class ChatRoomDto { * 聊天室id */ @ApiModelProperty(value = "聊天室id", example = "1") - private String id; + private Long id; /** * 聊天对象id:一般为客服 */ diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java index 8b179dd..19ecc1a 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java @@ -52,7 +52,7 @@ public class WebsocketMessageDto { /** * 聊天室id */ - private String roomId; + private Long roomId; /** * 是否创建新聊天室 diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java index 62321da..86d4bfa 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java @@ -20,7 +20,9 @@ import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -41,9 +43,9 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketChannelInterceptor.class); /** - * 当前加入链接的ip ,key 为 ip ,VALUE 邮箱 + * 当前加入链接的ip ,key 为 ip+邮箱 ,VALUE 邮箱 */ - private static final ConcurrentHashMap ipConnectionCountMap = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap ipConnectionCountMap = new ConcurrentHashMap<>(); /** * 当前链接数 @@ -99,7 +101,21 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { } if (accessor.getCommand() == StompCommand.DISCONNECT){ - disconnectHandler(accessor); + StompHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + if(mha == null){ + try { + throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.SET_PRINCIPAL_FAIL)); + }catch (MessageDeliveryException e){ + System.out.println("错误消息"+e.getMessage()); + accessor.setHeader("error", e.getMessage()); + } + + } + if (mha.getUser() != null){ + String email = mha.getUser().getName(); + LOGGER.info("断开连接的邮箱为:{}",email); + disconnectHandler(accessor,email); + } LOGGER.info("------------websocket disconnect"); } return message; @@ -123,7 +139,8 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { Map sessionAttributes = accessor.getSessionAttributes(); if (sessionAttributes != null) { String ipAddr = (String) sessionAttributes.get(IPADDR); - EmailLimit emailAndCount = ipConnectionCountMap.get(ipAddr); + String key = ipAddr + email; + EmailLimit emailAndCount = ipConnectionCountMap.get(key); //两种ip限制情况 //本次链接为游客 且ip上已经有人链接直接拒绝本次链接 @@ -131,10 +148,11 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); } + // 登录用户现在ip限制不用做了,前端在同一ip情况下,不同用户会断开连接 //本次链接为登录用户,并且已经链接.直接拒绝本次链接 if ( type != TOURIST && emailAndCount != null) { emailAndCount.setCount(emailAndCount.getCount() + 1); - if(email.equals(emailAndCount.getEmail())){ + if (emailAndCount.getCount() >= 10){ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); } //if(ipConnectionCountMap.containsValue(emailAndCount)){ @@ -145,7 +163,7 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { EmailLimit emailLimit = new EmailLimit(); emailLimit.setEmail(email); emailLimit.setCount(1); - ipConnectionCountMap.put(ipAddr,emailLimit ); + ipConnectionCountMap.put(key,emailLimit ); } }else{ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL)); @@ -157,20 +175,24 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { /** * 断开链接处理 */ - private void disconnectHandler(StompHeaderAccessor accessor){ + private void disconnectHandler(StompHeaderAccessor accessor,String email){ //连接数减一 connectionCount.decrementAndGet(); //断开链接的ip Map sessionAttributes = accessor.getSessionAttributes(); if (sessionAttributes != null ) { String ipAddr = (String) sessionAttributes.get(IPADDR); - EmailLimit emailAndCount = ipConnectionCountMap.get(ipAddr); - if (emailAndCount.getCount() > 1){ - emailAndCount.setCount(emailAndCount.getCount() - 1); - }else{ - ipConnectionCountMap.remove(ipAddr); + String key = ipAddr + email; + EmailLimit emailAndCount = ipConnectionCountMap.get(key); + if (emailAndCount != null ){ + if (emailAndCount.getCount() > 1){ + emailAndCount.setCount(emailAndCount.getCount() - 1); + }else{ + ipConnectionCountMap.remove(key); + } } + }else{ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL)); } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java index 61e1b22..cba6736 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/WebSocketEventListener.java @@ -42,14 +42,18 @@ public class WebSocketEventListener implements ApplicationListener message = event.getMessage(); StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); - LOGGER.info("用户{}断开链接:用户类型{},消息类型{}",user.getName(), user.getType(), accessor.getCommand()); - if (accessor.getCommand() == StompCommand.DISCONNECT && TOURIST.equals(user.getType())) { - //游客断开链接,通知客服删除游客聊天室 - stompService.customerCloseRoom(user.getName()); - // 删除数据库中游客数据 - int delete = chatRoomMapper.delete(new LambdaUpdateWrapper().eq(ChatRoom::getUserOneEmail, user.getName())); - int delete1 = chatMessageMapper.delete(new LambdaUpdateWrapper().eq(ChatMessage::getSendEmail, user.getName())); - LOGGER.info("删除游客聊天室个数:{},消息个数{}", delete,delete1); + if (user != null) { + LOGGER.info("用户{}断开链接:用户类型{},消息类型{}",user.getName(), user.getType(), accessor.getCommand()); + if (accessor.getCommand() == StompCommand.DISCONNECT && TOURIST.equals(user.getType())) { + //游客断开链接,通知客服删除游客聊天室 + stompService.customerCloseRoom(user.getName()); + // 删除数据库中游客数据 + int delete = chatRoomMapper.delete(new LambdaUpdateWrapper().eq(ChatRoom::getUserOneEmail, user.getName())); + int delete1 = chatMessageMapper.delete(new LambdaUpdateWrapper().eq(ChatMessage::getSendEmail, user.getName())); + LOGGER.info("删除游客聊天室个数:{},消息个数{}", delete,delete1); + } + } + } } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java index b01da3b..20a9a89 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/ChatRoomServiceImpl.java @@ -94,7 +94,7 @@ public class ChatRoomServiceImpl extends ServiceImpl i ChatRoom build = ChatRoom.builder().userOneEmail(userEmail).userTwoEmail(sysUser.getEmail()).build(); int insert = chatRoomMapper.insert(build); if (insert > 0){ - return R.success(ChatRoomDto.builder().id(String.valueOf(build.getId())).selfEmail(userEmail).userEmail(build.getUserTwoEmail()).build()); + return R.success(ChatRoomDto.builder().id(build.getId()).selfEmail(userEmail).userEmail(build.getUserTwoEmail()).build()); } return R.fail("聊天室不存在,并且创建聊天室失败"); } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java index 7d0d595..b11c0b6 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java @@ -62,10 +62,10 @@ public class StompServiceImpl implements StompService { WebsocketMessageDto build = buildDto(principal, userMessageVo); //获取当前聊天室对象 ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() - .eq(ChatRoom::getUserOneEmail, principal.getName()) - .eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail())); + .eq(ChatRoom::getUserOneEmail,userMessageVo.getEmail()) + .eq(ChatRoom::getUserTwoEmail,principal.getName())); System.out.println("发送消息,聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail()); - build.setRoomId(String.valueOf(userMessageVo.getRoomId())); + build.setRoomId(userMessageVo.getRoomId()); int serviceReadNum = chatRoom != null ? chatRoom.getClientReadNum() : 0; build.setClientReadNum( serviceReadNum+ 1); //分块传输,只有最后一个块拼接完成才能发送消息 @@ -79,8 +79,8 @@ public class StompServiceImpl implements StompService { // handleImage(userMessageVo.getEmail()+principal.getName(),userMessageVo.getContent()); //} - //TODO 多端情况下,需要把消息发送给自己 - + //多端情况下,需要把消息发送给自己 + messagingTemplate.convertAndSendToUser(principal.getName(), Destination.QUEUE_CUSTOMER + "/" + principal.getName(),build); messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_USER + "/" + userMessageVo.getEmail(),build); executeTran(principal, userMessageVo, chatRoom); return AjaxResult.success("成功"); @@ -90,9 +90,9 @@ public class StompServiceImpl implements StompService { public AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo) { WebsocketMessageDto build = buildDto(principal, userMessageVo); ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() - .eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail()) - .eq(ChatRoom::getUserTwoEmail, principal.getName())); - build.setRoomId(String.valueOf(userMessageVo.getRoomId())); + .eq(ChatRoom::getUserOneEmail, principal.getName()) + .eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail())); + build.setRoomId(userMessageVo.getRoomId()); System.out.println("发送消息,聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail()); int serviceReadNum = chatRoom != null ? chatRoom.getServiceReadNum() : 0; build.setClientReadNum(serviceReadNum + 1); @@ -107,11 +107,10 @@ public class StompServiceImpl implements StompService { // handleImage(userMessageVo.getEmail()+principal.getName(),userMessageVo.getContent()); //} - //TODO 多端情况下,需要把消息发送给 - + // 多端情况下,需要把消息发送给 + messagingTemplate.convertAndSendToUser(principal.getName(), Destination.QUEUE_USER + "/" + principal.getName(),build); messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_CUSTOMER + "/" + userMessageVo.getEmail(),build); executeTran(principal, userMessageVo, chatRoom); - return AjaxResult.success("成功"); } /** @@ -190,7 +189,7 @@ public class StompServiceImpl implements StompService { .sendEmail(principal.getName()) .content(userMessageVo.getContent()) .type(userMessageVo.getType()) - .roomId(Long.parseLong(userMessageVo.getRoomId())) + .roomId(userMessageVo.getRoomId()) .build(); chatMessageMapper.insert(message); return message.getId(); @@ -203,6 +202,7 @@ public class StompServiceImpl implements StompService { * @param {bool 废弃 发送者是否在线,不在线,需要更新未读消息数量} */ private void updateRoom(ChatRoom chatRoom,Integer sendUserType){ + System.out.println("聊天室信息"+chatRoom); if(chatRoom != null){ ChatRoom room = ChatRoom.builder().id(chatRoom.getId()).build(); if (CUSTOMER.equals(sendUserType)) { diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java index d768759..09a2a38 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java @@ -47,7 +47,7 @@ public class UserMessageVo { * 聊天室id */ @ApiModelProperty(value = "聊天室id", example = "1") - private String roomId; + private Long roomId; ///** // * 总的分片数 diff --git a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml index 5298eac..15ff1a0 100644 --- a/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml +++ b/m2pool-modules/m2pool-chat/src/main/resources/mapper/chat/ChatRoomMapper.xml @@ -7,8 +7,8 @@ id, user_one_email AS userEmail, flag, - last_user_send_time as lastUserSendTime, - last_customer_send_time as lastCustomerSendTime, + CASE WHEN last_user_send_time >= last_customer_send_time + THEN last_user_send_time ELSE last_customer_send_time END AS lastUserSendTime, service_read_num AS clientReadNum FROM chat_room @@ -26,7 +26,7 @@ ORDER BY flag DESC, - last_user_send_time DESC + GREATEST( last_user_send_time, last_customer_send_time ) DESC - \ No newline at end of file +