SSM 框架中 websocket 实现前后端即时通信

web 前端

参考

https://www.cnblogs.com/1wen/p/5808276.html

https://github.com/zimv/websocket-heartbeat-js

js 封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
function WebsocketHeartbeatJs({
url,
pingTimeout = 15000, // 心跳频率
pongTimeout = 10000, // 10s 以内没有收到回复,就关闭连接
reconnectTimeout = 2000,
pingMsg = 'heartbeat',
repeatLimit = 10
}) {
this.opts = {
url: url,
pingTimeout: pingTimeout,
pongTimeout: pongTimeout,
reconnectTimeout: reconnectTimeout,
pingMsg: pingMsg,
repeatLimit: repeatLimit
};
this.ws = null; //websocket 实例
this.repeat = 0;

//override hook function
this.onclose = () => {
console.log("websocket close");
};
this.onerror = () => {
console.log("websocket error");
};
this.onopen = () => {};
this.onmessage = () => {};
this.onreconnect = () => {};

this.createWebSocket();
}
WebsocketHeartbeatJs.prototype.createWebSocket = function() {
try {
this.ws = new WebSocket(this.opts.url);
this.initEventHandle();
} catch (e) {
this.reconnect();
throw e;
}
};

WebsocketHeartbeatJs.prototype.initEventHandle = function() {
this.ws.onclose = () => {
this.onclose();
this.reconnect();
};
this.ws.onerror = () => {
this.onerror();
this.reconnect();
};
this.ws.onopen = () => {
this.repeat = 0;
this.onopen();
// 心跳检测重置
this.heartCheck();
};
this.ws.onmessage = (event) => {
this.onmessage(event);
// 如果获取到消息,心跳检测重置
// 拿到任何消息都说明当前连接是正常的
this.heartCheck();
};
};

WebsocketHeartbeatJs.prototype.reconnect = function() {
if (this.opts.repeatLimit > 0 && this.opts.repeatLimit <= this.repeat) return; //limit repeat the number
if (this.lockReconnect || this.forbidReconnect) return;
this.lockReconnect = true;
this.repeat++; // 必须在 lockReconnect 之后,避免进行无效计数
this.onreconnect();
// 没连接上会一直重连,设置延迟避免请求过多
setTimeout(() => {
this.createWebSocket();
this.lockReconnect = false;
}, this.opts.reconnectTimeout);
};
WebsocketHeartbeatJs.prototype.send = function(msg) {
this.ws.send(msg);
};
// 心跳检测
WebsocketHeartbeatJs.prototype.heartCheck = function() {
this.heartReset();
this.heartStart();
};
WebsocketHeartbeatJs.prototype.heartStart = function() {
if (this.forbidReconnect) return; // 不再重连就不再执行心跳
this.pingTimeoutId = setTimeout(() => {
// 这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage 拿到返回的心跳就说明连接正常
this.ws.send(this.opts.pingMsg);
// 如果超过一定时间还没重置,说明后端主动断开了
this.pongTimeoutId = setTimeout(() => {
// 如果 onclose 会执行 reconnect,我们执行 ws.close () 就行了。如果直接执行 reconnect 会触发 onclose 导致重连两次
this.ws.close();
}, this.opts.pongTimeout);
}, this.opts.pingTimeout);
};
WebsocketHeartbeatJs.prototype.heartReset = function() {
clearTimeout(this.pingTimeoutId);
clearTimeout(this.pongTimeoutId);
};
WebsocketHeartbeatJs.prototype.close = function() {
// 如果手动关闭连接,不再重连
this.forbidReconnect = true;
this.heartReset();
this.ws.close();
};
if (typeof window != 'undefined') window.WebsocketHeartbeatJs = WebsocketHeartbeatJs;

main.jsp

主页面嵌入 websocket 页面

1
<jsp:include page="./websocket.jsp"></jsp:include>

websocket.jsp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<script src="${pageContext.request.contextPath}/views/js/websocket.js"></script>
<script>
$(function (){
let userid = '${sqpUserId}';
let websocketHeartbeatJs = new WebsocketHeartbeatJs({
url: "ws://"+ location.host + "${pageContext.request.contextPath}/websocket/loginstatus.do?userid="+userid
});
websocketHeartbeatJs.onopen = function () {
console.log("连接 websocket 服务器.");
// websocketHeartbeatJs.send('hello server');
}
websocketHeartbeatJs.onmessage = function (evt) {
let data = evt.data;
console.log(data);
if(data) {
let msg = data.split("#")[0];
if(msg === '2') {
//视频通话请求
let confId = data.split("#")[1];

}
}
}
websocketHeartbeatJs.onreconnect = function () {
console.log('重连中...');
}
});
</script>

后端

pom.xml

1
2
3
4
5
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>${spring.version}</version>
</dependency>

spring.xml

扫码以下几个websocket 相关的类

1
<context:component-scan base-package="com.bjtcrj.scm.system.handler" />

WebsocketConst

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.bjtcrj.gms.system.websocket;

/**
* @Description: Websocket常量类
* @author: taoyan
* @date: 2020年03月23日
*/
public class WebsocketConst {
//回复消息类型
public static final String CONNECTION_SUCCESS = "0"; // 连接成功
public static final String PONG = "1"; // 心跳回复
public static final String CALL = "2"; // 视频通话请求
public static final String INVALID = "3"; // 无法处理
}

WebSocketConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.bjtcrj.gms.system.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//1.注册WebSocket
String websocket_url = "/websocket/loginstatus.do"; //设置websocket的地址
registry.addHandler(myHandler(), websocket_url). //注册Handler
addInterceptors(new HandshakeInterceptor()).setAllowedOrigins("*"); //注册Interceptor

//2.注册SockJS,提供SockJS支持(主要是兼容ie8)
String sockjs_url = "/sockjs/loginstatus.do"; //设置sockjs的地址
registry.addHandler(myHandler(), sockjs_url). //注册Handler
addInterceptors(new HandshakeInterceptor()). //注册Interceptor
withSockJS(); //支持sockjs协议
}

@Bean
public WebsocketHandler myHandler(){
return new WebsocketHandler();
}
}

HandshakeInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.bjtcrj.gms.system.websocket;

import com.bjtcrj.gms.common.utils.Constants;
import com.bjtcrj.gms.common.utils.StringEx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.Map;

@Slf4j
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {

@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
String userid = servletRequest.getParameter("userid");
if(!StringEx.isNull(userid)){
//使用userName区分WebSocketHandler,以便定向发送消息
log.debug("beforeHandshake-userid:"+userid);

attributes.put("WEBSOCKET_USERID", userid);
}

ServletServerHttpRequest servletRequest1 = (ServletServerHttpRequest) request;
HttpSession session = servletRequest1.getServletRequest().getSession(false);
if (session != null) {
//使用userName区分WebSocketHandler,以便定向发送消息
String userid1 = (String) session.getAttribute(Constants.SESSION_USER_ID);
attributes.put("WEBSOCKET_USERID", userid1);
}
}
return true;
}

@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
System.out.println("After Handshake");
super.afterHandshake(serverHttpRequest, serverHttpResponse, webSocketHandler, e);
}
}

WebsocketHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.bjtcrj.gms.system.websocket;

import com.bjtcrj.gms.common.utils.StringEx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

@Slf4j
public class WebsocketHandler extends TextWebSocketHandler {

private WebSocketSession session;

//1.增加app端标识
private static String APP_SESSION_SUFFIX = "_app";
private static String WEB_SESSION_SUFFIX = "_web";

private static CopyOnWriteArraySet<WebsocketHandler> webSockets = new CopyOnWriteArraySet<>();

// userid 与 用户socketsession 对应信息
private static Map<String, WebSocketSession> sessionPool = new HashMap<String, WebSocketSession>();

// 连接 就绪时
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String userid = (String) session.getAttributes().get("WEBSOCKET_USERID");
log.debug("userid:" + userid);

if (!StringEx.isNull(userid)) {
try {
this.session = session;
webSockets.add(this);
sessionPool.put(userid, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());

session.sendMessage(new TextMessage(WebsocketConst.CONNECTION_SUCCESS + "#连接成功"));
} catch (Exception e) {
}
}
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
if (WebsocketConst.CMD_CHECK.equals(message.getPayload())) {
//心跳消息,回复
log.debug("发送 pong 数据:" + message.toString());
session.sendMessage(new TextMessage(WebsocketConst.PONG + "#服务器端心跳回复"));
} else {
log.debug("来自客户端的数据:" + message.toString());
session.sendMessage(new TextMessage(WebsocketConst.INVALID + "#暂不支持该功能"));
}
}

// 处理传输时异常
@Override
public void handleTransportError(WebSocketSession session,
Throwable exception) throws Exception {
log.error("handleTransportError:" + exception.getMessage());
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.debug("connect websocket closed.......");

webSockets.remove(this);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());

String userid = (String) session.getAttributes().get("WEBSOCKET_USERID");
if (!StringEx.isNull(userid)) {
sessionPool.remove(userid);
}
}

// 给所有用户发送信息
public static void sendMsgToAllUsers(WebSocketMessage<?> message) throws Exception {
log.info("【websocket消息】广播消息:" + message);
try {
for (WebsocketHandler webSocket : webSockets) {
if (webSocket.session != null && webSocket.session.isOpen()) {
webSocket.session.sendMessage(message);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

//在线用户 ID
public synchronized static List<String> getAllUsersOnline() {
List<String> l = new ArrayList<String>();
for (String s : sessionPool.keySet()) {
WebSocketSession session = sessionPool.get(s);
if (session != null && session.isOpen()) {
l.add(s);
}
}
return l;
}

// 判断用户是否在线
public static boolean isOnline(String userid) {
WebSocketSession session = sessionPool.get(userid);
if (session != null && session.isOpen()) {
return true;
} else {
return false;
}
}

public static void callUser(String userid, String confId) {
sendMessage(userid, WebsocketConst.CALL+"#"+confId);
}

// 给指定用户发消息
public static void sendMessage(String userid, String msg) {
WebSocketSession session = sessionPool.get(userid);
if (session != null && session.isOpen()) {
try {
session.sendMessage(new TextMessage(msg, true));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

测试

获取所有在线人员 userid

1
2
3
4
5
@RequestMapping(value = "/getAllUsers")
@ResponseBody
public List<String> getAllUsers(HttpServletRequest request, HttpServletResponse response) throws Exception{
return WebsocketHandler.getAllUsersOnline();
}

浏览器地址栏输入

http://localhost:8080/scm-web/login/getAllUsers.do