update 聊天室功能完成

This commit is contained in:
yyb 2025-06-13 15:11:45 +08:00
parent 9ae9a88a46
commit eeee428a94
10 changed files with 69 additions and 42 deletions

View File

@ -58,6 +58,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
.setAllowedOrigins("*"); .setAllowedOrigins("*");
} }
/** /**
* 配置消息代理 * 配置消息代理
* 客户端订阅消息的请求前缀topic用于广播推送queue用于点对点推送 * 客户端订阅消息的请求前缀topic用于广播推送queue用于点对点推送
@ -67,7 +68,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
public void configureMessageBroker(MessageBrokerRegistry config) { public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE_USER,Destination.QUEUE_CUSTOMER,Destination.QUEUE_CLOSE_ROOM) config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE_USER,Destination.QUEUE_CUSTOMER,Destination.QUEUE_CLOSE_ROOM)
.setHeartbeatValue(new long[] {20000, 20000}) .setHeartbeatValue(new long[] {30000, 30000})
.setTaskScheduler(new DefaultManagedTaskScheduler()); .setTaskScheduler(new DefaultManagedTaskScheduler());
//发送消息前缀 //发送消息前缀

View File

@ -10,7 +10,7 @@ import lombok.Getter;
*/ */
@Getter @Getter
public enum ExceptionEnum { public enum ExceptionEnum {
IP_LIMIT_CONNECT(1020, "本机连接数已达上限,请先关闭已有链接"), IP_LIMIT_CONNECT(1020, "本机连接数已达上限9,请先关闭已有链接,再重新链接"),
MAX_LIMIT_CONNECT(1021, "服务器websocket连接数已达上限,服务器拒绝链接"), MAX_LIMIT_CONNECT(1021, "服务器websocket连接数已达上限,服务器拒绝链接"),
SET_PRINCIPAL_FAIL(1022, "websocket链接异常,用户身份设置失败"), SET_PRINCIPAL_FAIL(1022, "websocket链接异常,用户身份设置失败"),
GET_PRINCIPAL_FAIL(1023, "websocket链接异常,用户信息邮箱获取失败"), GET_PRINCIPAL_FAIL(1023, "websocket链接异常,用户信息邮箱获取失败"),

View File

@ -26,7 +26,7 @@ public class ChatRoomDto {
* 聊天室id * 聊天室id
*/ */
@ApiModelProperty(value = "聊天室id", example = "1") @ApiModelProperty(value = "聊天室id", example = "1")
private String id; private Long id;
/** /**
* 聊天对象id一般为客服 * 聊天对象id一般为客服
*/ */

View File

@ -52,7 +52,7 @@ public class WebsocketMessageDto {
/** /**
* 聊天室id * 聊天室id
*/ */
private String roomId; private Long roomId;
/** /**
* 是否创建新聊天室 * 是否创建新聊天室

View File

@ -20,7 +20,9 @@ import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.messaging.support.MessageHeaderAccessor;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -41,9 +43,9 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketChannelInterceptor.class); private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketChannelInterceptor.class);
/** /**
* 当前加入链接的ip ,key ip ,VALUE 邮箱 * 当前加入链接的ip ,key ip+邮箱 ,VALUE 邮箱
*/ */
private static final ConcurrentHashMap<String,EmailLimit> ipConnectionCountMap = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, EmailLimit> ipConnectionCountMap = new ConcurrentHashMap<>();
/** /**
* 当前链接数 * 当前链接数
@ -99,7 +101,21 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
} }
if (accessor.getCommand() == StompCommand.DISCONNECT){ if (accessor.getCommand() == StompCommand.DISCONNECT){
disconnectHandler(accessor); StompHeaderAccessor mha = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if(mha == null){
try {
throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.SET_PRINCIPAL_FAIL));
}catch (MessageDeliveryException e){
System.out.println("错误消息"+e.getMessage());
accessor.setHeader("error", e.getMessage());
}
}
if (mha.getUser() != null){
String email = mha.getUser().getName();
LOGGER.info("断开连接的邮箱为:{}",email);
disconnectHandler(accessor,email);
}
LOGGER.info("------------websocket disconnect"); LOGGER.info("------------websocket disconnect");
} }
return message; return message;
@ -123,7 +139,8 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
Map<String, Object> sessionAttributes = accessor.getSessionAttributes(); Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
if (sessionAttributes != null) { if (sessionAttributes != null) {
String ipAddr = (String) sessionAttributes.get(IPADDR); String ipAddr = (String) sessionAttributes.get(IPADDR);
EmailLimit emailAndCount = ipConnectionCountMap.get(ipAddr); String key = ipAddr + email;
EmailLimit emailAndCount = ipConnectionCountMap.get(key);
//两种ip限制情况 //两种ip限制情况
//本次链接为游客 且ip上已经有人链接直接拒绝本次链接 //本次链接为游客 且ip上已经有人链接直接拒绝本次链接
@ -131,10 +148,11 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT));
} }
// 登录用户现在ip限制不用做了前端在同一ip情况下不同用户会断开连接
//本次链接为登录用户,并且已经链接.直接拒绝本次链接 //本次链接为登录用户,并且已经链接.直接拒绝本次链接
if ( type != TOURIST && emailAndCount != null) { if ( type != TOURIST && emailAndCount != null) {
emailAndCount.setCount(emailAndCount.getCount() + 1); emailAndCount.setCount(emailAndCount.getCount() + 1);
if(email.equals(emailAndCount.getEmail())){ if (emailAndCount.getCount() >= 10){
throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT));
} }
//if(ipConnectionCountMap.containsValue(emailAndCount)){ //if(ipConnectionCountMap.containsValue(emailAndCount)){
@ -145,7 +163,7 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
EmailLimit emailLimit = new EmailLimit(); EmailLimit emailLimit = new EmailLimit();
emailLimit.setEmail(email); emailLimit.setEmail(email);
emailLimit.setCount(1); emailLimit.setCount(1);
ipConnectionCountMap.put(ipAddr,emailLimit ); ipConnectionCountMap.put(key,emailLimit );
} }
}else{ }else{
throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL)); throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL));
@ -157,19 +175,23 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
/** /**
* 断开链接处理 * 断开链接处理
*/ */
private void disconnectHandler(StompHeaderAccessor accessor){ private void disconnectHandler(StompHeaderAccessor accessor,String email){
//连接数减一 //连接数减一
connectionCount.decrementAndGet(); connectionCount.decrementAndGet();
//断开链接的ip //断开链接的ip
Map<String, Object> sessionAttributes = accessor.getSessionAttributes(); Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
if (sessionAttributes != null ) { if (sessionAttributes != null ) {
String ipAddr = (String) sessionAttributes.get(IPADDR); String ipAddr = (String) sessionAttributes.get(IPADDR);
EmailLimit emailAndCount = ipConnectionCountMap.get(ipAddr); String key = ipAddr + email;
EmailLimit emailAndCount = ipConnectionCountMap.get(key);
if (emailAndCount != null ){
if (emailAndCount.getCount() > 1){ if (emailAndCount.getCount() > 1){
emailAndCount.setCount(emailAndCount.getCount() - 1); emailAndCount.setCount(emailAndCount.getCount() - 1);
}else{ }else{
ipConnectionCountMap.remove(ipAddr); ipConnectionCountMap.remove(key);
} }
}
}else{ }else{
throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL)); throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL));

View File

@ -42,6 +42,7 @@ public class WebSocketEventListener implements ApplicationListener<SessionDiscon
StompPrincipal user = (StompPrincipal) event.getUser(); StompPrincipal user = (StompPrincipal) event.getUser();
Message<byte[]> message = event.getMessage(); Message<byte[]> message = event.getMessage();
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (user != null) {
LOGGER.info("用户{}断开链接:用户类型{},消息类型{}",user.getName(), user.getType(), accessor.getCommand()); LOGGER.info("用户{}断开链接:用户类型{},消息类型{}",user.getName(), user.getType(), accessor.getCommand());
if (accessor.getCommand() == StompCommand.DISCONNECT && TOURIST.equals(user.getType())) { if (accessor.getCommand() == StompCommand.DISCONNECT && TOURIST.equals(user.getType())) {
//游客断开链接通知客服删除游客聊天室 //游客断开链接通知客服删除游客聊天室
@ -51,5 +52,8 @@ public class WebSocketEventListener implements ApplicationListener<SessionDiscon
int delete1 = chatMessageMapper.delete(new LambdaUpdateWrapper<ChatMessage>().eq(ChatMessage::getSendEmail, user.getName())); int delete1 = chatMessageMapper.delete(new LambdaUpdateWrapper<ChatMessage>().eq(ChatMessage::getSendEmail, user.getName()));
LOGGER.info("删除游客聊天室个数:{},消息个数{}", delete,delete1); LOGGER.info("删除游客聊天室个数:{},消息个数{}", delete,delete1);
} }
}
} }
} }

