SSM 框架中 websocket 实现前后端即时通信
web 前端
参考
https://www.cnblogs.com/1wen/p/5808276.html
https://github.com/zimv/websocket-heartbeat-js
js 封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
/**
 * WebSocket client wrapper that keeps a connection alive with heartbeats and
 * reconnects automatically when the socket closes or errors.
 *
 * Timing model (all values in milliseconds):
 *  - pingTimeout: after the socket opens, or after any message is received,
 *    wait this long and then send `pingMsg`.
 *  - pongTimeout: after a ping is sent, close the socket if no message at all
 *    arrives within this time (any incoming message restarts the cycle).
 *  - reconnectTimeout: delay before a reconnect attempt.
 *  - repeatLimit: maximum consecutive reconnect attempts; a value <= 0 means
 *    no limit. The counter is reset whenever a socket opens successfully.
 *
 * Callers may overwrite the `onopen`, `onmessage`, `onclose`, `onerror` and
 * `onreconnect` properties after construction to observe events.
 *
 * @param {Object} options
 * @param {string} options.url              WebSocket endpoint URL.
 * @param {number} [options.pingTimeout]    Idle time before a ping is sent.
 * @param {number} [options.pongTimeout]    Time to wait for any reply to a ping.
 * @param {number} [options.reconnectTimeout] Delay before reconnecting.
 * @param {string} [options.pingMsg]        Payload sent as the heartbeat.
 * @param {number} [options.repeatLimit]    Max consecutive reconnect attempts.
 */
function WebsocketHeartbeatJs({
    url,
    pingTimeout = 15000, // heartbeat interval
    pongTimeout = 10000, // close the connection if no reply arrives within 10s
    reconnectTimeout = 2000,
    pingMsg = 'heartbeat',
    repeatLimit = 10
}) {
    this.opts = {
        url: url,
        pingTimeout: pingTimeout,
        pongTimeout: pongTimeout,
        reconnectTimeout: reconnectTimeout,
        pingMsg: pingMsg,
        repeatLimit: repeatLimit
    };
    this.ws = null;
    this.repeat = 0;

    // Internal state, made explicit so every flag and timer handle is visible here.
    this.lockReconnect = false;   // true while a reconnect attempt is already scheduled
    this.forbidReconnect = false; // true once close() has been called by the user
    this.pingTimeoutId = null;
    this.pongTimeoutId = null;

    // Overridable hooks.
    this.onclose = () => {
        console.log("websocket close");
    };
    this.onerror = () => {
        console.log("websocket error");
    };
    this.onopen = () => {};
    this.onmessage = () => {};
    this.onreconnect = () => {};

    this.createWebSocket();
}

/**
 * Opens a new socket and wires its event handlers. If construction throws,
 * a reconnect is requested and the error is rethrown to the caller.
 */
WebsocketHeartbeatJs.prototype.createWebSocket = function() {
    try {
        this.ws = new WebSocket(this.opts.url);
        this.initEventHandle();
    } catch (e) {
        this.reconnect();
        throw e;
    }
};

/**
 * Attaches handlers to the current socket.
 *
 * On close/error the heartbeat timers are cleared first: they belong to the
 * socket that just died, and if left running they could fire against the
 * replacement socket (sending while it is still connecting, or closing it).
 */
WebsocketHeartbeatJs.prototype.initEventHandle = function() {
    this.ws.onclose = () => {
        this.heartReset();
        this.onclose();
        this.reconnect();
    };
    this.ws.onerror = () => {
        this.heartReset();
        this.onerror();
        this.reconnect();
    };
    this.ws.onopen = () => {
        this.repeat = 0;
        this.onopen();
        // Start the heartbeat cycle.
        this.heartCheck();
    };
    this.ws.onmessage = (event) => {
        this.onmessage(event);
        // Any message proves the connection is alive: restart the cycle.
        this.heartCheck();
    };
};

