From eaf453b84d916acb702b163ebcb462850daeb591 Mon Sep 17 00:00:00 2001 From: KKSU <15274802129@163.com> Date: Wed, 19 Jun 2024 15:08:40 +0800 Subject: [PATCH] websocket推送消息 --- src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java | 54 ++++++++++ src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java | 56 +++++++++++ src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java | 84 ++++++++++++++++ src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java | 14 ++ src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java | 86 +++++++++++++++++ pom.xml | 6 + 6 files changed, 300 insertions(+), 0 deletions(-) diff --git a/pom.xml b/pom.xml index 1002b06..bf9ce14 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,12 @@ <dependencies> + + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-websocket</artifactId> + </dependency> + <!-- <dependency>--> <!-- <groupId>com.madgag</groupId>--> <!-- <artifactId>animated-gif-lib</artifactId>--> diff --git a/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java b/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java index d6bc5c6..78b1f60 100644 --- a/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java +++ b/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java @@ -7,6 +7,7 @@ import cc.mrbird.febs.mall.vo.MallGoodsCommentVo; import cc.mrbird.febs.mall.vo.MallGoodsDetailsVo; import cc.mrbird.febs.mall.vo.MallGoodsListVo; +import cc.mrbird.febs.websocket.WsSessionManager; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; @@ -57,4 +58,17 @@ return new FebsResponse().success().data(mallGoodsService.findMallGoodsCommentByGoodsId(queryDto)); } + + + /** + * TODO 测试 + */ + + @ApiOperation(value = "模拟服务器给链接的websocket发送消息", notes = "模拟服务器给链接的websocket发送消息") + @GetMapping(value = "/token//{inviteId}/{text}") + public FebsResponse token(@PathVariable String inviteId, @PathVariable String text) { + WsSessionManager.sendMsgToOne(inviteId,text); + return new FebsResponse().success(); + } + } diff --git a/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java b/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java new file mode 100644 index 0000000..f4b6696 --- /dev/null +++ b/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java @@ -0,0 +1,54 @@ +package cc.mrbird.febs.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author xxx + * @date 2020-09-01 + **/ +@Slf4j +@Component +public class HttpAuthHandler extends TextWebSocketHandler { + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + Map<String, Object> attributes = session.getAttributes(); + String inviteId = (String) attributes.get("inviteId"); + + WsSessionManager.add(inviteId, session); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + log.info("收到消息--{}", message.getPayload()); + session.sendMessage(message); + } + + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { + log.info("心跳"); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.info("错误 : {}", exception.toString()); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + log.info("连接中断 : {}", status.toString()); + String phone = (String) session.getAttributes().get("inviteId"); + WsSessionManager.remove(phone); + } +} diff --git a/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java b/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java new file mode 100644 index 0000000..8d7f6ee --- /dev/null +++ b/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java @@ -0,0 +1,56 @@ +package cc.mrbird.febs.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +import javax.annotation.Resource; + +/** + * + * @author xxx + * @date 2020-09-01 + **/ +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + @Resource + private HttpAuthHandler httpAuthHandler; + @Resource + private WsAuthInterceptor wsAuthInterceptor; + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(httpAuthHandler, "wsxg") + .addInterceptors(wsAuthInterceptor) + .setAllowedOrigins("*"); + } + + @Bean + public TaskScheduler taskScheduler() { + /* + 不显式配置TaskScheduler,Spring会使用默认的TaskScheduler实现类——ScheduledThreadPoolExecutor。 + 这个实现类使用一个线程池来执行任务,线程池的大小默认为1。 + 也就是说,如果不配置TaskScheduler,所有的定时任务都会在同一个线程中执行,可能会导致任务执行时间过长或者任务之间相互影响。 + 因此,为了更好地控制定时任务的执行,建议显式配置TaskScheduler,并根据具体需求设置线程池大小等属性。 + 这样可以确保定时任务的精确调度和高效执行。 + */ + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.setPoolSize(10); + taskScheduler.initialize(); + return taskScheduler; + } + /** + * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java b/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java new file mode 100644 index 0000000..131b79a --- /dev/null +++ b/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java @@ -0,0 +1,86 @@ +package cc.mrbird.febs.websocket; + +import cc.mrbird.febs.common.utils.AppContants; +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.asymmetric.KeyType; +import cn.hutool.crypto.asymmetric.RSA; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.HashMap; +import java.util.Map; + +/** + * + * @author XXX + * @date 2020-09-01 + **/ +@Slf4j +@Component +public class WsAuthInterceptor implements HandshakeInterceptor { + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception { +// log.info("拦截器,握手前"); + Map<String, String> params = parseParameterMap(request.getURI().getQuery()); + + String token = params.get("token"); + if (StrUtil.isNotBlank(token)) { + String inviteId = token; +// String inviteId = resolveToken(token); + log.info("----->{}", inviteId); + if (StrUtil.isBlank(inviteId) || AppContants.TIME_OUT.equals(inviteId)) { + return false; + } + + map.put("inviteId", inviteId); + return true; + } + return false; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Exception e) { +// log.info("握手后"); + } + + private Map<String, String> parseParameterMap(String queryString) { + Map<String, String> parameterMap = new HashMap<>(); + String[] parameters = queryString.split("&"); + for (String parameter : parameters) { + String[] paramPair = parameter.split("="); + if (paramPair.length == 2) { + parameterMap.put(paramPair[0], paramPair[1]); + } + } + return parameterMap; + } + + private String resolveToken(String token) { + try { + RSA rsa = new RSA(AppContants.PRIVATE_KEY, null); + String[] tokens = StrUtil.split(rsa.decryptStr(token, KeyType.PrivateKey), "_"); +// log.info("websocket token : {}, timestemp : {}", tokens[0], tokens[1]); + if (verifyTokenExpired(Long.parseLong(tokens[1]))) { + return tokens[0]; + } else { + return AppContants.TIME_OUT; + } + } catch (Exception e) { + log.error("#解析token异常#", e); + return null; + } + } + + private Boolean verifyTokenExpired(Long time) { + boolean isDebug = false; + if (!isDebug) { + long currentTime = System.currentTimeMillis(); + return currentTime - time <= 10000; + } + return true; + } +} diff --git a/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java b/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java new file mode 100644 index 0000000..eeb9fae --- /dev/null +++ b/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java @@ -0,0 +1,84 @@ +package cc.mrbird.febs.websocket; + +import cn.hutool.core.util.StrUtil; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @date 2020-09-01 + **/ +public class WsSessionManager { + + private static final ConcurrentHashMap<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>(); + + public static void add(String key, WebSocketSession session) { + SESSIONS.put(key, session); + } + + + public static WebSocketSession remove(String key) { + return SESSIONS.remove(key); + } + + public static void removeAndClose(String key) { + WebSocketSession session = remove(key); + if (session != null) { + try { + // 关闭连接 + session.close(); + } catch (IOException e) { + // todo: 关闭出现异常处理 + e.printStackTrace(); + } + } + } + + public static WebSocketSession get(String key) { + // 获得 session + return SESSIONS.get(key); + } + + /** + * 发送消息 + * + * @param key 用户手机号 + * @param msg 消息 + */ + public static void sendMsgToOne(String key, String msg) { + TextMessage textMessage = new TextMessage(msg); + try { + if (SESSIONS.containsKey(key)) { + SESSIONS.get(key).sendMessage(textMessage); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * 批量发送 + * + * @param keys 手机号集合, 逗号隔开 + * @param msg 消息 + */ + public static void sendMsgToMany(String keys, String msg) { + TextMessage textMessage = new TextMessage(msg); + + List<String> keyList = StrUtil.splitTrim(keys, ","); + for (Map.Entry<String, WebSocketSession> entry : SESSIONS.entrySet()) { + if (keyList.contains(entry.getKey())) { + try { + entry.getValue().sendMessage(textMessage); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + +} -- Gitblit v1.9.1