This commit is contained in:
yyb 2025-05-14 15:08:16 +08:00
commit e3e7993134
10 changed files with 123 additions and 48 deletions

View File

@ -1,3 +1,4 @@
server:
port: 8101
# Spring

View File

@ -63,7 +63,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE)
config.enableSimpleBroker(Destination.TOPIC, Destination.QUEUE_USER,Destination.QUEUE_CUSTOMER)
.setHeartbeatValue(new long[] {10000, 10000})
.setTaskScheduler(new DefaultManagedTaskScheduler());
@ -82,12 +82,23 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
registration
.setMessageSizeLimit(customWebSocketConfig.getMessageSizeLimit())
.setSendTimeLimit(customWebSocketConfig.getSendTimeLimit())
.setSendBufferSizeLimit(customWebSocketConfig.getSendBufferSizeLimit())
.setSendBufferSizeLimit(customWebSocketConfig.getSendBufferSizeLimit());
// 首次连接超时时间.正常情况下前端订阅 心跳包的影响 不会超时断连
.setTimeToFirstMessage(customWebSocketConfig.getTimeToFirstMessage());
// .setTimeToFirstMessage(customWebSocketConfig.getTimeToFirstMessage());
}
// @Bean
// public ServletServerContainerFactoryBean createServletServerContainerFactoryBean() {
// ServletServerContainerFactoryBean factoryBean = new ServletServerContainerFactoryBean();
// factoryBean.setMaxTextMessageBufferSize(customWebSocketConfig.getMessageSizeLimit());
// factoryBean.setMaxBinaryMessageBufferSize(customWebSocketConfig.getMessageSizeLimit());
// factoryBean.setMaxSessionIdleTimeout(2048L * 2048L);
// factoryBean.setAsyncSendTimeout(2048L * 2048L);
// return factoryBean;
// }
/**
* 配置客户端出站通道拦截器默认线程1

View File

@ -7,11 +7,21 @@ package com.m2pool.chat.constant;
* @Date 2025/4/14 14:54
*/
public class Destination {
/**
* stomp 默认单对单 发送目的地.单对单消息默认 /
*/
public static final String QUEUE = "/queue";
/**
* stomp 默认单对单 发送目的地.单对单消息默认 /
*/
public static final String QUEUE_USER = "/queue/user";
/**
* stomp 默认单对单 发送目的地.单对单消息默认 /
*/
public static final String QUEUE_CUSTOMER = "/queue/customer";
/**
* 聊天室关闭 路径
*/
@ -23,9 +33,9 @@ public class Destination {
public static final String TOPIC = "/topic";
/**
* 前端订阅消息所需前缀 stomp 默认user前缀
* 前端订阅消息所需前缀point stomp 默认user前缀
*/
public static final String USER_PREFIX = "/user";
public static final String USER_PREFIX = "/sub";
/**
* stomp 默认发送消息前缀

View File

@ -27,13 +27,25 @@ public class StompController extends BaseController {
private StompService stompService;
/**
* 发送消息到对应的用户
* 发送消息给普通用户和游客
* @param userMessageVo 消息体
* @return 返回值通过CommonMessageConvert消息转换器转换
*/
@MessageMapping("/send/message")
@SendToUser("/queue")
@MessageMapping("/send/message/to/user")
@SendToUser("/queue/user")
public AjaxResult sendMessageToUser(StompPrincipal principal, @Payload UserMessageVo userMessageVo) {
return stompService.sendMessageToUser(principal,userMessageVo);
}
/**
* 发送消息到对应的客服
* @param userMessageVo 消息体
* @return 返回值通过CommonMessageConvert消息转换器转换
*/
@MessageMapping("/send/message/to/customer")
@SendToUser("/queue/customer/")
public AjaxResult sendMessageToCustomer(StompPrincipal principal, @Payload UserMessageVo userMessageVo) {
return stompService.sendMessageToCustomer(principal,userMessageVo);
}
}

View File

@ -13,4 +13,11 @@ public interface StompService {
*/
AjaxResult sendMessageToUser(StompPrincipal principal, UserMessageVo userMessageVo);
/**
* 发送消息给客服
* @param userMessageVo
* @return
*/
AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo);
}

View File

