From eaf453b84d916acb702b163ebcb462850daeb591 Mon Sep 17 00:00:00 2001
From: KKSU <15274802129@163.com>
Date: Wed, 19 Jun 2024 15:08:40 +0800
Subject: [PATCH] websocket推送消息
---
src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java | 54 ++++++++++
src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java | 56 +++++++++++
src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java | 84 ++++++++++++++++
src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java | 14 ++
src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java | 86 +++++++++++++++++
pom.xml | 6 +
6 files changed, 300 insertions(+), 0 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1002b06..bf9ce14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,12 @@
<dependencies>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-websocket</artifactId>
+ </dependency>
+
<!-- <dependency>-->
<!-- <groupId>com.madgag</groupId>-->
<!-- <artifactId>animated-gif-lib</artifactId>-->
diff --git a/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java b/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
index d6bc5c6..78b1f60 100644
--- a/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
+++ b/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
@@ -7,6 +7,7 @@
import cc.mrbird.febs.mall.vo.MallGoodsCommentVo;
import cc.mrbird.febs.mall.vo.MallGoodsDetailsVo;
import cc.mrbird.febs.mall.vo.MallGoodsListVo;
+import cc.mrbird.febs.websocket.WsSessionManager;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -57,4 +58,17 @@
return new FebsResponse().success().data(mallGoodsService.findMallGoodsCommentByGoodsId(queryDto));
}
+
+
+ /**
+ * TODO 测试
+ */
+
+ @ApiOperation(value = "模拟服务器给链接的websocket发送消息", notes = "模拟服务器给链接的websocket发送消息")
+ @GetMapping(value = "/token//{inviteId}/{text}")
+ public FebsResponse token(@PathVariable String inviteId, @PathVariable String text) {
+ WsSessionManager.sendMsgToOne(inviteId,text);
+ return new FebsResponse().success();
+ }
+
}
diff --git a/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java b/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java
new file mode 100644
index 0000000..f4b6696
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java
@@ -0,0 +1,54 @@
+package cc.mrbird.febs.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.PongMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author xxx
+ * @date 2020-09-01
+ **/
+@Slf4j
+@Component
+public class HttpAuthHandler extends TextWebSocketHandler {
+
+ @Override
+ public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+ Map<String, Object> attributes = session.getAttributes();
+ String inviteId = (String) attributes.get("inviteId");
+
+ WsSessionManager.add(inviteId, session);
+ }
+
+ @Override
+ protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+ log.info("收到消息--{}", message.getPayload());
+ session.sendMessage(message);
+ }
+
+ @Override
+ protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
+ log.info("心跳");
+ }
+
+ @Override
+ public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+ log.info("错误 : {}", exception.toString());
+ }
+
+ @Override
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+ log.info("连接中断 : {}", status.toString());
+ String phone = (String) session.getAttributes().get("inviteId");
+ WsSessionManager.remove(phone);
+ }
+}
diff --git a/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java b/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java
new file mode 100644
index 0000000..8d7f6ee
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java
@@ -0,0 +1,56 @@
+package cc.mrbird.febs.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+import javax.annotation.Resource;
+
+/**
+ *
+ * @author xxx
+ * @date 2020-09-01
+ **/
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig implements WebSocketConfigurer {
+
+ @Resource
+ private HttpAuthHandler httpAuthHandler;
+ @Resource
+ private WsAuthInterceptor wsAuthInterceptor;
+
+ @Override
+ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+ registry.addHandler(httpAuthHandler, "wsxg")
+ .addInterceptors(wsAuthInterceptor)
+ .setAllowedOrigins("*");
+ }
+
+ @Bean
+ public TaskScheduler taskScheduler() {
+ /*
+ 不显式配置TaskScheduler,Spring会使用默认的TaskScheduler实现类——ScheduledThreadPoolExecutor。
+ 这个实现类使用一个线程池来执行任务,线程池的大小默认为1。
+ 也就是说,如果不配置TaskScheduler,所有的定时任务都会在同一个线程中执行,可能会导致任务执行时间过长或者任务之间相互影响。
+ 因此,为了更好地控制定时任务的执行,建议显式配置TaskScheduler,并根据具体需求设置线程池大小等属性。
+ 这样可以确保定时任务的精确调度和高效执行。
+ */
+ ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+ taskScheduler.setPoolSize(10);
+ taskScheduler.initialize();
+ return taskScheduler;
+ }
+ /**
+ * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
+ */
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter() {
+ return new ServerEndpointExporter();
+ }
+}
diff --git a/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java b/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java
new file mode 100644
index 0000000..131b79a
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java
@@ -0,0 +1,86 @@
+package cc.mrbird.febs.websocket;
+
+import cc.mrbird.febs.common.utils.AppContants;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.asymmetric.KeyType;
+import cn.hutool.crypto.asymmetric.RSA;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * @author XXX
+ * @date 2020-09-01
+ **/
+@Slf4j
+@Component
+public class WsAuthInterceptor implements HandshakeInterceptor {
+ @Override
+ public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
+// log.info("拦截器,握手前");
+ Map<String, String> params = parseParameterMap(request.getURI().getQuery());
+
+ String token = params.get("token");
+ if (StrUtil.isNotBlank(token)) {
+ String inviteId = token;
+// String inviteId = resolveToken(token);
+ log.info("----->{}", inviteId);
+ if (StrUtil.isBlank(inviteId) || AppContants.TIME_OUT.equals(inviteId)) {
+ return false;
+ }
+
+ map.put("inviteId", inviteId);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Exception e) {
+// log.info("握手后");
+ }
+
+ private Map<String, String> parseParameterMap(String queryString) {
+ Map<String, String> parameterMap = new HashMap<>();
+ String[] parameters = queryString.split("&");
+ for (String parameter : parameters) {
+ String[] paramPair = parameter.split("=");
+ if (paramPair.length == 2) {
+ parameterMap.put(paramPair[0], paramPair[1]);
+ }
+ }
+ return parameterMap;
+ }
+
+ private String resolveToken(String token) {
+ try {
+ RSA rsa = new RSA(AppContants.PRIVATE_KEY, null);
+ String[] tokens = StrUtil.split(rsa.decryptStr(token, KeyType.PrivateKey), "_");
+// log.info("websocket token : {}, timestemp : {}", tokens[0], tokens[1]);
+ if (verifyTokenExpired(Long.parseLong(tokens[1]))) {
+ return tokens[0];
+ } else {
+ return AppContants.TIME_OUT;
+ }
+ } catch (Exception e) {
+ log.error("#解析token异常#", e);
+ return null;
+ }
+ }
+
+ private Boolean verifyTokenExpired(Long time) {
+ boolean isDebug = false;
+ if (!isDebug) {
+ long currentTime = System.currentTimeMillis();
+ return currentTime - time <= 10000;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java b/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java
new file mode 100644
index 0000000..eeb9fae
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java
@@ -0,0 +1,84 @@
+package cc.mrbird.febs.websocket;
+
+import cn.hutool.core.util.StrUtil;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @date 2020-09-01
+ **/
+public class WsSessionManager {
+
+ private static final ConcurrentHashMap<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>();
+
+ public static void add(String key, WebSocketSession session) {
+ SESSIONS.put(key, session);
+ }
+
+
+ public static WebSocketSession remove(String key) {
+ return SESSIONS.remove(key);
+ }
+
+ public static void removeAndClose(String key) {
+ WebSocketSession session = remove(key);
+ if (session != null) {
+ try {
+ // 关闭连接
+ session.close();
+ } catch (IOException e) {
+ // todo: 关闭出现异常处理
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static WebSocketSession get(String key) {
+ // 获得 session
+ return SESSIONS.get(key);
+ }
+
+ /**
+ * 发送消息
+ *
+ * @param key 用户手机号
+ * @param msg 消息
+ */
+ public static void sendMsgToOne(String key, String msg) {
+ TextMessage textMessage = new TextMessage(msg);
+ try {
+ if (SESSIONS.containsKey(key)) {
+ SESSIONS.get(key).sendMessage(textMessage);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 批量发送
+ *
+ * @param keys 手机号集合, 逗号隔开
+ * @param msg 消息
+ */
+ public static void sendMsgToMany(String keys, String msg) {
+ TextMessage textMessage = new TextMessage(msg);
+
+ List<String> keyList = StrUtil.splitTrim(keys, ",");
+ for (Map.Entry<String, WebSocketSession> entry : SESSIONS.entrySet()) {
+ if (keyList.contains(entry.getKey())) {
+ try {
+ entry.getValue().sendMessage(textMessage);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+}
--
Gitblit v1.9.1