/**
 * Schedules one reconnect attempt unless the retry limit is reached, an
 * attempt is already pending, or the user closed the connection.
 */
WebsocketHeartbeatJs.prototype.reconnect = function() {
    if (this.opts.repeatLimit > 0 && this.opts.repeatLimit <= this.repeat) return;
    if (this.lockReconnect || this.forbidReconnect) return;
    this.lockReconnect = true;
    this.repeat++;
    this.onreconnect();
    // Delay the attempt so a failing server is not hammered with requests.
    setTimeout(() => {
        this.createWebSocket();
        this.lockReconnect = false;
    }, this.opts.reconnectTimeout);
};

/**
 * Sends a message over the current socket.
 * @param {*} msg Payload passed straight to WebSocket#send.
 */
WebsocketHeartbeatJs.prototype.send = function(msg) {
    this.ws.send(msg);
};

/** Restarts the heartbeat cycle: cancel pending timers, then arm a new ping. */
WebsocketHeartbeatJs.prototype.heartCheck = function() {
    this.heartReset();
    this.heartStart();
};

/**
 * Arms the ping timer; once the ping is sent, arms the pong timer that closes
 * the socket when nothing comes back in time. Does nothing after close().
 */
WebsocketHeartbeatJs.prototype.heartStart = function() {
    if (this.forbidReconnect) return;
    this.pingTimeoutId = setTimeout(() => {
        this.ws.send(this.opts.pingMsg);
        // Closing triggers ws.onclose, which in turn requests a reconnect.
        this.pongTimeoutId = setTimeout(() => {
            this.ws.close();
        }, this.opts.pongTimeout);
    }, this.opts.pingTimeout);
};

/** Cancels both heartbeat timers. */
WebsocketHeartbeatJs.prototype.heartReset = function() {
    clearTimeout(this.pingTimeoutId);
    clearTimeout(this.pongTimeoutId);
};

/** Closes the connection for good: no heartbeat and no reconnect afterwards. */
WebsocketHeartbeatJs.prototype.close = function() {
    this.forbidReconnect = true;
    this.heartReset();
    this.ws.close();
};

if (typeof window != 'undefined') window.WebsocketHeartbeatJs = WebsocketHeartbeatJs;
|
main.jsp
主页面嵌入 websocket
页面
1
| <jsp:include page="./websocket.jsp"></jsp:include>
|
websocket.jsp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%--
  Opens the login-status WebSocket for the current user and logs incoming messages.
  Expects jQuery ($) to be loaded by the including page and a "sqpUserId" attribute
  to be available to EL.
--%>
<script src="${pageContext.request.contextPath}/views/js/websocket.js"></script>
<script>
    $(function () {
        // NOTE(review): the EL value is written into a JS string literal unescaped;
        // confirm sqpUserId can never contain quotes or markup.
        let userid = '${sqpUserId}';

        // Follow the page's scheme: browsers block ws:// connections from https pages.
        let scheme = location.protocol === 'https:' ? 'wss://' : 'ws://';

        let websocketHeartbeatJs = new WebsocketHeartbeatJs({
            url: scheme + location.host
                + "${pageContext.request.contextPath}/websocket/loginstatus.do?userid="
                + encodeURIComponent(userid)
        });

        websocketHeartbeatJs.onopen = function () {
            console.log("连接 websocket 服务器.");
        }

        // Server messages have the form "<type>#<payload>".
        websocketHeartbeatJs.onmessage = function (evt) {
            let data = evt.data;
            console.log(data);
            if (data) {
                let parts = data.split("#");
                let msg = parts[0];
                if (msg === '2') {
                    // Type '2' carries a conference id as payload; handling is left to the page.
                    let confId = parts[1];

                }
            }
        }

        websocketHeartbeatJs.onreconnect = function () {
            console.log('重连中...');
        }
    });
</script>
|
后端
pom.xml
1 2 3 4 5
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>${spring.version}</version> </dependency>
|
spring.xml
扫描以下几个 websocket 相关的类
1
<!-- The scanned package must contain the @Configuration class WebSocketConfig;
     the classes in this article are declared in com.bjtcrj.gms.system.websocket. -->
