diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java index c1cf3b0..6db9177 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/config/WebSocketBrokerConfig.java @@ -5,6 +5,7 @@ import com.m2pool.chat.coverter.CommonMessageConvert; import com.m2pool.chat.interceptor.WebsocketChannelInterceptor; import com.m2pool.chat.interceptor.WebsocketHandshakeInterceptor; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.simp.config.ChannelRegistration; @@ -14,6 +15,8 @@ import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBr import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; +import org.springframework.web.socket.messaging.StompSubProtocolHandler; +import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; import javax.annotation.Resource; import java.util.List; @@ -125,4 +128,5 @@ public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { return true; } + } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java index 77a2cf9..1d56cfa 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/constant/ExceptionEnum.java @@ -10,7 +10,7 @@ import lombok.Getter; */ @Getter public enum ExceptionEnum { - IP_LIMIT_CONNECT(1020, "本机连接数已达上限,请关闭已有链接"), + IP_LIMIT_CONNECT(1020, "本机连接数已达上限,请先关闭已有链接"), MAX_LIMIT_CONNECT(1021, "服务器websocket连接数已达上限,服务器拒绝链接"), SET_PRINCIPAL_FAIL(1022, "websocket链接异常,用户身份设置失败"), GET_PRINCIPAL_FAIL(1023, "websocket链接异常,用户信息邮箱获取失败"), diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java index cd51965..62321da 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/interceptor/WebsocketChannelInterceptor.java @@ -4,6 +4,8 @@ package com.m2pool.chat.interceptor; import com.m2pool.chat.config.CustomWebSocketConfig; import com.m2pool.chat.constant.ExceptionEnum; import com.m2pool.chat.entity.StompPrincipal; +import com.m2pool.chat.listener.IpLimitEvent; +import io.lettuce.core.ScriptOutputType; import lombok.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +41,9 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketChannelInterceptor.class); /** - * 当前加入链接的ip ,key 为 ip ,VALUE 为用户邮箱 + * 当前加入链接的ip ,key 为 ip ,VALUE 邮箱 */ - private static final ConcurrentHashMap ipConnectionCountMap = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap ipConnectionCountMap = new ConcurrentHashMap<>(); /** * 当前链接数 @@ -121,26 +123,30 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { Map sessionAttributes = accessor.getSessionAttributes(); if (sessionAttributes != null) { String ipAddr = (String) sessionAttributes.get(IPADDR); - String hasConnectionEmail = ipConnectionCountMap.get(ipAddr); + EmailLimit emailAndCount = ipConnectionCountMap.get(ipAddr); + //两种ip限制情况 //本次链接为游客 且ip上已经有人链接直接拒绝本次链接 - if(type == TOURIST && hasConnectionEmail != null){ + if(type == TOURIST && emailAndCount != null){ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); } - //本次链接为登录用户,并且已经链接.直接拒绝本次链接(多用户登录) - if ( type == LOGIN_USER ) { - if(email.equals(hasConnectionEmail) ){ -// StompPrincipal principal = new StompPrincipal(email, type, true); -// applicationEventPublisher.publishEvent(new IpLimitEvent(this, principal)); + + //本次链接为登录用户,并且已经链接.直接拒绝本次链接 + if ( type != TOURIST && emailAndCount != null) { + emailAndCount.setCount(emailAndCount.getCount() + 1); + if(email.equals(emailAndCount.getEmail())){ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); } - if(ipConnectionCountMap.containsValue(email)){ - throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.ACCOUNT_HAS_CONNECTED)); - } - + //if(ipConnectionCountMap.containsValue(emailAndCount)){ + // ipConnectionCountMap.put(ipAddr, emailAndCount); + // throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.ACCOUNT_HAS_CONNECTED)); + //} + }else{ + EmailLimit emailLimit = new EmailLimit(); + emailLimit.setEmail(email); + emailLimit.setCount(1); + ipConnectionCountMap.put(ipAddr,emailLimit ); } - ipConnectionCountMap.put(ipAddr,email); - }else{ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL)); } @@ -156,9 +162,15 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { connectionCount.decrementAndGet(); //断开链接的ip Map sessionAttributes = accessor.getSessionAttributes(); - if (sessionAttributes != null) { + if (sessionAttributes != null ) { String ipAddr = (String) sessionAttributes.get(IPADDR); - ipConnectionCountMap.remove(ipAddr); + EmailLimit emailAndCount = ipConnectionCountMap.get(ipAddr); + if (emailAndCount.getCount() > 1){ + emailAndCount.setCount(emailAndCount.getCount() - 1); + }else{ + ipConnectionCountMap.remove(ipAddr); + } + }else{ throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.GET_PRINCIPAL_FAIL)); } @@ -186,4 +198,13 @@ public class WebsocketChannelInterceptor implements ChannelInterceptor { @Override public void afterReceiveCompletion(@Nullable Message message, MessageChannel channel, @Nullable Exception ex) { } + + /** + * ip限制存储对象 + */ + @Data + private class EmailLimit{ + private String email; + private int count = 1; + } } diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java index 3087d3f..0019fde 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/listener/IpLimitListener.java @@ -1,8 +1,10 @@ package com.m2pool.chat.listener; +import com.m2pool.chat.constant.ExceptionEnum; import com.m2pool.chat.entity.StompPrincipal; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; +import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; @@ -18,5 +20,9 @@ public class IpLimitListener implements ApplicationListener { // messagingTemplate.convertAndSendToUser(principal.getName(), // Destination.QUEUE + "/" + principal.getName() // , "ip链接数限制"); + + throw new MessageDeliveryException(ExceptionEnum.fromCode(ExceptionEnum.IP_LIMIT_CONNECT)); + + } -} \ No newline at end of file +} diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java index 3d2e575..a8d39f6 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/service/impl/StompServiceImpl.java @@ -64,6 +64,7 @@ public class StompServiceImpl implements StompService { ChatRoom chatRoom = chatRoomMapper.selectOne(new LambdaQueryWrapper() .eq(ChatRoom::getUserOneEmail, principal.getName()) .eq(ChatRoom::getUserTwoEmail, userMessageVo.getEmail())); + System.out.println("发送消息,聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail()); build.setRoomId(String.valueOf(userMessageVo.getRoomId())); int serviceReadNum = chatRoom != null ? chatRoom.getClientReadNum() : 0; build.setClientReadNum( serviceReadNum+ 1); @@ -90,6 +91,7 @@ public class StompServiceImpl implements StompService { .eq(ChatRoom::getUserOneEmail, userMessageVo.getEmail()) .eq(ChatRoom::getUserTwoEmail, principal.getName())); build.setRoomId(String.valueOf(userMessageVo.getRoomId())); + System.out.println("发送消息,聊天室id"+userMessageVo.getRoomId()+"发送者邮箱"+principal.getName()+"接受者邮箱"+userMessageVo.getEmail()); int serviceReadNum = chatRoom != null ? chatRoom.getServiceReadNum() : 0; build.setClientReadNum(serviceReadNum + 1); diff --git a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java index ae87212..50009f4 100644 --- a/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java +++ b/m2pool-modules/m2pool-chat/src/main/java/com/m2pool/chat/task/ChatTask.java @@ -1,11 +1,15 @@ package com.m2pool.chat.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.m2pool.chat.entity.ChatMessage; import com.m2pool.chat.entity.ChatMessageHistory; +import com.m2pool.chat.entity.ChatRoom; import com.m2pool.chat.mapper.ChatMessageMapper; +import com.m2pool.chat.mapper.ChatRoomMapper; import com.m2pool.chat.service.ChatMessageHistoryService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; @@ -30,6 +34,8 @@ public class ChatTask { @Resource private ChatMessageHistoryService chatMessageHistoryService; + @Autowired + private ChatRoomMapper chatRoomMapper; // @Scheduled(cron = "0 0/1 * * * ?") @Scheduled(cron = "0 15 1 * * ?") @@ -68,4 +74,16 @@ public class ChatTask { pageNum++; } while (!chatMessages.isEmpty()); } + + /** + * 清理掉,因服务器异常,未发送到客服这边删除的游客聊天室和消息表数据 + * + */ + @Scheduled(cron = "0 16 1 * * ?") + //@Scheduled(cron = "0 0/1 * * * ?") + public void clearTouristDatas(){ + chatMessageMapper.delete(new LambdaUpdateWrapper() + .like(ChatMessage::getSendEmail, "guest_")); + chatRoomMapper.delete(new LambdaUpdateWrapper().like(ChatRoom::getUserOneEmail, "guest_")); + } } diff --git a/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/utils/SocketDemo.java b/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/utils/SocketDemo.java index df5a3d3..afbb654 100644 --- a/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/utils/SocketDemo.java +++ b/m2pool-modules/m2pool-pool/src/main/java/com/m2pool/pool/utils/SocketDemo.java @@ -1,5 +1,10 @@ package com.m2pool.pool.utils; +import cn.hutool.http.HttpUtil; +import com.alibaba.fastjson.JSONObject; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; + import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; @@ -12,13 +17,23 @@ import java.net.UnknownHostException; public class SocketDemo { public static void main(String[] args) { - try { - //Socket socket = new Socket("10.168.2.167",21233); - Socket socket = new Socket("10.168.2.167",12973); - }catch (UnknownHostException e){ - System.out.println("无法找到主机: "+e.getMessage()); - } catch (IOException e) { - System.out.println("连接失败: " + e.getMessage()); - } + //try { + // //Socket socket = new Socket("10.168.2.167",21233); + // Socket socket = new Socket("10.168.2.167",12973); + //}catch (UnknownHostException e){ + // System.out.println("无法找到主机: "+e.getMessage()); + //} catch (IOException e) { + // System.out.println("连接失败: " + e.getMessage()); + //} + String url = "http://10.168.2.167:12973/blockflow/chain-info?fromGroup=0&toGroup=0"; + HttpHeaders headers = new HttpHeaders(); + headers.add("Content-Type","application/json"); + headers.add("User-Agent", "Mozilla/5.0"); + headers.add("apikey","0x09e220e226f2feb7a971a2b6f958e7d4b1c187c8"); + HttpEntity httpEntity = new HttpEntity(null, headers); + String s = HttpUtil.get(url); + + JSONObject jsonObject = JSONObject.parseObject(s); + int currentHeight = jsonObject.getIntValue("currentHeight"); } }