KKSU
2024-06-19 eaf453b84d916acb702b163ebcb462850daeb591
websocket推送消息
2 files modified
4 files added
300 ■■■■■ changed files
pom.xml 6 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java 14 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java 54 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java 56 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java 86 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java 84 ●●●●● patch | view | raw | blame | history
pom.xml
@@ -29,6 +29,12 @@
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>com.madgag</groupId>-->
<!--            <artifactId>animated-gif-lib</artifactId>-->
src/main/java/cc/mrbird/febs/mall/controller/ApiMallGoodsController.java
@@ -7,6 +7,7 @@
import cc.mrbird.febs.mall.vo.MallGoodsCommentVo;
import cc.mrbird.febs.mall.vo.MallGoodsDetailsVo;
import cc.mrbird.febs.mall.vo.MallGoodsListVo;
import cc.mrbird.febs.websocket.WsSessionManager;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -57,4 +58,17 @@
        return new FebsResponse().success().data(mallGoodsService.findMallGoodsCommentByGoodsId(queryDto));
    }
    /**
     * TODO 测试
     */
    @ApiOperation(value = "模拟服务器给链接的websocket发送消息", notes = "模拟服务器给链接的websocket发送消息")
    @GetMapping(value = "/token//{inviteId}/{text}")
    public FebsResponse token(@PathVariable String inviteId, @PathVariable String text) {
        WsSessionManager.sendMsgToOne(inviteId,text);
        return new FebsResponse().success();
    }
}
src/main/java/cc/mrbird/febs/websocket/HttpAuthHandler.java
New file
@@ -0,0 +1,54 @@
package cc.mrbird.febs.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author xxx
 * @date 2020-09-01
 **/
@Slf4j
@Component
public class HttpAuthHandler extends TextWebSocketHandler {
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Map<String, Object> attributes = session.getAttributes();
        String inviteId = (String) attributes.get("inviteId");
        WsSessionManager.add(inviteId, session);
    }
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("收到消息--{}", message.getPayload());
        session.sendMessage(message);
    }
    @Override
    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
        log.info("心跳");
    }
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.info("错误 : {}", exception.toString());
    }
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        log.info("连接中断 : {}", status.toString());
        String phone = (String) session.getAttributes().get("inviteId");
        WsSessionManager.remove(phone);
    }
}
src/main/java/cc/mrbird/febs/websocket/WebSocketConfig.java
New file
@@ -0,0 +1,56 @@
package cc.mrbird.febs.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.annotation.Resource;
/**
 *
 * @author xxx
 * @date 2020-09-01
 **/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Resource
    private HttpAuthHandler httpAuthHandler;
    @Resource
    private WsAuthInterceptor wsAuthInterceptor;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(httpAuthHandler, "wsxg")
                .addInterceptors(wsAuthInterceptor)
                .setAllowedOrigins("*");
    }
    @Bean
    public TaskScheduler taskScheduler() {
        /*
        不显式配置TaskScheduler,Spring会使用默认的TaskScheduler实现类——ScheduledThreadPoolExecutor。
        这个实现类使用一个线程池来执行任务,线程池的大小默认为1。
        也就是说,如果不配置TaskScheduler,所有的定时任务都会在同一个线程中执行,可能会导致任务执行时间过长或者任务之间相互影响。
        因此,为了更好地控制定时任务的执行,建议显式配置TaskScheduler,并根据具体需求设置线程池大小等属性。
        这样可以确保定时任务的精确调度和高效执行。
         */
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        return taskScheduler;
    }
    /**
     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
src/main/java/cc/mrbird/febs/websocket/WsAuthInterceptor.java
New file
@@ -0,0 +1,86 @@
package cc.mrbird.febs.websocket;
import cc.mrbird.febs.common.utils.AppContants;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.RSA;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.HashMap;
import java.util.Map;
/**
 *
 * @author  XXX
 * @date 2020-09-01
 **/
@Slf4j
@Component
public class WsAuthInterceptor implements HandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
//        log.info("拦截器,握手前");
        Map<String, String> params = parseParameterMap(request.getURI().getQuery());
        String token = params.get("token");
        if (StrUtil.isNotBlank(token)) {
            String inviteId = token;
//            String inviteId = resolveToken(token);
            log.info("----->{}", inviteId);
            if (StrUtil.isBlank(inviteId) || AppContants.TIME_OUT.equals(inviteId)) {
                return false;
            }
            map.put("inviteId", inviteId);
            return true;
        }
        return false;
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler webSocketHandler, Exception e) {
//        log.info("握手后");
    }
    private Map<String, String> parseParameterMap(String queryString) {
        Map<String, String> parameterMap = new HashMap<>();
        String[] parameters = queryString.split("&");
        for (String parameter : parameters) {
            String[] paramPair = parameter.split("=");
            if (paramPair.length == 2) {
                parameterMap.put(paramPair[0], paramPair[1]);
            }
        }
        return parameterMap;
    }
    private String resolveToken(String token) {
        try {
            RSA rsa = new RSA(AppContants.PRIVATE_KEY, null);
            String[] tokens = StrUtil.split(rsa.decryptStr(token, KeyType.PrivateKey), "_");
//            log.info("websocket token : {}, timestemp : {}", tokens[0], tokens[1]);
            if (verifyTokenExpired(Long.parseLong(tokens[1]))) {
                return tokens[0];
            } else {
                return AppContants.TIME_OUT;
            }
        } catch (Exception e) {
            log.error("#解析token异常#", e);
            return null;
        }
    }
    private Boolean verifyTokenExpired(Long time) {
        boolean isDebug = false;
        if (!isDebug) {
            long currentTime = System.currentTimeMillis();
            return currentTime - time <= 10000;
        }
        return true;
    }
}
src/main/java/cc/mrbird/febs/websocket/WsSessionManager.java
New file
@@ -0,0 +1,84 @@
package cc.mrbird.febs.websocket;
import cn.hutool.core.util.StrUtil;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @date 2020-09-01
 **/
public class WsSessionManager {
    private static final ConcurrentHashMap<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>();
    public static void add(String key, WebSocketSession session) {
        SESSIONS.put(key, session);
    }
    public static WebSocketSession remove(String key) {
        return SESSIONS.remove(key);
    }
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 关闭连接
                session.close();
            } catch (IOException e) {
                // todo: 关闭出现异常处理
                e.printStackTrace();
            }
        }
    }
    public static WebSocketSession get(String key) {
        // 获得 session
        return SESSIONS.get(key);
    }
    /**
     * 发送消息
     *
     * @param key 用户手机号
     * @param msg 消息
     */
    public static void sendMsgToOne(String key, String msg) {
        TextMessage textMessage = new TextMessage(msg);
        try {
            if (SESSIONS.containsKey(key)) {
                SESSIONS.get(key).sendMessage(textMessage);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 批量发送
     *
     * @param keys 手机号集合, 逗号隔开
     * @param msg 消息
     */
    public static void sendMsgToMany(String keys, String msg) {
        TextMessage textMessage = new TextMessage(msg);
        List<String> keyList = StrUtil.splitTrim(keys, ",");
        for (Map.Entry<String, WebSocketSession> entry : SESSIONS.entrySet()) {
            if (keyList.contains(entry.getKey())) {
                try {
                    entry.getValue().sendMessage(textMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}