<context:component-scan base-package="com.bjtcrj.gms.system.websocket" />
|
WebsocketConst
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.bjtcrj.gms.system.websocket;
/**
 * String constants of the WebSocket message protocol.
 *
 * <p>Messages sent by the server start with one of the type codes below,
 * followed by {@code #} and a payload.
 */
public class WebsocketConst {

    /**
     * Heartbeat payload sent by the client. It has to equal the client's ping
     * message (the JavaScript wrapper defaults to {@code 'heartbeat'}), otherwise
     * the server answers pings with {@link #INVALID} instead of {@link #PONG}.
     */
    public static final String CMD_CHECK = "heartbeat";

    /** Type code of the message sent right after a connection is registered. */
    public static final String CONNECTION_SUCCESS = "0";

    /** Type code of the server's reply to a heartbeat. */
    public static final String PONG = "1";

    /** Type code of a call notification; the payload is a conference id. */
    public static final String CALL = "2";

    /** Type code of the reply to any client message the server does not support. */
    public static final String INVALID = "3";
}
|
WebSocketConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.bjtcrj.gms.system.websocket;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
 * Spring configuration that exposes the login-status handler on two endpoints:
 * a plain WebSocket endpoint and a SockJS endpoint.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    /** Path of the plain WebSocket endpoint. */
    private static final String WEBSOCKET_PATH = "/websocket/loginstatus.do";

    /** Path of the SockJS endpoint. */
    private static final String SOCKJS_PATH = "/sockjs/loginstatus.do";

    /**
     * Registers the handler on both endpoints, each with its own handshake interceptor.
     *
     * @param registry registry supplied by Spring
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // NOTE(review): "*" accepts handshakes from any origin - confirm this is intended.
        registry.addHandler(myHandler(), WEBSOCKET_PATH)
                .addInterceptors(new HandshakeInterceptor())
                .setAllowedOrigins("*");

        registry.addHandler(myHandler(), SOCKJS_PATH)
                .addInterceptors(new HandshakeInterceptor())
                .withSockJS();
    }

    /**
     * @return the handler bean serving both endpoints
     */
    @Bean
    public WebsocketHandler myHandler() {
        return new WebsocketHandler();
    }
}
|
HandshakeInterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.bjtcrj.gms.system.websocket;
import com.bjtcrj.gms.common.utils.Constants; import com.bjtcrj.gms.common.utils.StringEx; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; import java.util.Map;
/**
 * Handshake interceptor that determines the user id of a connecting client and
 * stores it in the WebSocket session attributes under {@code "WEBSOCKET_USERID"}.
 *
 * <p>The id is taken from the {@code userid} request parameter and, when an HTTP
 * session with a user id exists, replaced by the session's value.
 */