@ -9,6 +9,7 @@ import com.m2pool.chat.mapper.ChatMessageMapper;
import com.m2pool.chat.mapper.ChatRoomMapper;
import com.m2pool.chat.service.ChatMessageService;
import com.m2pool.common.core.Result.R;
import com.m2pool.common.core.utils.StringUtils;
import com.m2pool.common.security.utils.SecurityUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -17,8 +18,7 @@ import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
import static com.m2pool.chat.constant.UserType.CUSTOMER;
import static com.m2pool.chat.constant.UserType.LOGIN_USER;
import static com.m2pool.chat.constant.UserType.*;
@Service
public class ChatMessageServiceImpl implements ChatMessageService {
@ -44,6 +44,9 @@ public class ChatMessageServiceImpl implements ChatMessageService {
@Override
public R<List<ChatMessageDto>> findRecentlyMessage(String email,Long id,Integer pageNum,Long roomId) {
ChatMessage chatMessage;
if(StringUtils.isEmpty(email)){
return R.fail("查询失败,用户标识或邮箱不能为空");
}
if(id != null && id != 0){
chatMessage = chatMessageMapper.selectById(id);
}else{
@ -81,7 +84,7 @@ public class ChatMessageServiceImpl implements ChatMessageService {
.id(roomId)
.build();
if (chatRoom != null){
if (Objects.equals(type, LOGIN_USER)) {
if (Objects.equals(type, LOGIN_USER) || Objects.equals(type, TOURIST) ) {
build.setClientReadNum(0);
}
if(Objects.equals(type, CUSTOMER)){

View File

@ -49,6 +49,44 @@ public class StompServiceImpl implements StompService {
@Override
public AjaxResult sendMessageToUser(StompPrincipal principal, UserMessageVo userMessageVo) {
WebsocketMessageDto build = buildDto(principal, userMessageVo);
//获取当前聊天室对象
ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>()
.eq(ChatRoom::getUserOneEmail, principal.getName())
.eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail()));
build.setRoomId(String.valueOf(userMessageVo.getRoomId()));
int serviceReadNum = chatRoom != null ? chatRoom.getClientReadNum() : 0;
build.setClientReadNum( serviceReadNum+ 1);
//不在判断接收者是否在线
// boolean bool = checkOnline(userMessageVo);
//在线用户才发送消息
messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_USER + "/" + userMessageVo.getEmail(),build);
executeTran(principal, userMessageVo, chatRoom);
return AjaxResult.success("成功");
}
@Override
public AjaxResult sendMessageToCustomer(StompPrincipal principal, UserMessageVo userMessageVo) {
WebsocketMessageDto build = buildDto(principal, userMessageVo);
ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>()
.eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail())
.eq(ChatRoom::getUserTwoEmail, principal.getName()));
build.setRoomId(String.valueOf(userMessageVo.getRoomId()));
int serviceReadNum = chatRoom != null ? chatRoom.getServiceReadNum() : 0;
build.setClientReadNum(serviceReadNum + 1);
messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE_CUSTOMER + "/" + userMessageVo.getEmail(),build);
executeTran(principal, userMessageVo, chatRoom);
return AjaxResult.success("成功");
}
/**
* 构建聊天实时返回信息
* @param principal
* @param userMessageVo
* @return
*/
private WebsocketMessageDto buildDto(StompPrincipal principal, UserMessageVo userMessageVo) {
Boolean isCreate = principal.getIsCreate();
WebsocketMessageDto build = WebsocketMessageDto.builder()
.type(userMessageVo.getType())
@ -62,28 +100,16 @@ public class StompServiceImpl implements StompService {
if (isCreate){
principal.setIsCreate(false);
}
//获取当前聊天室对象
ChatRoom chatRoom;
if(!CUSTOMER.equals(principal.getType())){
chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>()
.eq(ChatRoom::getUserOneEmail, principal.getName())
.eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail()));
build.setRoomId(String.valueOf(userMessageVo.getRoomId()));
build.setClientReadNum(chatRoom.getServiceReadNum() + 1);
}else {
chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper<ChatRoom>()
.eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail())
.eq(ChatRoom::getUserTwoEmail, principal.getName()));
build.setRoomId(String.valueOf(userMessageVo.getRoomId()));
build.setClientReadNum(chatRoom.getClientReadNum() + 1);
return build;
}
boolean bool = checkOnline(userMessageVo);
//在线用户才发送消息
if (bool){
messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE + "/" + userMessageVo.getEmail(),build);
}
/**
* 执行消息表新增消息和聊天室已读未读消息数量更新事务
* @param principal
* @param userMessageVo
* @param chatRoom
*/
private void executeTran(StompPrincipal principal, UserMessageVo userMessageVo, ChatRoom chatRoom){
// 消息存储
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
@ -92,7 +118,7 @@ public class StompServiceImpl implements StompService {
// 插入消息并获取 ID
insertMessage(principal,userMessageVo);
//聊天室已读未读数量更新
updateRoom(chatRoom,principal.getType(),bool);
updateRoom(chatRoom,principal.getType());
} catch (Exception e) {
// 回滚事务
status.setRollbackOnly();
@ -100,8 +126,6 @@ public class StompServiceImpl implements StompService {
}
}
});
return AjaxResult.success("成功");
}
/**
@ -109,6 +133,7 @@ public class StompServiceImpl implements StompService {
* @return
*/
private boolean checkOnline(UserMessageVo userMessageVo){
return userRegistry.getUsers().stream()
.anyMatch(user -> user.getName().equals(userMessageVo.getEmail()));
}
@ -133,10 +158,10 @@ public class StompServiceImpl implements StompService {
* 修改聊天信息
* @param chatRoom 本次聊天聊天室信息
* @param sendUserType 发送者类型
* @param bool 发送者是否在线不在线需要更新未读消息数量
* @param {bool 废弃 发送者是否在线不在线需要更新未读消息数量}
*/
private void updateRoom(ChatRoom chatRoom,Integer sendUserType,Boolean bool){
private void updateRoom(ChatRoom chatRoom,Integer sendUserType){
if(chatRoom != null){
ChatRoom room = ChatRoom.builder().id(chatRoom.getId()).build();
if (CUSTOMER.equals(sendUserType)) {
room.setClientReadNum(chatRoom.getClientReadNum() + 1);
@ -148,6 +173,8 @@ public class StompServiceImpl implements StompService {
chatRoomMapper.updateById(room);
}
}
/**
* 用于客服关闭断线客服端聊天室接口
* @param roomId 游客与客服聊天室id, 实际为游客邮箱

View File

@ -20,7 +20,7 @@
<where>
room_id = #{roomId}
<if test="id != null">
AND id <![CDATA[ <= ]]> #{id}
AND id <![CDATA[ < ]]> #{id}
</if>
</where>
ORDER BY id DESC

View File

@ -17,7 +17,7 @@
<where>
room_id = #{roomId}
<if test="id != null">
AND id <![CDATA[ <= ]]> #{id}
AND id <![CDATA[ < ]]> #{id}
</if>
</where>
ORDER BY id DESC

View File

@ -121,6 +121,10 @@ public class DataTask {
}
uTime ++;
}
});
Map<String, List<MinerDataDto>> userMap = list.stream().collect(Collectors.groupingBy(MinerDataDto::getMiner));