最近王子自己搭建了個項目,項目本身很簡單,但是里面有使用WebSocket進行消息提醒的功能,大體情況是這樣的。 發(fā)布消息者在系統(tǒng)中發(fā)送消息,實時的把消息推送給對應的一個部門下的所有人。 這里面如果是單機應用的情況時,我們可以通過部門的id和用戶的id組成一個唯一的key,與應用服務器建立WebSocket長連接,然后就可以接收到發(fā)布消息者發(fā)送的消息了。 但是真正把項目應用于生產(chǎn)環(huán)境中時,我們是不可能就部署一個單機應用的,而是要部署一個集群。 所以王子通過Nginx+兩臺Tomcat搭建了一個簡單的負載均衡集群,作為測試使用,搭建步驟可以看一下這篇文章:Windows下使用Nginx+Tomcat做負載均衡 但是問題出現(xiàn)了,我們的客戶端瀏覽器只會與一臺服務器建立WebSocket長連接,所以發(fā)布消息者在發(fā)送消息時,就沒法保證所有目標部門的人都能接收到消息(因為這些人連接的可能不是一個服務器)。 本篇文章就是針對于這么一個問題展開討論,提出一種解決方案,當然解決方案不止一種,那我們開始吧。 在介紹分布式集群之前,我們先來看一下王子的WebSocket代碼實現(xiàn),先來看java后端代碼如下: import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{key}") public class WebSocket { private static int onlineCount = 0; /** * 存儲連接的客戶端 */ private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>(); private Session session; /** * 發(fā)送的目標科室code */ private String key;
@OnOpen public void onOpen(@PathParam("key") String key, Session session) throws IOException { this.key = key; this.session = session; if (!clients.containsKey(key)) { addOnlineCount(); } clients.put(key, this); Log.info(key+"已連接消息服務!"); }
@OnClose public void onClose() throws IOException { clients.remove(key); subOnlineCount(); }
@OnMessage public void onMessage(String message) throws IOException { if(message.equals("ping")){ return ; } JSONObject jsonTo = JSON.parseObject(message); String mes = (String) jsonTo.get("message"); if (!jsonTo.get("to").equals("All")){ sendMessageTo(mes, jsonTo.get("to").toString()); }else{ sendMessageAll(mes); } }
@OnError public void onError(Session session, Throwable error) { error.printStackTrace(); }
private void sendMessageTo(String message, String To) throws IOException { for (WebSocket item : clients.values()) { if (item.key.contains(To) ) item.session.getAsyncRemote().sendText(message); } }
private void sendMessageAll(String message) throws IOException { for (WebSocket item : clients.values()) { item.session.getAsyncRemote().sendText(message); } }
public static synchronized int getOnlineCount() { return onlineCount; }
public static synchronized void addOnlineCount() { WebSocket.onlineCount++; }
public static synchronized void subOnlineCount() { WebSocket.onlineCount--; }
public static synchronized Map<String, WebSocket> getClients() { return clients; } }
示例代碼中并沒有使用Spring,用的是原生的java web編寫的,簡單和大家介紹一下里面的方法。 onOpen:在客戶端與WebSocket服務連接時觸發(fā)方法執(zhí)行 onClose:在客戶端與WebSocket連接斷開的時候觸發(fā)執(zhí)行 onMessage:在接收到客戶端發(fā)送的消息時觸發(fā)執(zhí)行 onError:在發(fā)生錯誤時觸發(fā)執(zhí)行 可以看到,在onMessage方法中,我們直接根據(jù)客戶端發(fā)送的消息,進行消息的轉(zhuǎn)發(fā)功能,這樣在單體消息服務中是沒有問題的。 再來看一下js代碼 var host = document.location.host;
// 獲得當前登錄科室 var deptCodes='${sessionScope.$UserContext.departmentID}'; deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, ""); var key = '${sessionScope.$UserContext.userID}'+deptCodes; var lockReconnect = false; //避免ws重復連接 var ws = null; // 判斷當前瀏覽器是否支持WebSocket var wsUrl = 'ws://' + host + '/webSocket/'+ key; createWebSocket(wsUrl); //連接ws
function createWebSocket(url) { try{ if('WebSocket' in window){ ws = new WebSocket(url); }else if('MozWebSocket' in window){ ws = new MozWebSocket(url); }else{ layer.alert("您的瀏覽器不支持websocket協(xié)議,建議使用新版谷歌、火狐等瀏覽器,請勿使用IE10以下瀏覽器,360瀏覽器請使用極速模式,不要使用兼容模式!"); } initEventHandle(); }catch(e){ reconnect(url); console.log(e); } }
function initEventHandle() { ws.onclose = function () { reconnect(wsUrl); console.log("llws連接關閉!"+new Date().toUTCString()); }; ws.onerror = function () { reconnect(wsUrl); console.log("llws連接錯誤!"); }; ws.onopen = function () { heartCheck.reset().start(); //心跳檢測重置 console.log("llws連接成功!"+new Date().toUTCString()); }; ws.onmessage = function (event) { //如果獲取到消息,心跳檢測重置 heartCheck.reset().start(); //拿到任何消息都說明當前連接是正常的//接收到消息實際業(yè)務處理 ... }; } // 監(jiān)聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。 window.onbeforeunload = function() { ws.close(); }
function reconnect(url) { if(lockReconnect) return; lockReconnect = true; setTimeout(function () { //沒連接上會一直重連,設置延遲避免請求過多 createWebSocket(url); lockReconnect = false; }, 2000); }
//心跳檢測 var heartCheck = { timeout: 300000, //5分鐘發(fā)一次心跳 timeoutObj: null, serverTimeoutObj: null, reset: function(){ clearTimeout(this.timeoutObj); clearTimeout(this.serverTimeoutObj); return this; }, start: function(){ var self = this; this.timeoutObj = setTimeout(function(){ //這里發(fā)送一個心跳,后端收到后,返回一個心跳消息, //onmessage拿到返回的心跳就說明連接正常 ws.send("ping"); console.log("ping!") self.serverTimeoutObj = setTimeout(function(){//如果超過一定時間還沒重置,說明后端主動斷開了 ws.close(); //如果onclose會執(zhí)行reconnect,我們執(zhí)行ws.close()就行了.如果直接執(zhí)行reconnect 會觸發(fā)onclose導致重連兩次 }, self.timeout) }, this.timeout) } }
js部分使用的是原生H5編寫的,如果為了更好的兼容瀏覽器,也可以使用SockJS,有興趣小伙伴們可以自行百度。 接下來我們就手動的優(yōu)化代碼,實現(xiàn)WebSocket對分布式架構(gòu)的支持。 現(xiàn)在我們已經(jīng)了解單體應用下的代碼結(jié)構(gòu),也清楚了WebSocket在分布式環(huán)境下面臨的問題,那么是時候思考一下如何能夠解決這個問題了。 我們先來看一看發(fā)生這個問題的根本原因是什么。 簡單思考一下就能明白,單體應用下只有一臺服務器,所有的客戶端連接的都是這一臺消息服務器,所以當發(fā)布消息者發(fā)送消息時,所有的客戶端其實已經(jīng)全部與這臺服務器建立了連接,直接群發(fā)消息就可以了。 換成分布式系統(tǒng)后,假如我們有兩臺消息服務器,那么客戶端通過Nginx負載均衡后,就會有一部分連接到其中一臺服務器,另一部分連接到另一臺服務器,所以發(fā)布消息者發(fā)送消息時,只會發(fā)送到其中的一臺服務器上,而這臺消息服務器就可以執(zhí)行群發(fā)操作,但問題是,另一臺服務器并不知道這件事,也就無法發(fā)送消息了。 現(xiàn)在我們知道了根本原因是生產(chǎn)消息時,只有一臺消息服務器能夠感知到,所以我們只要讓另一臺消息服務器也能感知到就可以了,這樣感知到之后,它就可以群發(fā)消息給連接到它上邊的客戶端了。 那么什么方法可以實現(xiàn)這種功能呢,王子很快想到了引入消息中間件,并使用它的發(fā)布訂閱模式來通知所有消息服務器就可以了。 引入RabbitMQ解決分布式下的WebSocket問題 在消息中間件的選擇上,王子選擇了RabbitMQ,原因是它的搭建比較簡單,功能也很強大,而且我們只是用到它群發(fā)消息的功能。 RabbitMQ有一個廣播模式(fanout),我們使用的就是這種模式。 首先我們寫一個RabbitMQ的連接類: import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class RabbitMQUtil { private static Connection connection;
/** * 與rabbitmq建立連接 * @return */ public static Connection getConnection() { if (connection != null&&connection.isOpen()) { return connection; }
ConnectionFactory factory = new ConnectionFactory(); factory.setVirtualHost("/"); factory.setHost("192.168.220.110"); // 用的是虛擬IP地址 factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest");
try { connection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
return connection; } }
這個類沒什么說的,就是獲取MQ連接的一個工廠類。 然后按照我們的思路,就是每次服務器啟動的時候,都會創(chuàng)建一個MQ的消費者監(jiān)聽MQ的消息,王子這里測試使用的是Servlet的監(jiān)聽器,如下: import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener;
public class InitListener implements ServletContextListener { @Override public void contextInitialized(ServletContextEvent servletContextEvent) { WebSocket.init(); }
@Override public void contextDestroyed(ServletContextEvent servletContextEvent) {
} }
記得要在Web.xml中配置監(jiān)聽器信息 <?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://xmlns./xml/ns/javaee" xmlns:xsi="http://www./2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns./xml/ns/javaee http://xmlns./xml/ns/javaee/web-app_4_0.xsd" version="4.0"> <listener> <listener-class>InitListener</listener-class> </listener> </web-app>
WebSocket中增加init方法,作為MQ消費者部分 public static void init() { try { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); //交換機聲明(參數(shù)為:交換機名稱;交換機類型) channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT); //獲取一個臨時隊列 String queueName = channel.queueDeclare().getQueue(); //隊列與交換機綁定(參數(shù)為:隊列名稱;交換機名稱;routingKey忽略) channel.queueBind(queueName,"fanoutLogs","");
//這里重寫了DefaultConsumer的handleDelivery方法,因為發(fā)送的時候?qū)ο⑦M行了getByte(),在這里要重新組裝成String Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println(message); //這里可以使用WebSocket通過消息內(nèi)容發(fā)送消息給對應的客戶端 } };
//聲明隊列中被消費掉的消息(參數(shù)為:隊列名稱;消息是否自動確認;consumer主體) channel.basicConsume(queueName,true,consumer); //這里不能關閉連接,調(diào)用了消費方法后,消費者會一直連接著rabbitMQ等待消費 } catch (IOException e) { e.printStackTrace(); } }
同時在接收到消息時,不是直接通過WebSocket發(fā)送消息給對應客戶端,而是發(fā)送消息給MQ,這樣如果消息服務器有多個,就都會從MQ中獲得消息,之后通過獲取的消息內(nèi)容再使用WebSocket推送給對應的客戶端就可以了。 WebSocket的onMessage方法增加內(nèi)容如下: try { //嘗試獲取一個連接 Connection connection = RabbitMQUtil.getConnection(); //嘗試創(chuàng)建一個channel Channel channel = connection.createChannel(); //聲明交換機(參數(shù)為:交換機名稱; 交換機類型,廣播模式) channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT); //消息發(fā)布(參數(shù)為:交換機名稱; routingKey,忽略。在廣播模式中,生產(chǎn)者聲明交換機的名稱和類型即可) channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8")); System.out.println("發(fā)布消息"); channel.close(); } catch (IOException |TimeoutException e) { e.printStackTrace(); }
增加后刪除掉原來的Websocket推送部分代碼。 這樣一整套的解決方案就完成了。 到這里,我們就解決了分布式下WebSocket的推送消息問題。 我們主要是引入了RabbitMQ,通過RabbitMQ的發(fā)布訂閱模式,讓每個消息服務器啟動的時候都去訂閱消息,而無論哪臺消息服務器在發(fā)送消息的時候都會發(fā)送給MQ,這樣每臺消息服務器就都會感知到發(fā)送消息的事件,從而再通過Websocket發(fā)送給客戶端。 大體流程就是這樣,那么小伙伴們有沒有想過,如果RabbitMQ掛掉了幾分鐘,之后重啟了,消費者是否可以重新連接到RabbitMQ?是否還能正常接收消息呢? 生產(chǎn)環(huán)境下,這個問題是必須考慮的。 這里王子已經(jīng)測試過,消費者是支持自動重連的,所以我們可以放心的使用這套架構(gòu)來解決此問題。 本文到這里就結(jié)束了,歡迎各位小伙伴留言討論,一起學習,一起進步。
|