diff --git a/m2pool-common/common-core/src/main/java/com/m2pool/common/core/utils/JsonUtil.java b/m2pool-common/common-core/src/main/java/com/m2pool/common/core/utils/JsonUtil.java new file mode 100644 index 0000000..f2a96bf --- /dev/null +++ b/m2pool-common/common-core/src/main/java/com/m2pool/common/core/utils/JsonUtil.java @@ -0,0 +1,43 @@ +package com.m2pool.common.core.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @ClassName JsonUtil + * @Description JSON 转换工具类 + * @Author yyb + * @Date 2025/4/10 16:08 + */ +public class JsonUtil { + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * 将 JSON 字符串转换为指定类型的对象 + * @param jsonString JSON 字符串 + * @param clazz 目标对象的类类型 + * @param 目标对象的类型 + * @return 转换后的对象 + * @throws RuntimeException 如果转换失败 + */ + public static T convertString2Object(String jsonString, Class clazz) { + try { + return objectMapper.readValue(jsonString, clazz); + } catch (Exception e) { + throw new RuntimeException("Failed to convert JSON string to object", e); + } + } + + /** + * 将对象转换为 JSON 字符串 + * @param object 要转换的对象 + * @return JSON 字符串 + * @throws RuntimeException 如果转换失败 + */ + public static String toJson(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + throw new RuntimeException("Failed to convert object to JSON string", e); + } + } +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java new file mode 100644 index 0000000..c7be399 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java @@ -0,0 +1,86 @@ +package com.m2pool.chat.config; + +import com.m2pool.chat.coverter.CommonMessageConvert; +import com.m2pool.chat.interceptor.WebsocketChannelInterceptor; +import com.m2pool.chat.interceptor.WebsocketHandshakeInterceptor; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.simp.config.ChannelRegistration; +import org.springframework.messaging.simp.config.MessageBrokerRegistry; +import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler; +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; +import org.springframework.web.socket.config.annotation.StompEndpointRegistry; +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; + +import java.util.List; + +/** + * @Description TODO + * @Date 2025/2/25 14:43 + * @Author 杜懿 + */ +@Configuration +@EnableWebSocketMessageBroker +public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { + + /** + * 注册 Stomp的端点 可以注册多个端点 + * addEndpoint:添加STOMP协议的端点。客户端访问地址 + * withSockJS:指定端点使用SockJS协议 + */ + @Override + public void registerStompEndpoints(StompEndpointRegistry registry) { + registry.addEndpoint("/chat") + .addInterceptors(new WebsocketHandshakeInterceptor()) + //.setHandshakeHandler(webSocketHandshakeHandler) + //允许跨域访问 + .setAllowedOrigins("*") + .withSockJS(); + } + + /** + * 配置消息代理 + * 客户端订阅消息的请求前缀,topic用于广播推送,queue用于点对点推送 + * 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker + */ + @Override + public void configureMessageBroker(MessageBrokerRegistry config) { + + config.enableSimpleBroker("/topic", "/queue") + .setHeartbeatValue(new long[] {10000, 10000}) + .setTaskScheduler(new DefaultManagedTaskScheduler()); + + config.setApplicationDestinationPrefixes("/message"); + //服务端通知客户端的前缀,可以不设置,默认为user + config.setUserDestinationPrefix("/user"); + } + + + + + /** + * 配置客户端出站通道拦截器(默认线程1) + * 配置核心线程池 10 最大线程20 允许线程空闲时间 60 + * @param registration + */ + @Override + public void configureClientInboundChannel(ChannelRegistration registration) { + registration.taskExecutor().corePoolSize(10) + .maxPoolSize(20) + .keepAliveSeconds(60); + // 拦截器配置 + registration.interceptors(new WebsocketChannelInterceptor()); + } + + /** + * 配置消息转换器 + * @param messageConverters 转换器集合 + * @return 是否使用 + */ + @Override + public boolean configureMessageConverters(List messageConverters) { + messageConverters.add(new CommonMessageConvert()); + return true; + } + +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/WebsocketExceptionCode.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/WebsocketExceptionCode.java new file mode 100644 index 0000000..b009200 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/WebsocketExceptionCode.java @@ -0,0 +1,10 @@ +package com.m2pool.chat.constant; + +/** + * @ClassName ExceptionCode + * @Description websocket 异常码 + * @Author yyb + * @Date 2025/4/10 16:37 + */ +public class WebsocketExceptionCode { +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatController.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatController.java index 5d031cb..e379306 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatController.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/controller/ChatController.java @@ -1,28 +1,29 @@ package com.m2pool.chat.controller; -import com.m2pool.chat.entity.ChatMessage; -import com.m2pool.chat.entity.ChatMsg; +import com.m2pool.chat.dto.WebsocketMessageDto; import com.m2pool.chat.service.ChatService; -import com.m2pool.chat.service.WebSocketServer; import com.m2pool.chat.vo.ChatHistoryVo; +import com.m2pool.chat.vo.UserMessageVo; import com.m2pool.common.core.utils.StringUtils; import com.m2pool.common.core.web.Result.AjaxResult; import com.m2pool.common.core.web.controller.BaseController; -import com.m2pool.system.api.entity.SysUser; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; +import org.springframework.messaging.handler.annotation.DestinationVariable; +import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.messaging.simp.annotation.SendToUser; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; -import java.io.IOException; -import java.util.List; /** * @Description TODO * @Date 2025/2/24 15:05 * @Author 杜懿 */ -@RestController -@RequestMapping("/chat") +@Controller public class ChatController extends BaseController { @Resource @@ -32,6 +33,7 @@ public class ChatController extends BaseController { // //private WebSocketServer webSocketServer; @GetMapping("/history") + @ResponseBody public AjaxResult getChatHistory(@RequestBody ChatHistoryVo vo) { if(StringUtils.isNull(vo)){ @@ -42,4 +44,39 @@ public class ChatController extends BaseController { return chatService.getHistory(identifier); } + //spring提供的推送方式 + @Resource + private SimpMessagingTemplate messagingTemplate; + + + /** + * 发送消息到对应的用户 + * @param userId 用户id,消息接受者 + * @param userMessageVo 消息体 + * @return 返回值通过CommonMessageConvert消息转换器转换 + */ + @MessageMapping("/message/{userId}") + @SendToUser("/queue/{userId}") + public WebsocketMessageDto sendMessageToUser(@DestinationVariable String userId, UserMessageVo userMessageVo) { + WebsocketMessageDto websocketMessageDto = new WebsocketMessageDto(); + websocketMessageDto.setType("message"); + websocketMessageDto.setMessage(userMessageVo.getMessage()); + + return websocketMessageDto; + } + + + //TODO 前端打开聊天框,获取用户信息,建立一对一链接 + + + //TODO 用户登录后,保存历史信息到数据库表,分表存储(7天)。 + + + //TODO 用户注销,需删除历史信息 + + + // + + + } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java new file mode 100644 index 0000000..4e7ae31 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/coverter/CommonMessageConvert.java @@ -0,0 +1,53 @@ +package com.m2pool.chat.coverter; + +import com.m2pool.chat.exception.WebSocketException; +import com.m2pool.common.core.utils.JsonUtil; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.GenericMessage; + +import java.nio.charset.StandardCharsets; + +/** + * @ClassName CommonMessageConvert + * @Description 消息对象转换器 + * @Author yyb + * @Date 2025/4/10 16:01 + */ +public class CommonMessageConvert implements MessageConverter { + /** + * 将客户端发送过来的消息转换为指定的对象 + * @param message 客户端发送过来的消息 + * @param targetClass 目标数据类型 + * @return 转换后的对象 + */ + @Override + public Object fromMessage(Message message, Class targetClass) { + if (message.getPayload() instanceof byte[]) { + try { + String textPayload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8); + return JsonUtil.convertString2Object(textPayload,targetClass); + } catch (Exception e) { + throw new WebSocketException( "消息格式错误"); + } + } + return null; + } + + + + /** + * 将消息转为Message + * @param payload 需要转换的消息 + * @param headers 消息头信息 + * @return broker的消息实体 + */ + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + String str = JsonUtil.toJson(payload); + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + return new GenericMessage<>(bytes, headers); + } + +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java new file mode 100644 index 0000000..e9dfa6d --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/dto/WebsocketMessageDto.java @@ -0,0 +1,15 @@ +package com.m2pool.chat.dto; + +import lombok.Data; + +/** + * @ClassName WebsocketMessageDto + * @Description 消息返回对象 + * @Author yyb + * @Date 2025/4/10 16:27 + */ +@Data +public class WebsocketMessageDto { + private String type; + private String message; +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/exception/WebSocketException.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/exception/WebSocketException.java new file mode 100644 index 0000000..7690f4a --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/exception/WebSocketException.java @@ -0,0 +1,34 @@ +package com.m2pool.chat.exception; + +import com.m2pool.common.core.exception.base.BaseException; + +public class WebSocketException extends BaseException { + + private static String websocket_module = "chat-websocket"; + + + + public WebSocketException(String module, String code, Object[] args, String defaultMessage) { + super(module, code, args, defaultMessage); + module = websocket_module; + } + + public WebSocketException(String module, String code, Object[] args) { + super(module, code, args); + module = websocket_module; + } + + public WebSocketException(String module, String defaultMessage) { + super(module, defaultMessage); + module = websocket_module; + } + + public WebSocketException(String code, Object[] args) { + super(code, args); + } + + public WebSocketException(String defaultMessage) { + super(defaultMessage); + } + +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java new file mode 100644 index 0000000..2929583 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java @@ -0,0 +1,80 @@ +package com.m2pool.chat.interceptor; + + +import org.springframework.lang.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaderAccessor; +import org.springframework.messaging.support.ChannelInterceptor; + +/** + * @ClassName WebsocketChannelInterceptor + * @Description websocket channel 通道拦截器 + * @Author yyb + * @Date 2025/4/10 15:44 + */ +public class WebsocketChannelInterceptor implements ChannelInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketChannelInterceptor.class); + + /** + * websocket channel 通道拦截器 + * @param message + * @param channel + * @return + */ + @Override + public Message preSend(Message message, MessageChannel channel) { + StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); + + //TODO 前端通过@stomp/stompjs、基于stomp-client+webscoket等插件实现,可以在该方法获取自定义请求头,做一些校验 + + if (accessor.getCommand() == StompCommand.CONNECT) { + LOGGER.info("------------收到websocket的连接消息"); + } + + if (accessor.getCommand() == StompCommand.SEND) { + LOGGER.info("------------收到websocket的数据发送消息"); + } + + if (accessor.getCommand() == StompCommand.SUBSCRIBE) { + LOGGER.info("------------收到websocket的订阅消息"); + } + + if (accessor.getCommand() == StompCommand.UNSUBSCRIBE) { + LOGGER.info("------------收到websocket的取消订阅消息"); + } + + return message; + } + + + @Override + public void postSend(Message message, MessageChannel channel, boolean sent) { + LOGGER.info("------------WebsocketChannelInterceptor-postSend"); + } + + @Override + public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, @Nullable Exception ex) { + LOGGER.info("-----------WebsocketChannelInterceptor-afterSendCompletion"); + } + + @Override + public boolean preReceive(MessageChannel channel) { + LOGGER.info("----------WebsocketChannelInterceptor-preReceive"); + return true; + } + + @Override + public Message postReceive(Message message, MessageChannel channel) { + LOGGER.info("----------WebsocketChannelInterceptor-postReceive"); + return message; + } + + @Override + public void afterReceiveCompletion(@Nullable Message message, MessageChannel channel, @Nullable Exception ex) { + LOGGER.info("----------WebsocketChannelInterceptor-afterReceiveCompletion"); + } +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketHandshakeInterceptor.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketHandshakeInterceptor.java new file mode 100644 index 0000000..bdc0c62 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketHandshakeInterceptor.java @@ -0,0 +1,34 @@ +package com.m2pool.chat.interceptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + + +/** + * @ClassName WebsocketHandshakeInterceptor + * @Description websocket 握手处理器类 + * @Author yyb + * @Date 2025/4/10 15:39 + */ +public class WebsocketHandshakeInterceptor implements HandshakeInterceptor { + + private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketHandshakeInterceptor.class); + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { + LOGGER.info("------------------WebsocketHandshakeInterceptor:beforeHandshake"); + //TODO 前端如果是webscoket原生实现, 获取一些自定义请求头,做一些校验 + return true; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { + LOGGER.info("-----------------WebsocketHandshakeInterceptor:afterHandshake"); + } +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java new file mode 100644 index 0000000..139e2b8 --- /dev/null +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/vo/UserMessageVo.java @@ -0,0 +1,23 @@ +package com.m2pool.chat.vo; + +import lombok.Data; + +/** + * @ClassName UserMessageVo + * @Description 用户发送消息对象 + * @Author yyb + * @Date 2025/4/10 16:28 + */ +@Data +public class UserMessageVo { + + /** + * 消息类型 + */ + private String type; + + /** + * 消息体 + */ + private String message; +}