View File

@ -94,7 +94,7 @@ public class ChatRoomServiceImpl extends ServiceImpl<ChatRoomMapper, ChatRoom> i
ChatRoom build = ChatRoom.builder().userOneEmail(userEmail).userTwoEmail(sysUser.getEmail()).build(); ChatRoom build = ChatRoom.builder().userOneEmail(userEmail).userTwoEmail(sysUser.getEmail()).build();
int insert = chatRoomMapper.insert(build); int insert = chatRoomMapper.insert(build);
if (insert > 0){ if (insert > 0){
return R.success(ChatRoomDto.builder().id(String.valueOf(build.getId())).selfEmail(userEmail).userEmail(build.getUserTwoEmail()).build()); return R.success(ChatRoomDto.builder().id(build.getId()).selfEmail(userEmail).userEmail(build.getUserTwoEmail()).build());
} }
return R.fail("聊天室不存在,并且创建聊天室失败"); return R.fail("聊天室不存在,并且创建聊天室失败");
} }

View File

@ -62,10 +62,10 @@ public class StompServiceImpl implements StompService {
WebsocketMessageDto build = buildDto(principal, userMessageVo); WebsocketMessageDto build = buildDto(principal, userMessageVo);
//获取当前聊天室对象 //获取当前聊天室对象
ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>() ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>()
.eq(ChatRoom::getUserOneEmail, principal.getName()) .eq(ChatRoom::getUserOneEmail,userMessageVo.getEmail())
.eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail())); .eq(ChatRoom::getUserTwoEmail,principal.getName()));
System.out.println("发送消息聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail()); System.out.println("发送消息聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail());
build.setRoomId(String.valueOf(userMessageVo.getRoomId())); build.setRoomId(userMessageVo.getRoomId());
int serviceReadNum = chatRoom != null ? chatRoom.getClientReadNum() : 0; int serviceReadNum = chatRoom != null ? chatRoom.getClientReadNum() : 0;
build.setClientReadNum( serviceReadNum+ 1); build.setClientReadNum( serviceReadNum+ 1);
//分块传输只有最后一个块拼接完成才能发送消息 //分块传输只有最后一个块拼接完成才能发送消息
@ -79,8 +79,8 @@ public class StompServiceImpl implements StompService {
// handleImage(userMessageVo.getEmail()+principal.getName(),userMessageVo.getContent()); // handleImage(userMessageVo.getEmail()+principal.getName(),userMessageVo.getContent());
//} //}
//TODO 多端情况下需要把消息发送给自己 //多端情况下需要把消息发送给自己
messagingTemplate.convertAndSendToUser(principal.getName(), Destination.QUEUE_CUSTOMER + "/" + principal.getName(),build);
messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_USER + "/" + userMessageVo.getEmail(),build); messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_USER + "/" + userMessageVo.getEmail(),build);
executeTran(principal, userMessageVo, chatRoom); executeTran(principal, userMessageVo, chatRoom);
return AjaxResult.success("成功"); return AjaxResult.success("成功");
@ -90,9 +90,9 @@ public class StompServiceImpl implements StompService {
public AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo) { public AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo) {
WebsocketMessageDto build = buildDto(principal, userMessageVo); WebsocketMessageDto build = buildDto(principal, userMessageVo);
ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>() ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>()
.eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail()) .eq(ChatRoom::getUserOneEmail, principal.getName())
.eq(ChatRoom::getUserTwoEmail, principal.getName())); .eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail()));
build.setRoomId(String.valueOf(userMessageVo.getRoomId())); build.setRoomId(userMessageVo.getRoomId());
System.out.println("发送消息聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail()); System.out.println("发送消息聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail());
int serviceReadNum = chatRoom != null ? chatRoom.getServiceReadNum() : 0; int serviceReadNum = chatRoom != null ? chatRoom.getServiceReadNum() : 0;
build.setClientReadNum(serviceReadNum + 1); build.setClientReadNum(serviceReadNum + 1);
@ -107,11 +107,10 @@ public class StompServiceImpl implements StompService {
// handleImage(userMessageVo.getEmail()+principal.getName(),userMessageVo.getContent()); // handleImage(userMessageVo.getEmail()+principal.getName(),userMessageVo.getContent());
//} //}
//TODO 多端情况下需要把消息发送给 // 多端情况下需要把消息发送给
messagingTemplate.convertAndSendToUser(principal.getName(), Destination.QUEUE_USER + "/" + principal.getName(),build);
messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_CUSTOMER + "/" + userMessageVo.getEmail(),build); messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_CUSTOMER + "/" + userMessageVo.getEmail(),build);
executeTran(principal, userMessageVo, chatRoom); executeTran(principal, userMessageVo, chatRoom);
return AjaxResult.success("成功"); return AjaxResult.success("成功");
} }
/** /**
@ -190,7 +189,7 @@ public class StompServiceImpl implements StompService {
.sendEmail(principal.getName()) .sendEmail(principal.getName())
.content(userMessageVo.getContent()) .content(userMessageVo.getContent())
.type(userMessageVo.getType()) .type(userMessageVo.getType())
.roomId(Long.parseLong(userMessageVo.getRoomId())) .roomId(userMessageVo.getRoomId())
.build(); .build();
chatMessageMapper.insert(message); chatMessageMapper.insert(message);
return message.getId(); return message.getId();
@ -203,6 +202,7 @@ public class StompServiceImpl implements StompService {
* @param {bool 废弃 发送者是否在线不在线需要更新未读消息数量} * @param {bool 废弃 发送者是否在线不在线需要更新未读消息数量}
*/ */
private void updateRoom(ChatRoom chatRoom,Integer sendUserType){ private void updateRoom(ChatRoom chatRoom,Integer sendUserType){
System.out.println("聊天室信息"+chatRoom);
if(chatRoom != null){ if(chatRoom != null){
ChatRoom room = ChatRoom.builder().id(chatRoom.getId()).build(); ChatRoom room = ChatRoom.builder().id(chatRoom.getId()).build();
if (CUSTOMER.equals(sendUserType)) { if (CUSTOMER.equals(sendUserType)) {

View File

@ -47,7 +47,7 @@ public class UserMessageVo {
* 聊天室id * 聊天室id
*/ */
@ApiModelProperty(value = "聊天室id", example = "1") @ApiModelProperty(value = "聊天室id", example = "1")
private String roomId; private Long roomId;
///** ///**
// * 总的分片数 // * 总的分片数

View File

@ -7,8 +7,8 @@
id, id,
user_one_email AS userEmail, user_one_email AS userEmail,
flag, flag,
last_user_send_time as lastUserSendTime, CASE WHEN last_user_send_time >= last_customer_send_time
last_customer_send_time as lastCustomerSendTime, THEN last_user_send_time ELSE last_customer_send_time END AS lastUserSendTime,
service_read_num AS clientReadNum service_read_num AS clientReadNum
FROM FROM
chat_room chat_room
@ -26,7 +26,7 @@
</where> </where>
ORDER BY ORDER BY
flag DESC, flag DESC,
last_user_send_time DESC GREATEST( last_user_send_time, last_customer_send_time ) DESC
</select> </select>
<select id="findRoomByUserEmail" resultType="com.m2pool.chat.dto.ChatRoomDto"> <select id="findRoomByUserEmail" resultType="com.m2pool.chat.dto.ChatRoomDto">
SELECT SELECT