From eaf453b84d916acb702b163ebcb462850daeb591 Mon Sep 17 00:00:00 2001
From: KKSU <15274802129@163.com>
Date: Wed, 19 Jun 2024 15:08:40 +0800
Subject: [PATCH] websocket推送消息

---
 src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java              |   54 ++++++++++
 src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java              |   56 +++++++++++
 src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java             |   84 ++++++++++++++++
 src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java |   14 ++
 src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java            |   86 +++++++++++++++++
 pom.xml                                                                  |    6 +
 6 files changed, 300 insertions(+), 0 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1002b06..bf9ce14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,6 +29,12 @@
 
     <dependencies>
 
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
 <!--        <dependency>-->
 <!--            <groupId>com.madgag</groupId>-->
 <!--            <artifactId>animated-gif-lib</artifactId>-->
diff --git a/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java b/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
index d6bc5c6..78b1f60 100644
--- a/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
+++ b/src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
@@ -7,6 +7,7 @@
 import cc.mrbird.febs.mall.vo.MallGoodsCommentVo;
 import cc.mrbird.febs.mall.vo.MallGoodsDetailsVo;
 import cc.mrbird.febs.mall.vo.MallGoodsListVo;
+import cc.mrbird.febs.websocket.WsSessionManager;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -57,4 +58,17 @@
         return new FebsResponse().success().data(mallGoodsService.findMallGoodsCommentByGoodsId(queryDto));
     }
 
+
+
+    /**
+     * TODO 测试
+     */
+
+    @ApiOperation(value = "模拟服务器给链接的websocket发送消息", notes = "模拟服务器给链接的websocket发送消息")
+    @GetMapping(value = "/token//{inviteId}/{text}")
+    public FebsResponse token(@PathVariable String inviteId, @PathVariable String text) {
+        WsSessionManager.sendMsgToOne(inviteId,text);
+        return new FebsResponse().success();
+    }
+
 }
