stomp+websocket 框架搭建

This commit is contained in:
yyb 2025-04-11 10:09:13 +08:00
parent c4f34073c4
commit eefd4c40f0
10 changed files with 425 additions and 10 deletions

View File

@ -0,0 +1,43 @@
package com.m2pool.common.core.utils;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @ClassName JsonUtil
* @Description JSON 转换工具类
* @Author yyb
* @Date 2025/4/10 16:08
*/
public class JsonUtil {
private static final ObjectMapper objectMapper = new ObjectMapper();
/**
* JSON 字符串转换为指定类型的对象
* @param jsonString JSON 字符串
* @param clazz 目标对象的类类型
* @param <T> 目标对象的类型
* @return 转换后的对象
* @throws RuntimeException 如果转换失败
*/
public static <T> T convertString2Object(String jsonString, Class<T> clazz) {
try {
return objectMapper.readValue(jsonString, clazz);
} catch (Exception e) {
throw new RuntimeException("Failed to convert JSON string to object", e);
}
}
/**
* 将对象转换为 JSON 字符串
* @param object 要转换的对象
* @return JSON 字符串
* @throws RuntimeException 如果转换失败
*/
public static String toJson(Object object) {
try {
return objectMapper.writeValueAsString(object);
} catch (Exception e) {
throw new RuntimeException("Failed to convert object to JSON string", e);
}
}
}

View File

@ -0,0 +1,86 @@
package com.m2pool.chat.config;
import com.m2pool.chat.coverter.CommonMessageConvert;
import com.m2pool.chat.interceptor.WebsocketChannelInterceptor;
import com.m2pool.chat.interceptor.WebsocketHandshakeInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import java.util.List;
/**
* @Description TODO
* @Date 2025/2/25 14:43
* @Author 杜懿
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
/**
* 注册 Stomp的端点 可以注册多个端点
* addEndpoint添加STOMP协议的端点客户端访问地址
* withSockJS指定端点使用SockJS协议
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
.addInterceptors(new WebsocketHandshakeInterceptor())
//.setHandshakeHandler(webSocketHandshakeHandler)
//允许跨域访问
.setAllowedOrigins("*")
.withSockJS();
}
/**
* 配置消息代理
* 客户端订阅消息的请求前缀topic用于广播推送queue用于点对点推送
* 启动简单Broker消息的发送的地址符合配置的前缀来的消息才发送到这个broker
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue")
.setHeartbeatValue(new long[] {10000, 10000})
.setTaskScheduler(new DefaultManagedTaskScheduler());
config.setApplicationDestinationPrefixes("/message");
//服务端通知客户端的前缀可以不设置默认为user
config.setUserDestinationPrefix("/user");
}
/**
* 配置客户端出站通道拦截器默认线程1
* 配置核心线程池 10 最大线程20 允许线程空闲时间 60
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
// 拦截器配置
registration.interceptors(new WebsocketChannelInterceptor());
}
/**
* 配置消息转换器
* @param messageConverters 转换器集合
* @return 是否使用
*/
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
messageConverters.add(new CommonMessageConvert());
return true;
}
}

View File

@ -0,0 +1,10 @@
package com.m2pool.chat.constant;
/**
* @ClassName ExceptionCode
* @Description websocket 异常码
* @Author yyb
* @Date 2025/4/10 16:37
*/
public class WebsocketExceptionCode {
}

View File

