jyy
2021-03-11 3e5dc8c645b73e73fbeb5564c60a979d0557f5b4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package com.matrix.component.websoket;
 
 
import com.matrix.core.tools.LogUtil;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
 
/**
 *
 * @ClassName: WebSocketPushHandler
 * @Description: 创建处理器
 * @author jyy
 * @date 2017年9月26日 上午10:36:17
 */
public class WebSocketPushHandler extends TextWebSocketHandler {
 
 
    private static final List<WebSocketSession> userList = new ArrayList<>();
 
    private static final List<WebSoketMessageObserver> observerList=new ArrayList<>();
 
 
    public  WebSocketPushHandler(){
        LogUtil.info("WebSocketPushHandler初始化");
        //注册观察者
        addObserver(new WebSoketMessageRabbitObserver());
    }
    /**
     * 用户进入系统监听
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
 
        LogUtil.info("用户进入系统:" + session.getAttributes());
        Map<String, Object> map = session.getAttributes();
        for (String key : map.keySet()) {
            LogUtil.info("key:" + key + " and value:" + map.get(key));
        }
        observerList.forEach(item->item.userConnection(session));
        userList.add(session);
    }
 
    /**
     * 处理用户请求
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        LogUtil.info(session.getId()+"用户发送请求信息。。。");
        observerList.forEach(item->item.handleTextMessage(session,message));
    }
 
    /**
     * 用户退出后的处理
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        userList.remove(session);
        observerList.forEach(item->item.afterConnectionClosed(session,status));
        LogUtil.info(session.getId()+"用户退出系统");
    }
 
    /**
     * 自定义函数
     * 给所有的在线用户发送消息
     */
    public static void sendMessagesToUsers(TextMessage message) {
        for (WebSocketSession user : userList) {
            try {
                if (user.isOpen()) {
                    user.sendMessage(message);
                }
            } catch (IOException e) {
                LogUtil.error("soket发送异常",e);
            }
        }
    }
 
    /**
     * 自定义函数
     * 发送消息给指定的在线用户
     */
    public static void sendMessageToUser(String userId, TextMessage message) {
        for (WebSocketSession user : userList) {
            if (user.getAttributes().get("userId").equals(userId)) {
                try {
                    if (user.isOpen()) {
                        user.sendMessage(message);
                    }
                } catch (IOException e) {
                    LogUtil.error("soket发送异常",e);
                }
                break;
 
            }
        }
    }
 
 
    /**
     * 添加消息观察者
     * @param obj
     */
    public static void addObserver(WebSoketMessageObserver obj) {
        LogUtil.info("添加消息观察者");
        observerList.add(obj);
    }
 
 
 
}