update 废弃历史消息查询接口,自定义ws链接和ip限制数监听器

This commit is contained in:
yyb 2025-04-30 14:02:32 +08:00
parent 340f81a513
commit 934be40c8b
26 changed files with 304 additions and 68 deletions

View File

@ -2,15 +2,18 @@ package com.m2pool.chat.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "websocket.transport")
public class CustomWebSocketConfig {
private int messageSizeLimit;
private int sendTimeLimit;
private int sendBufferSizeLimit;
private int timeToFirstMessage;
private int maxConnections;
private int maxConnections;
private String defaultCustomerEmail;
}

View File

@ -4,6 +4,7 @@ import com.m2pool.chat.constant.Destination;
import com.m2pool.chat.coverter.CommonMessageConvert;
import com.m2pool.chat.interceptor.WebsocketChannelInterceptor;
import com.m2pool.chat.interceptor.WebsocketHandshakeInterceptor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.ChannelRegistration;
@ -31,6 +32,8 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Resource
private CustomWebSocketConfig customWebSocketConfig;
@Resource
private ApplicationEventPublisher applicationEventPublisher;
/**
* 注册 Stomp的端点 可以注册多个端点
* addEndpoint添加STOMP协议的端点客户端访问地址
@ -96,7 +99,7 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
.maxPoolSize(20)
.keepAliveSeconds(60);
// 拦截器配置
registration.interceptors(new WebsocketChannelInterceptor(customWebSocketConfig));
registration.interceptors(new WebsocketChannelInterceptor(customWebSocketConfig,applicationEventPublisher));
}
/**

View File

@ -8,10 +8,15 @@ package com.m2pool.chat.constant;
*/
public class Destination {
/**
* stomp 默认单对单 发送目的地
* stomp 默认单对单 发送目的地.单对单消息默认 /
*/
public static final String QUEUE = "/queue";
/**
* 聊天室关闭 路径
*/
public static final String QUEUE_CLOSE_ROOM = "/close/room/";
/**
* stomp 默认群发 目的地
*/

View File

@ -26,7 +26,7 @@ public class ChatMessageController {
* @return
*/
@PostMapping("/find/history/message")
@ApiOperation(value = "查询7天前的历史记录", notes = "根据ID查询7天前的历史记录")
@ApiOperation(value = "查询7天前的历史记录(废弃,请使用/find/recently/message", notes = "根据ID查询7天前的历史记录")
@RequiresLogin
public R<List<ChatMessageDto>> findHistoryMessage(@RequestBody MessagePageVo pageVo) {
return chatMessageService.findHistoryMessage(pageVo.getId(), pageVo.getPageNum(), pageVo.getRoomId());
@ -40,7 +40,7 @@ public class ChatMessageController {
}
@GetMapping("/delete/message")
@ApiOperation(value = "消息撤回或删除")
@ApiOperation(value = "消息撤回或删除(废弃)")
@RequiresLogin
public R<String> deleteMessage(
@ApiParam(value = "消息id", required = true, example = "1")

View File

@ -47,6 +47,7 @@ public class CommonMessageConvert implements MessageConverter {
public Message<?> toMessage(Object payload, MessageHeaders headers) {
String str = JsonUtil.toJson(payload);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
System.out.println("发送给前端的消息"+new GenericMessage<>(bytes, headers));
return new GenericMessage<>(bytes, headers);
}

View File

@ -26,7 +26,7 @@ public class ChatRoomDto {
* 聊天室id
*/
@ApiModelProperty(value = "聊天室id", example = "1")
private Long id;
private String id;
/**
* 聊天对象id
*/

View File

@ -52,7 +52,7 @@ public class WebsocketMessageDto {
/**
* 聊天室id
*/
private Long roomId;
private String roomId;
/**
* 是否创建新聊天室

View File

@ -3,11 +3,13 @@ package com.m2pool.chat.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor

View File

@ -7,6 +7,7 @@ import com.m2pool.chat.entity.StompPrincipal;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@ -17,7 +18,6 @@ import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static com.m2pool.chat.constant.CustomHeader.*;
import static com.m2pool.chat.constant.UserType.LOGIN_USER;
import static com.m2pool.chat.constant.UserType.TOURIST;
import static com.m2pool.chat.utils.StompUtils.getConnectCustomHeaders;
/**
@ -53,8 +54,11 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
*/
private CustomWebSocketConfig customWebSocketConfig;
public WebsocketChannelInterceptor(CustomWebSocketConfig customWebSocketConfig) {
private ApplicationEventPublisher applicationEventPublisher;
public WebsocketChannelInterceptor(CustomWebSocketConfig customWebSocketConfig, ApplicationEventPublisher applicationEventPublisher) {
this.customWebSocketConfig = customWebSocketConfig;
this.applicationEventPublisher = applicationEventPublisher;
}
/**
@ -75,20 +79,14 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
int type = Integer.parseInt(getConnectCustomHeaders(raw, TYPE));
String email = getConnectCustomHeaders(raw, EMAIL);
//最大链接数限制
maxConnectionsLimit();
//根据客服端ip + 用户类型限制连接数
ipLimit(accessor,type,email);
//链接请求头中用户信息存入stomp中
mha.setUser(new StompPrincipal(email,type,true));
}
if (accessor.getCommand() == StompCommand.SEND) {
LOGGER.info("------------websocket send message");
}
if (accessor.getCommand() == StompCommand.SUBSCRIBE) {
LOGGER.info("------------websocket subscribe message");
}
@ -122,7 +120,6 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
Map<String, Object> sessionAttributes = accessor.getSessionAttributes();
if (sessionAttributes != null) {
String ipAddr = (String) sessionAttributes.get(IPADDR);
System.out.println(accessor.getSessionId());
String hasConnectionEmail = ipConnectionCountMap.get(ipAddr);
//两种ip限制情况
//本次链接为游客 且ip上已经有人链接直接拒绝本次链接
@ -132,8 +129,11 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
//本次链接为登录用户,并且已经链接.直接拒绝本次链接(多用户登录)
if ( type == LOGIN_USER ) {
if(email.equals(hasConnectionEmail) || ipConnectionCountMap.containsValue(email)){
// StompPrincipal principal = new StompPrincipal(email, type, true);
// applicationEventPublisher.publishEvent(new IpLimitEvent(this, principal));
throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT));
}
}
ipConnectionCountMap.put(ipAddr,email);
@ -142,22 +142,7 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
}
}
/**
* 获取stomp自定义请求头
* @param raw
* @param headerKey
* @return
*/
private String getConnectCustomHeaders(Object raw,String headerKey){
String headerValue = "";
if (raw instanceof Map) {
Object value = ((Map<?, ?>) raw).get(headerKey);
if(value instanceof ArrayList){
headerValue = ((ArrayList<?>) value).get(0).toString();
}
}
return headerValue;
}
/**
* 断开链接处理
@ -177,28 +162,24 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor {
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
LOGGER.info("------------WebsocketChannelInterceptor-postSend");
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
LOGGER.info("-----------WebsocketChannelInterceptor-afterSendCompletion");
}
@Override
public boolean preReceive(MessageChannel channel) {
LOGGER.info("----------WebsocketChannelInterceptor-preReceive");
return true;
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
LOGGER.info("----------WebsocketChannelInterceptor-postReceive");
return message;
}
@Override
public void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
LOGGER.info("----------WebsocketChannelInterceptor-afterReceiveCompletion");
}
}

View File

@ -0,0 +1,21 @@
package com.m2pool.chat.listener;
import com.m2pool.chat.entity.StompPrincipal;
import org.springframework.context.ApplicationEvent;
/**
* 登录限制事件
*/
public class IpLimitEvent extends ApplicationEvent {
private final StompPrincipal principal;
public IpLimitEvent(Object source, StompPrincipal principal) {
super(source);
this.principal = principal;
}
public StompPrincipal getPrincipal() {
return principal;
}
}

View File

@ -0,0 +1,22 @@
package com.m2pool.chat.listener;
import com.m2pool.chat.entity.StompPrincipal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class IpLimitListener implements ApplicationListener<IpLimitEvent> {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Override
public void onApplicationEvent(IpLimitEvent event) {
StompPrincipal principal =event.getPrincipal();
// messagingTemplate.convertAndSendToUser(principal.getName(),
// Destination.QUEUE + "/" + principal.getName()
// , "ip链接数限制");
}
}

View File

@ -0,0 +1,41 @@
package com.m2pool.chat.listener;
import com.m2pool.chat.entity.StompPrincipal;
import com.m2pool.chat.service.impl.StompServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import javax.annotation.Resource;
import static com.m2pool.chat.constant.UserType.TOURIST;
/**
* websocket通道关闭监听器
*/
@Component
public class WebSocketEventListener implements ApplicationListener<SessionDisconnectEvent>{
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketEventListener.class);
@Resource
private StompServiceImpl stompService;
@Override
public void onApplicationEvent(SessionDisconnectEvent event) {
StompPrincipal user = (StompPrincipal) event.getUser();
Message<byte[]> message = event.getMessage();
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (user != null && accessor.getCommand() == StompCommand.DISCONNECT && TOURIST.equals(user.getType())) {
//游客断开链接通知客服删除游客聊天室
stompService.customerCloseRoom(user.getName());
// 删除数据库中游客数据
}
}
}

View File

@ -1,14 +1,16 @@
package com.m2pool.chat.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.m2pool.chat.dto.ChatMessageDto;
import com.m2pool.chat.entity.ChatMessageHistory;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface ChatMessageHistoryMapper {
public interface ChatMessageHistoryMapper extends BaseMapper<ChatMessageHistory> {
/**
* 根据游标查询十条数据
@ -17,6 +19,6 @@ public interface ChatMessageHistoryMapper {
* @param roomId
* @return
*/
List<ChatMessageDto> findHistoryMessage(@Param("id") Long id,@Param("pageNum") Integer pageNum,@Param("roomId") Long roomId);
List<ChatMessageDto> findHistoryMessage(@Param("email") String email,@Param("id") Long id,@Param("pageNum") Integer pageNum,@Param("roomId") Long roomId);
}

View File

@ -0,0 +1,8 @@
package com.m2pool.chat.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.m2pool.chat.entity.ChatMessageHistory;
public interface ChatMessageHistoryService extends IService<ChatMessageHistory> {
}

View File

@ -15,7 +15,7 @@ public interface ChatRoomService extends IService<ChatRoom> {
* @param roomPageVo
* @return
*/
TableDataInfo findRoomList(RoomPageVo roomPageVo);
TableDataInfo<ChatRoomDto> findRoomList(RoomPageVo roomPageVo);
/**
* 根据当前用户邮箱查询聊天室

View File

@ -0,0 +1,11 @@
package com.m2pool.chat.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.m2pool.chat.entity.ChatMessageHistory;
import com.m2pool.chat.mapper.ChatMessageHistoryMapper;
import com.m2pool.chat.service.ChatMessageHistoryService;
import org.springframework.stereotype.Service;
@Service
public class ChatMessageHistoryServiceImpl extends ServiceImpl<ChatMessageHistoryMapper, ChatMessageHistory> implements ChatMessageHistoryService {
}

View File

@ -1,6 +1,7 @@
package com.m2pool.chat.service.impl;
import com.m2pool.chat.dto.ChatMessageDto;
import com.m2pool.chat.entity.ChatMessage;
import com.m2pool.chat.entity.ChatRoom;
import com.m2pool.chat.mapper.ChatMessageHistoryMapper;
import com.m2pool.chat.mapper.ChatMessageMapper;
@ -31,15 +32,28 @@ public class ChatMessageServiceImpl implements ChatMessageService {
private ChatRoomMapper chatRoomMapper;
@Override
public R<List<ChatMessageDto>> findHistoryMessage(Long id,Integer pageNum,Long roomId) {
List<ChatMessageDto> historyMessage = chatMessageHistoryMapper.findHistoryMessage(id, pageNum, roomId);
return R.success(historyMessage);
public R<List<ChatMessageDto>>
findHistoryMessage(Long id,Integer pageNum,Long roomId) {
String email = SecurityUtils.getUsername();
List<ChatMessageDto> messages;
messages = chatMessageMapper.findRecentlyMessage(email,id, pageNum, roomId);
return R.success(messages);
}
@Override
public R<List<ChatMessageDto>> findRecentlyMessage(Long id,Integer pageNum,Long roomId) {
String email = SecurityUtils.getUsername();
List<ChatMessageDto> recentlyMessage = chatMessageMapper.findRecentlyMessage(email,id, pageNum, roomId);
ChatMessage chatMessage = chatMessageMapper.selectById(id);
List<ChatMessageDto> recentlyMessage;
if(chatMessage != null){
recentlyMessage = chatMessageMapper.findRecentlyMessage(email,id, pageNum, roomId);
if(recentlyMessage.size() != pageNum){
recentlyMessage.addAll(chatMessageHistoryMapper.findHistoryMessage(email, id, pageNum - recentlyMessage.size(), roomId));
}
}else{
recentlyMessage = chatMessageHistoryMapper.findHistoryMessage(email, id, pageNum, roomId);
}
return R.success(recentlyMessage);
}
@ -62,10 +76,10 @@ public class ChatMessageServiceImpl implements ChatMessageService {
.build();
if (chatRoom != null){
if (Objects.equals(type, LOGIN_USER)) {
build.setServiceReadNum(0);
build.setClientReadNum(0);
}
if(Objects.equals(type, CUSTOMER)){
build.setClientReadNum(0);
build.setServiceReadNum(0);
}
int i = chatRoomMapper.updateById(build);
return i >= 0 ? R.success("已读") : R.fail("消息读取失败");

View File

@ -44,22 +44,29 @@ public class ChatRoomServiceImpl extends ServiceImpl<ChatRoomMapper, ChatRoom> i
@Override
public TableDataInfo findRoomList(RoomPageVo roomPageVo) {
public TableDataInfo<ChatRoomDto> findRoomList(RoomPageVo roomPageVo) {
String userEmail = SecurityUtils.getUsername();
PageHelper.startPage(1, 20);
//1.查找当前客服对应的聊天室
List<ChatRoomDto> roomList = chatRoomMapper.findRoomList(userEmail,roomPageVo.getSendDateTime());
PageUtils.clearPage();
// if (roomList.isEmpty()){
// TableDataInfo tableDataInfo = new TableDataInfo();
// tableDataInfo.setCode(HttpStatus.ERROR);
// tableDataInfo.setMsg("当前客服暂无聊天室");
// return tableDataInfo;
// }
return getDataTable(roomList);
}
private TableDataInfo getDataTable(List<?> list)
private TableDataInfo<ChatRoomDto> getDataTable(List<ChatRoomDto> list)
{
TableDataInfo rspData = new TableDataInfo();
TableDataInfo<ChatRoomDto> rspData = new TableDataInfo<ChatRoomDto>();
rspData.setCode(HttpStatus.SUCCESS);
rspData.setRows(list);
rspData.setMsg("查询成功");
PageInfo pageInfo = new PageInfo(list);
PageInfo<ChatRoomDto> pageInfo = new PageInfo<ChatRoomDto>(list);
rspData.setTotal(pageInfo.getTotal());
rspData.setTotalPage(pageInfo.getPages());
return rspData;
@ -86,7 +93,7 @@ public class ChatRoomServiceImpl extends ServiceImpl<ChatRoomMapper, ChatRoom> i
ChatRoom build = ChatRoom.builder().userOneEmail(userEmail).userTwoEmail(sysUser.getEmail()).build();
int insert = chatRoomMapper.insert(build);
if (insert > 0){
return R.success(ChatRoomDto.builder().id(build.getId()).userEmail(build.getUserTwoEmail()).build());
return R.success(ChatRoomDto.builder().id(String.valueOf(build.getId())).userEmail(build.getUserTwoEmail()).build());
}
return R.fail("聊天室不存在,并且创建聊天室失败");
}

View File

@ -1,5 +1,6 @@
package com.m2pool.chat.service.impl;
import com.m2pool.chat.config.CustomWebSocketConfig;
import com.m2pool.chat.constant.Destination;
import com.m2pool.chat.dto.WebsocketMessageDto;
import com.m2pool.chat.entity.ChatMessage;
@ -9,6 +10,7 @@ import com.m2pool.chat.mapper.ChatMessageMapper;
import com.m2pool.chat.mapper.ChatRoomMapper;
import com.m2pool.chat.service.StompService;
import com.m2pool.chat.vo.UserMessageVo;
import com.m2pool.common.core.utils.StringUtils;
import com.m2pool.common.core.web.Result.AjaxResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
@ -42,8 +44,15 @@ public class StompServiceImpl implements StompService {
@Resource
private TransactionTemplate transactionTemplate;
@Resource
private CustomWebSocketConfig webSocketConfig;
@Override
public AjaxResult sendMessageToUser(StompPrincipal principal, UserMessageVo userMessageVo) {
if(StringUtils.isEmpty(userMessageVo.getEmail())){
userMessageVo.setEmail(webSocketConfig.getDefaultCustomerEmail());
}
userRegistry.getUsers().forEach(user -> System.out.println(user.getName()));
Boolean isCreate = principal.getIsCreate();
WebsocketMessageDto build = WebsocketMessageDto.builder()
.type(userMessageVo.getType())
@ -52,33 +61,32 @@ public class StompServiceImpl implements StompService {
.sendUserType(principal.getType())
.sendTime(new Date())
.isCreate(isCreate)
.clientReadNum(0)
.build();
if (isCreate){
principal.setIsCreate(false);
}
boolean bool = checkOnline(userMessageVo);
//获取当前聊天室对象
ChatRoom chatRoom = chatRoomMapper.selectById(userMessageVo.getRoomId());
if (chatRoom != null ){
build.setRoomId(String.valueOf(userMessageVo.getRoomId()));
if(LOGIN_USER.equals(principal.getType())){
build.setClientReadNum(chatRoom.getServiceReadNum() + 1);
}else {
build.setClientReadNum(chatRoom.getClientReadNum() + 1);
}
}else{
build.setRoomId(TOURIST.equals(userMessageVo.getType()) ? userMessageVo.getEmail() : principal.getName());
}
//当前用户发送其他人, 发送给指定用户. 否则默认发送给当前发送者
boolean bool = checkOnline(userMessageVo);
//在线用户才发送消息
if (bool){
messagingTemplate.convertAndSendToUser(userMessageVo.getEmail(), Destination.QUEUE + "/" + userMessageVo.getEmail(),build);
}
// 接收者和发送者 都不是游客才能存储消息 和修改聊天室信息后续可改为消息中间件解耦形式
if (!TOURIST.equals(userMessageVo.getReceiveUserType()) && !TOURIST.equals(principal.getType()) && chatRoom != null){
build.setRoomId(userMessageVo.getRoomId());
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
@ -118,7 +126,7 @@ public class StompServiceImpl implements StompService {
.sendEmail(principal.getName())
.content(userMessageVo.getContent())
.type(userMessageVo.getType())
.roomId(userMessageVo.getRoomId())
.roomId(Long.parseLong(userMessageVo.getRoomId()))
.build();
chatMessageMapper.insert(message);
return message.getId();
@ -142,4 +150,16 @@ public class StompServiceImpl implements StompService {
}
chatRoomMapper.updateById(room);
}
/**
* 用于客服关闭断线客服端聊天室接口
* @param roomId 游客与客服聊天室id, 实际为游客邮箱
* @return
*/
public void customerCloseRoom(String roomId){
messagingTemplate.convertAndSendToUser(
webSocketConfig.getDefaultCustomerEmail(),
Destination.QUEUE + Destination.QUEUE_CLOSE_ROOM + webSocketConfig.getDefaultCustomerEmail(),roomId);
}
}

View File

@ -1,12 +1,72 @@
package com.m2pool.chat.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.m2pool.chat.entity.ChatMessage;
import com.m2pool.chat.entity.ChatMessageHistory;
import com.m2pool.chat.mapper.ChatMessageMapper;
import com.m2pool.chat.service.ChatMessageHistoryService;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ClassName ChatTask
* @Description chat 聊天室和消息定时任务
* @Author yyb
* @Date 2025/4/15 11:51
*/
@Configuration
@EnableScheduling
public class ChatTask {
@Resource
private ChatMessageMapper chatMessageMapper;
@Resource
private ChatMessageHistoryService chatMessageHistoryService;
// @Scheduled(cron = "0 0/1 * * * ?")
@Scheduled(cron = "0 15 1 * * ?")
public void chatMessageDataSeparatedForHotAndCold(){
int pageNum = 1;
int pageSize = 1000; // 每批处理数量
List<ChatMessage> chatMessages;
do {
Page<ChatMessage> page = new Page<>(pageNum, pageSize);
chatMessageMapper.selectPage(page, new LambdaQueryWrapper<ChatMessage>()
.le(ChatMessage::getCreateTime, LocalDateTime.now().minusDays(7)));
chatMessages = page.getRecords();
if (!chatMessages.isEmpty()) {
List<ChatMessageHistory> collect = chatMessages.stream()
.map(chatMessage ->
ChatMessageHistory.builder()
.id(chatMessage.getId())
.type(chatMessage.getType())
.sendEmail(chatMessage.getSendEmail())
.content(chatMessage.getContent())
.roomId(chatMessage.getRoomId())
.createTime(chatMessage.getCreateTime())
.updateTime(chatMessage.getUpdateTime())
.build()
)
.collect(Collectors.toList());
// 批量插入
chatMessageHistoryService.saveBatch(collect);
chatMessageMapper.deleteBatchIds(chatMessages.stream().map(ChatMessage::getId).collect(Collectors.toList()));
}
pageNum++;
} while (!chatMessages.isEmpty());
}
}

View File

@ -0,0 +1,30 @@
package com.m2pool.chat.utils;
import java.util.ArrayList;
import java.util.Map;
/**
* @ClassName StompUtils
* @Description stomp工具类
* @Author yyb
* @Date 2025/4/29 11:35
*/
public class StompUtils {
/**
* 获取stomp自定义请求头
* @param raw
* @param headerKey
* @return
*/
public static String getConnectCustomHeaders(Object raw,String headerKey){
String headerValue = "";
if (raw instanceof Map) {
Object value = ((Map<?, ?>) raw).get(headerKey);
if(value instanceof ArrayList){
headerValue = ((ArrayList<?>) value).get(0).toString();
}
}
return headerValue;
}
}

View File

@ -13,8 +13,8 @@ import lombok.Data;
@Data
@ApiModel(description = "聊天室请求对象")
public class CharRoomVo {
@ApiModelProperty(value = "聊天室id", example = "1")
@ApiModelProperty(value = "聊天室id", example = "1",required = true)
private Long id;
@ApiModelProperty(value = "聊天室重要程度 0不重要 1重要", example = "1")
@ApiModelProperty(value = "聊天室重要程度 0不重要 1重要", example = "1",required = true)
private Integer flag;
}

View File

@ -20,10 +20,10 @@ public class MessagePageVo {
@ApiModelProperty(value = "页数量", example = "1")
private Integer pageNum;
@ApiModelProperty(value = "聊天室id", example = "1")
@ApiModelProperty(value = "聊天室id", example = "1",required = true)
private Long roomId;
@ApiModelProperty(value = "用户类型 0 游客 1 登录用户 2 客服", example = "1")
@ApiModelProperty(value = "用户类型 0 游客 1 登录用户 2 客服", example = "1",required = true)
private Integer userType;
public MessagePageVo() {
this.pageNum = 20;

View File

@ -42,5 +42,5 @@ public class UserMessageVo {
* 聊天室id
*/
@ApiModelProperty(value = "聊天室id", example = "1")
private Long roomId;
private String roomId;
}

View File

@ -11,7 +11,11 @@
type,
send_email as sendEmail,
content,
create_time as createTime
create_time as createTime,
room_id as roomId,
case when send_email = #{email} then 1
else 0
end as isSelf
FROM chat_message_history
<where>
room_id = #{roomId}

View File

@ -9,11 +9,11 @@
flag,
last_user_send_time as lastUserSendTime,
last_customer_send_time as lastCustomerSendTime,
client_read_num AS clientReadNum
service_read_num AS clientReadNum
FROM
chat_room
<where>
user_two_email = #{userEmail} AND del = FALSE
user_two_email = #{userEmail} AND del = false
<choose>
<when test="sendDateTime != null">
AND last_user_send_time <![CDATA[ <= ]]> #{sendDateTime}
@ -22,6 +22,7 @@
AND last_user_send_time <![CDATA[ <= ]]> NOW()
</otherwise>
</choose>
</where>
ORDER BY
flag DESC,
@ -29,7 +30,7 @@
</select>
<select id="findRoomByUserEmail" resultType="com.m2pool.chat.dto.ChatRoomDto">
SELECT
id,service_read_num AS clientReadNum ,user_two_email AS userEmail
id,client_read_num AS clientReadNum ,user_two_email AS userEmail
FROM
chat_room
WHERE