@ -1,28 +1,29 @@
package com.m2pool.chat.controller; package com.m2pool.chat.controller;
import com.m2pool.chat.entity.ChatMessage; import com.m2pool.chat.dto.WebsocketMessageDto;
import com.m2pool.chat.entity.ChatMsg;
import com.m2pool.chat.service.ChatService; import com.m2pool.chat.service.ChatService;
import com.m2pool.chat.service.WebSocketServer;
import com.m2pool.chat.vo.ChatHistoryVo; import com.m2pool.chat.vo.ChatHistoryVo;
import com.m2pool.chat.vo.UserMessageVo;
import com.m2pool.common.core.utils.StringUtils; import com.m2pool.common.core.utils.StringUtils;
import com.m2pool.common.core.web.Result.AjaxResult; import com.m2pool.common.core.web.Result.AjaxResult;
import com.m2pool.common.core.web.controller.BaseController; import com.m2pool.common.core.web.controller.BaseController;
import com.m2pool.system.api.entity.SysUser; import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.http.ResponseEntity; import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.*; import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
/** /**
* @Description TODO * @Description TODO
* @Date 2025/2/24 15:05 * @Date 2025/2/24 15:05
* @Author 杜懿 * @Author 杜懿
*/ */
@RestController @Controller
@RequestMapping("/chat")
public class ChatController extends BaseController { public class ChatController extends BaseController {
@Resource @Resource
@ -32,6 +33,7 @@ public class ChatController extends BaseController {
// //private WebSocketServer webSocketServer; // //private WebSocketServer webSocketServer;
@GetMapping("/history") @GetMapping("/history")
@ResponseBody
public AjaxResult getChatHistory(@RequestBody ChatHistoryVo vo) { public AjaxResult getChatHistory(@RequestBody ChatHistoryVo vo) {
if(StringUtils.isNull(vo)){ if(StringUtils.isNull(vo)){
@ -42,4 +44,39 @@ public class ChatController extends BaseController {
return chatService.getHistory(identifier); return chatService.getHistory(identifier);
} }
//spring提供的推送方式
@Resource
private SimpMessagingTemplate messagingTemplate;
/**
* 发送消息到对应的用户
* @param userId 用户id消息接受者
* @param userMessageVo 消息体
* @return 返回值通过CommonMessageConvert消息转换器转换
*/
@MessageMapping("/message/{userId}")
@SendToUser("/queue/{userId}")
public WebsocketMessageDto sendMessageToUser(@DestinationVariable String userId, UserMessageVo userMessageVo) {
WebsocketMessageDto websocketMessageDto = new WebsocketMessageDto();
websocketMessageDto.setType("message");
websocketMessageDto.setMessage(userMessageVo.getMessage());
return websocketMessageDto;
}
//TODO 前端打开聊天框获取用户信息建立一对一链接
//TODO 用户登录后保存历史信息到数据库表,分表存储(7天)
//TODO 用户注销需删除历史信息
//
} }

View File

@ -0,0 +1,53 @@
package com.m2pool.chat.coverter;
import com.m2pool.chat.exception.WebSocketException;
import com.m2pool.common.core.utils.JsonUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.GenericMessage;
import java.nio.charset.StandardCharsets;
/**
* @ClassName CommonMessageConvert
* @Description 消息对象转换器
* @Author yyb
* @Date 2025/4/10 16:01
*/
public class CommonMessageConvert implements MessageConverter {
/**
* 将客户端发送过来的消息转换为指定的对象
* @param message 客户端发送过来的消息
* @param targetClass 目标数据类型
* @return 转换后的对象
*/
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
if (message.getPayload() instanceof byte[]) {
try {
String textPayload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
return JsonUtil.convertString2Object(textPayload,targetClass);
} catch (Exception e) {
throw new WebSocketException( "消息格式错误");
}
}
return null;
}
/**
* 将消息转为Message
* @param payload 需要转换的消息
* @param headers 消息头信息
* @return broker的消息实体
*/
@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
String str = JsonUtil.toJson(payload);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
return new GenericMessage<>(bytes, headers);
}
}

View File

@ -0,0 +1,15 @@
package com.m2pool.chat.dto;
import lombok.Data;
/**
* @ClassName WebsocketMessageDto
* @Description 消息返回对象
* @Author yyb
* @Date 2025/4/10 16:27
*/
@Data
public class WebsocketMessageDto {
private String type;
private String message;
}

View File

@ -0,0 +1,34 @@
package com.m2pool.chat.exception;
import com.m2pool.common.core.exception.base.BaseException;
public class WebSocketException extends BaseException {
private static String websocket_module = "chat-websocket";
public WebSocketException(String module, String code, Object[] args, String defaultMessage) {
super(module, code, args, defaultMessage);
module = websocket_module;
}
public WebSocketException(String module, String code, Object[] args) {
super(module, code, args);
module = websocket_module;
}
public WebSocketException(String module, String defaultMessage) {
super(module, defaultMessage);
module = websocket_module;
}
public WebSocketException(String code, Object[] args) {
super(code, args);
}
public WebSocketException(String defaultMessage) {
super(defaultMessage);
}
}

View File

@ -0,0 +1,80 @@
package com.m2pool.chat.interceptor;
import org.springframework.lang.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
/**
* @ClassName WebsocketChannelInterceptor
* @Description websocket channel 通道拦截器
* @Author yyb
* @Date 2025/4/10 15:44
*/
public class WebsocketChannelInterceptor implements ChannelInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketChannelInterceptor.class);
/**
* websocket channel 通道拦截器
* @param message
* @param channel
* @return
*/
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
//TODO 前端通过@stomp/stompjs基于stomp-client+webscoket等插件实现可以在该方法获取自定义请求头做一些校验
if (accessor.getCommand() == StompCommand.CONNECT) {
LOGGER.info("------------收到websocket的连接消息");
}
if (accessor.getCommand() == StompCommand.SEND) {
LOGGER.info("------------收到websocket的数据发送消息");
}
if (accessor.getCommand() == StompCommand.SUBSCRIBE) {
LOGGER.info("------------收到websocket的订阅消息");
}
if (accessor.getCommand() == StompCommand.UNSUBSCRIBE) {
LOGGER.info("------------收到websocket的取消订阅消息");
}
return message;
}
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
LOGGER.info("------------WebsocketChannelInterceptor-postSend");
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
LOGGER.info("-----------WebsocketChannelInterceptor-afterSendCompletion");
}
@Override
public boolean preReceive(MessageChannel channel) {
LOGGER.info("----------WebsocketChannelInterceptor-preReceive");
return true;
}
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
LOGGER.info("----------WebsocketChannelInterceptor-postReceive");
return message;
}
@Override
public void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) {
LOGGER.info("----------WebsocketChannelInterceptor-afterReceiveCompletion");
}
}

View File

@ -0,0 +1,34 @@
package com.m2pool.chat.interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* @ClassName WebsocketHandshakeInterceptor
* @Description websocket 握手处理器类
* @Author yyb
* @Date 2025/4/10 15:39
*/
public class WebsocketHandshakeInterceptor implements HandshakeInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketHandshakeInterceptor.class);
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
LOGGER.info("------------------WebsocketHandshakeInterceptor:beforeHandshake");
//TODO 前端如果是webscoket原生实现 获取一些自定义请求头,做一些校验
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
LOGGER.info("-----------------WebsocketHandshakeInterceptor:afterHandshake");
}
}

View File

@ -0,0 +1,23 @@
package com.m2pool.chat.vo;
import lombok.Data;
/**
* @ClassName UserMessageVo
* @Description 用户发送消息对象
* @Author yyb
* @Date 2025/4/10 16:28
*/
@Data
public class UserMessageVo {
/**
* 消息类型
*/
private String type;
/**
* 消息体
*/
private String message;
}