diff --git a/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java b/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java
new file mode 100644
index 0000000..f4b6696
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java
@@ -0,0 +1,54 @@
+package cc.mrbird.febs.websocket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.PongMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author xxx
+ * @date 2020-09-01
+ **/
+@Slf4j
+@Component
+public class HttpAuthHandler extends TextWebSocketHandler {
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        Map<String, Object> attributes = session.getAttributes();
+        String inviteId = (String) attributes.get("inviteId");
+
+        WsSessionManager.add(inviteId, session);
+    }
+
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        log.info("收到消息--{}", message.getPayload());
+        session.sendMessage(message);
+    }
+
+    @Override
+    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
+        log.info("心跳");
+    }
+
+    @Override
+    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+        log.info("错误 : {}", exception.toString());
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        log.info("连接中断 : {}", status.toString());
+        String phone = (String) session.getAttributes().get("inviteId");
+        WsSessionManager.remove(phone);
+    }
+}
diff --git a/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java b/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java
new file mode 100644
index 0000000..8d7f6ee
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java
@@ -0,0 +1,56 @@
+package cc.mrbird.febs.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+import javax.annotation.Resource;
+
+/**
+ *
+ * @author xxx
+ * @date 2020-09-01
+ **/
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig implements WebSocketConfigurer {
+
+    @Resource
+    private HttpAuthHandler httpAuthHandler;
+    @Resource
+    private WsAuthInterceptor wsAuthInterceptor;
+
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        registry.addHandler(httpAuthHandler, "wsxg")
+                .addInterceptors(wsAuthInterceptor)
+                .setAllowedOrigins("*");
+    }
+
+    @Bean
+    public TaskScheduler taskScheduler() {
+        /*
+        不显式配置TaskScheduler,Spring会使用默认的TaskScheduler实现类——ScheduledThreadPoolExecutor。
+        这个实现类使用一个线程池来执行任务,线程池的大小默认为1。
+        也就是说,如果不配置TaskScheduler,所有的定时任务都会在同一个线程中执行,可能会导致任务执行时间过长或者任务之间相互影响。
+        因此,为了更好地控制定时任务的执行,建议显式配置TaskScheduler,并根据具体需求设置线程池大小等属性。
+        这样可以确保定时任务的精确调度和高效执行。
+         */
+        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+        taskScheduler.setPoolSize(10);
+        taskScheduler.initialize();
+        return taskScheduler;
+    }
+    /**
+     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}
diff --git a/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java b/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java
new file mode 100644
index 0000000..131b79a
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java
@@ -0,0 +1,86 @@
+package cc.mrbird.febs.websocket;
+
+import cc.mrbird.febs.common.utils.AppContants;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.asymmetric.KeyType;
+import cn.hutool.crypto.asymmetric.RSA;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.HandshakeInterceptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ * @author  XXX
+ * @date 2020-09-01
+ **/
+@Slf4j
+@Component
+public class WsAuthInterceptor implements HandshakeInterceptor {
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
+//        log.info("拦截器,握手前");
+        Map<String, String> params = parseParameterMap(request.getURI().getQuery());
+
+        String token = params.get("token");
+        if (StrUtil.isNotBlank(token)) {
+            String inviteId = token;
+//            String inviteId = resolveToken(token);
+            log.info("----->{}", inviteId);
+            if (StrUtil.isBlank(inviteId) || AppContants.TIME_OUT.equals(inviteId)) {
+                return false;
+            }
+
+            map.put("inviteId", inviteId);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Exception e) {
+//        log.info("握手后");
+    }
+
+    private Map<String, String> parseParameterMap(String queryString) {
+        Map<String, String> parameterMap = new HashMap<>();
+        String[] parameters = queryString.split("&");
+        for (String parameter : parameters) {
+            String[] paramPair = parameter.split("=");
+            if (paramPair.length == 2) {
+                parameterMap.put(paramPair[0], paramPair[1]);
+            }
+        }
+        return parameterMap;
+    }
+
+    private String resolveToken(String token) {
+        try {
+            RSA rsa = new RSA(AppContants.PRIVATE_KEY, null);
+            String[] tokens = StrUtil.split(rsa.decryptStr(token, KeyType.PrivateKey), "_");
+//            log.info("websocket token : {}, timestemp : {}", tokens[0], tokens[1]);
+            if (verifyTokenExpired(Long.parseLong(tokens[1]))) {
+                return tokens[0];
+            } else {
+                return AppContants.TIME_OUT;
+            }
+        } catch (Exception e) {
+            log.error("#解析token异常#", e);
+            return null;
+        }
+    }
+
+    private Boolean verifyTokenExpired(Long time) {
+        boolean isDebug = false;
+        if (!isDebug) {
+            long currentTime = System.currentTimeMillis();
+            return currentTime - time <= 10000;
+        }
+        return true;
+    }
+}
diff --git a/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java b/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java
new file mode 100644
index 0000000..eeb9fae
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java
@@ -0,0 +1,84 @@
+package cc.mrbird.febs.websocket;
+
+import cn.hutool.core.util.StrUtil;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @date 2020-09-01
+ **/
+public class WsSessionManager {
+
+    private static final ConcurrentHashMap<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>();
+
+    public static void add(String key, WebSocketSession session) {
+        SESSIONS.put(key, session);
+    }
+
+
+    public static WebSocketSession remove(String key) {
+        return SESSIONS.remove(key);
+    }
+
+    public static void removeAndClose(String key) {
+        WebSocketSession session = remove(key);
+        if (session != null) {
+            try {
+                // 关闭连接
+                session.close();
+            } catch (IOException e) {
+                // todo: 关闭出现异常处理
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public static WebSocketSession get(String key) {
+        // 获得 session
+        return SESSIONS.get(key);
+    }
+
+    /**
+     * 发送消息
+     *
+     * @param key 用户手机号
+     * @param msg 消息
+     */
+    public static void sendMsgToOne(String key, String msg) {
+        TextMessage textMessage = new TextMessage(msg);
+        try {
+            if (SESSIONS.containsKey(key)) {
+                SESSIONS.get(key).sendMessage(textMessage);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 批量发送
+     *
+     * @param keys 手机号集合, 逗号隔开
+     * @param msg 消息
+     */
+    public static void sendMsgToMany(String keys, String msg) {
+        TextMessage textMessage = new TextMessage(msg);
+
+        List<String> keyList = StrUtil.splitTrim(keys, ",");
+        for (Map.Entry<String, WebSocketSession> entry : SESSIONS.entrySet()) {
+            if (keyList.contains(entry.getKey())) {
+                try {
+                    entry.getValue().sendMessage(textMessage);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+}

--
Gitblit v1.9.1