pom.xml | ●●●●● patch | view | raw | blame | history | |
src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java | ●●●●● patch | view | raw | blame | history |
pom.xml
@@ -29,6 +29,12 @@ <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- <dependency>--> <!-- <groupId>com.madgag</groupId>--> <!-- <artifactId>animated-gif-lib</artifactId>--> src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
@@ -7,6 +7,7 @@ import cc.mrbird.febs.mall.vo.MallGoodsCommentVo; import cc.mrbird.febs.mall.vo.MallGoodsDetailsVo; import cc.mrbird.febs.mall.vo.MallGoodsListVo; import cc.mrbird.febs.websocket.WsSessionManager; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -57,4 +58,17 @@ return new FebsResponse().success().data(mallGoodsService.findMallGoodsCommentByGoodsId(queryDto)); } /** * TODO 测试 */ @ApiOperation(value = "模拟服务器给链接的websocket发送消息", notes = "模拟服务器给链接的websocket发送消息") @GetMapping(value = "/token//{inviteId}/{text}") public FebsResponse token(@PathVariable String inviteId, @PathVariable String text) { WsSessionManager.sendMsgToOne(inviteId,text); return new FebsResponse().success(); } } src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java
New file @@ -0,0 +1,54 @@ package cc.mrbird.febs.websocket; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author xxx * @date 2020-09-01 **/ @Slf4j @Component public class HttpAuthHandler extends TextWebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { Map<String, Object> attributes = session.getAttributes(); String inviteId = (String) attributes.get("inviteId"); WsSessionManager.add(inviteId, session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { log.info("收到消息--{}", message.getPayload()); session.sendMessage(message); } @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { log.info("心跳"); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.info("错误 : {}", exception.toString()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { log.info("连接中断 : {}", status.toString()); String phone = (String) session.getAttributes().get("inviteId"); WsSessionManager.remove(phone); } } src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java
New file @@ -0,0 +1,56 @@ package cc.mrbird.febs.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.server.standard.ServerEndpointExporter; import javax.annotation.Resource; /** * * @author xxx * @date 2020-09-01 **/ @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Resource private HttpAuthHandler httpAuthHandler; @Resource private WsAuthInterceptor wsAuthInterceptor; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(httpAuthHandler, "wsxg") .addInterceptors(wsAuthInterceptor) .setAllowedOrigins("*"); } @Bean public TaskScheduler taskScheduler() { /* 不显式配置TaskScheduler,Spring会使用默认的TaskScheduler实现类——ScheduledThreadPoolExecutor。 这个实现类使用一个线程池来执行任务,线程池的大小默认为1。 也就是说,如果不配置TaskScheduler,所有的定时任务都会在同一个线程中执行,可能会导致任务执行时间过长或者任务之间相互影响。 因此,为了更好地控制定时任务的执行,建议显式配置TaskScheduler,并根据具体需求设置线程池大小等属性。 这样可以确保定时任务的精确调度和高效执行。 */ ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); return taskScheduler; } /** * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java
New file @@ -0,0 +1,86 @@ package cc.mrbird.febs.websocket; import cc.mrbird.febs.common.utils.AppContants; import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.asymmetric.KeyType; import cn.hutool.crypto.asymmetric.RSA; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.HashMap; import java.util.Map; /** * * @author XXX * @date 2020-09-01 **/ @Slf4j @Component public class WsAuthInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception { // log.info("拦截器,握手前"); Map<String, String> params = parseParameterMap(request.getURI().getQuery()); String token = params.get("token"); if (StrUtil.isNotBlank(token)) { String inviteId = token; // String inviteId = resolveToken(token); log.info("----->{}", inviteId); if (StrUtil.isBlank(inviteId) || AppContants.TIME_OUT.equals(inviteId)) { return false; } map.put("inviteId", inviteId); return true; } return false; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Exception e) { // log.info("握手后"); } private Map<String, String> parseParameterMap(String queryString) { Map<String, String> parameterMap = new HashMap<>(); String[] parameters = queryString.split("&"); for (String parameter : parameters) { String[] paramPair = parameter.split("="); if (paramPair.length == 2) { parameterMap.put(paramPair[0], paramPair[1]); } } return parameterMap; } private String resolveToken(String token) { try { RSA rsa = new RSA(AppContants.PRIVATE_KEY, null); String[] tokens = StrUtil.split(rsa.decryptStr(token, KeyType.PrivateKey), "_"); // log.info("websocket token : {}, timestemp : {}", tokens[0], tokens[1]); if (verifyTokenExpired(Long.parseLong(tokens[1]))) { return tokens[0]; } else { return AppContants.TIME_OUT; } } catch (Exception e) { log.error("#解析token异常#", e); return null; } } private Boolean verifyTokenExpired(Long time) { boolean isDebug = false; if (!isDebug) { long currentTime = System.currentTimeMillis(); return currentTime - time <= 10000; } return true; } } src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java
New file @@ -0,0 +1,84 @@ package cc.mrbird.febs.websocket; import cn.hutool.core.util.StrUtil; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @date 2020-09-01 **/ public class WsSessionManager { private static final ConcurrentHashMap<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>(); public static void add(String key, WebSocketSession session) { SESSIONS.put(key, session); } public static WebSocketSession remove(String key) { return SESSIONS.remove(key); } public static void removeAndClose(String key) { WebSocketSession session = remove(key); if (session != null) { try { // 关闭连接 session.close(); } catch (IOException e) { // todo: 关闭出现异常处理 e.printStackTrace(); } } } public static WebSocketSession get(String key) { // 获得 session return SESSIONS.get(key); } /** * 发送消息 * * @param key 用户手机号 * @param msg 消息 */ public static void sendMsgToOne(String key, String msg) { TextMessage textMessage = new TextMessage(msg); try { if (SESSIONS.containsKey(key)) { SESSIONS.get(key).sendMessage(textMessage); } } catch (IOException e) { e.printStackTrace(); } } /** * 批量发送 * * @param keys 手机号集合, 逗号隔开 * @param msg 消息 */ public static void sendMsgToMany(String keys, String msg) { TextMessage textMessage = new TextMessage(msg); List<String> keyList = StrUtil.splitTrim(keys, ","); for (Map.Entry<String, WebSocketSession> entry : SESSIONS.entrySet()) { if (keyList.contains(entry.getKey())) { try { entry.getValue().sendMessage(textMessage); } catch (IOException e) { e.printStackTrace(); } } } } }