@Slf4j
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    /** Attribute key under which the user id is handed to the WebSocket session. */
    private static final String USER_ID_ATTRIBUTE = "WEBSOCKET_USERID";

    /**
     * Resolves the user id before the handshake proceeds.
     *
     * @param request             the handshake request
     * @param serverHttpResponse  the handshake response (not used)
     * @param webSocketHandler    the target handler (not used)
     * @param attributes          attributes that will be attached to the WebSocket session
     * @return always {@code true}; the handshake is never rejected here
     * @throws Exception declared by the overridden method
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse serverHttpResponse,
                                   WebSocketHandler webSocketHandler, Map<String, Object> attributes)
            throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();

            // NOTE(review): this value is supplied by the client and is not verified;
            // it is only trustworthy when the session lookup below overrides it.
            String userid = servletRequest.getParameter("userid");
            if (!StringEx.isNull(userid)) {
                log.debug("beforeHandshake-userid:" + userid);
                attributes.put(USER_ID_ATTRIBUTE, userid);
            }

            // Do not create a session just for the handshake.
            HttpSession session = servletRequest.getSession(false);
            if (session != null) {
                String sessionUserId = (String) session.getAttribute(Constants.SESSION_USER_ID);
                // Only override when the session actually carries a user id; otherwise a
                // valid request parameter would be replaced by null.
                if (!StringEx.isNull(sessionUserId)) {
                    attributes.put(USER_ID_ATTRIBUTE, sessionUserId);
                }
            }
        }
        return true;
    }

    /**
     * Logs the completed handshake and delegates to the superclass.
     */
    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                               WebSocketHandler webSocketHandler, Exception e) {
        log.debug("After Handshake");
        super.afterHandshake(serverHttpRequest, serverHttpResponse, webSocketHandler, e);
    }
}
|
WebsocketHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| package com.bjtcrj.gms.system.websocket;
import com.bjtcrj.gms.common.utils.StringEx; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@Slf4j public class WebsocketHandler extends TextWebSocketHandler {
private WebSocketSession session;
private static String APP_SESSION_SUFFIX = "_app"; private static String WEB_SESSION_SUFFIX = "_web";
private static CopyOnWriteArraySet<WebsocketHandler> webSockets = new CopyOnWriteArraySet<>();
private static Map<String, WebSocketSession> sessionPool = new HashMap<String, WebSocketSession>();
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String userid = (String) session.getAttributes().get("WEBSOCKET_USERID"); log.debug("userid:" + userid);
if (!StringEx.isNull(userid)) { try { this.session = session; webSockets.add(this); sessionPool.put(userid, session); log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
session.sendMessage(new TextMessage(WebsocketConst.CONNECTION_SUCCESS + "#连接成功")); } catch (Exception e) { } } }
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { if (WebsocketConst.CMD_CHECK.equals(message.getPayload())) { log.debug("发送 pong 数据:" + message.toString()); session.sendMessage(new TextMessage(WebsocketConst.PONG + "#服务器端心跳回复")); } else { log.debug("来自客户端的数据:" + message.toString()); session.sendMessage(new TextMessage(WebsocketConst.INVALID + "#暂不支持该功能")); } }
@Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("handleTransportError:" + exception.getMessage()); }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { log.debug("connect websocket closed.......");
webSockets.remove(this); log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
String userid = (String) session.getAttributes().get("WEBSOCKET_USERID"); if (!StringEx.isNull(userid)) { sessionPool.remove(userid); } }
public static void sendMsgToAllUsers(WebSocketMessage<?> message) throws Exception { log.info("【websocket消息】广播消息:" + message); try { for (WebsocketHandler webSocket : webSockets) { if (webSocket.session != null && webSocket.session.isOpen()) { webSocket.session.sendMessage(message); } } } catch (Exception e) { e.printStackTrace(); } }
public synchronized static List<String> getAllUsersOnline() { List<String> l = new ArrayList<String>(); for (String s : sessionPool.keySet()) { WebSocketSession session = sessionPool.get(s); if (session != null && session.isOpen()) { l.add(s); } } return l; }
public static boolean isOnline(String userid) { WebSocketSession session = sessionPool.get(userid); if (session != null && session.isOpen()) { return true; } else { return false; } }
public static void callUser(String userid, String confId) { sendMessage(userid, WebsocketConst.CALL+"#"+confId); }
public static void sendMessage(String userid, String msg) { WebSocketSession session = sessionPool.get(userid); if (session != null && session.isOpen()) { try { session.sendMessage(new TextMessage(msg, true)); } catch (IOException e) { e.printStackTrace(); } } } }
|
测试
获取所有在线人员 userid
1 2 3 4 5
| @RequestMapping(value = "/getAllUsers") @ResponseBody public List<String> getAllUsers(HttpServletRequest request, HttpServletResponse response) throws Exception{ return WebsocketHandler.getAllUsersOnline(); }
|
浏览器地址栏输入
http://localhost:8080/scm-web/login/getAllUsers.do