一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

聊聊分布式下的WebSocket解決方案

 HUC王子 2020-09-22

01


前言

最近王子自己搭建了個項目,項目本身很簡單,但是里面有使用WebSocket進行消息提醒的功能,大體情況是這樣的。

發(fā)布消息者在系統(tǒng)中發(fā)送消息,實時的把消息推送給對應的一個部門下的所有人。

這里面如果是單機應用的情況時,我們可以通過部門的id和用戶的id組成一個唯一的key,與應用服務器建立WebSocket長連接,然后就可以接收到發(fā)布消息者發(fā)送的消息了。

但是真正把項目應用于生產(chǎn)環(huán)境中時,我們是不可能就部署一個單機應用的,而是要部署一個集群。

所以王子通過Nginx+兩臺Tomcat搭建了一個簡單的負載均衡集群,作為測試使用,搭建步驟可以看一下這篇文章:Windows下使用Nginx+Tomcat做負載均衡

但是問題出現(xiàn)了,我們的客戶端瀏覽器只會與一臺服務器建立WebSocket長連接,所以發(fā)布消息者在發(fā)送消息時,就沒法保證所有目標部門的人都能接收到消息(因為這些人連接的可能不是一個服務器)。

本篇文章就是針對于這么一個問題展開討論,提出一種解決方案,當然解決方案不止一種,那我們開始吧。

02


WebSocket單體應用介紹

在介紹分布式集群之前,我們先來看一下王子的WebSocket代碼實現(xiàn),先來看java后端代碼如下:

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/webSocket/{key}")
public class WebSocket {
    private static int onlineCount = 0;
    /**
     * 存儲連接的客戶端
     */

    private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
    private Session session;
    /**
     * 發(fā)送的目標科室code
     */

    private String key;

    @OnOpen
    public void onOpen(@PathParam("key") String key, Session session) throws IOException {
        this.key = key;
        this.session = session;
        if (!clients.containsKey(key)) {
            addOnlineCount();
        }
        clients.put(key, this);
        Log.info(key+"已連接消息服務!");
    }

    @OnClose
    public void onClose() throws IOException {
        clients.remove(key);
        subOnlineCount();
    }

    @OnMessage
    public void onMessage(String message) throws IOException {
        if(message.equals("ping")){
            return ;
        }
        JSONObject jsonTo = JSON.parseObject(message);
        String mes = (String) jsonTo.get("message");
        if (!jsonTo.get("to").equals("All")){
            sendMessageTo(mes, jsonTo.get("to").toString());
        }else{
            sendMessageAll(mes);
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        error.printStackTrace();
    }

    private void sendMessageTo(String message, String To) throws IOException {
        for (WebSocket item : clients.values()) {
            if (item.key.contains(To) )
                item.session.getAsyncRemote().sendText(message);
        }
    }

    private void sendMessageAll(String message) throws IOException {
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }

    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
}

示例代碼中并沒有使用Spring,用的是原生的java web編寫的,簡單和大家介紹一下里面的方法。

onOpen:在客戶端與WebSocket服務連接時觸發(fā)方法執(zhí)行

onClose:在客戶端與WebSocket連接斷開的時候觸發(fā)執(zhí)行

onMessage:在接收到客戶端發(fā)送的消息時觸發(fā)執(zhí)行

onError:在發(fā)生錯誤時觸發(fā)執(zhí)行

可以看到,在onMessage方法中,我們直接根據(jù)客戶端發(fā)送的消息,進行消息的轉(zhuǎn)發(fā)功能,這樣在單體消息服務中是沒有問題的。

再來看一下js代碼

    var host = document.location.host;

    // 獲得當前登錄科室
    var deptCodes='${sessionScope.$UserContext.departmentID}';
    deptCodes=deptCodes.replace(/[\[|\]|\s]+/g"");
    var key = '${sessionScope.$UserContext.userID}'+deptCodes;
    var lockReconnect = false;  //避免ws重復連接
    var ws = null;          // 判斷當前瀏覽器是否支持WebSocket
    var wsUrl = 'ws://' + host + '/webSocket/'+ key;
    createWebSocket(wsUrl);   //連接ws

    function createWebSocket(url{
        try{
            if('WebSocket' in window){
                ws = new WebSocket(url);
            }else if('MozWebSocket' in window){  
                ws = new MozWebSocket(url);
            }else{
                  layer.alert("您的瀏覽器不支持websocket協(xié)議,建議使用新版谷歌、火狐等瀏覽器,請勿使用IE10以下瀏覽器,360瀏覽器請使用極速模式,不要使用兼容模式!"); 
            }
            initEventHandle();
        }catch(e){
            reconnect(url);
            console.log(e);
        }     
    }

    function initEventHandle({
        ws.onclose = function ({
            reconnect(wsUrl);
            console.log("llws連接關閉!"+new Date().toUTCString());
        };
        ws.onerror = function ({
            reconnect(wsUrl);
            console.log("llws連接錯誤!");
        };
        ws.onopen = function ({
            heartCheck.reset().start();      //心跳檢測重置
            console.log("llws連接成功!"+new Date().toUTCString());
        };
        ws.onmessage = function (event{    //如果獲取到消息,心跳檢測重置
            heartCheck.reset().start();      //拿到任何消息都說明當前連接是正常的//接收到消息實際業(yè)務處理
        ...
        };
    }
    // 監(jiān)聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。
    window.onbeforeunload = function({
        ws.close();
    }  

    function reconnect(url{
        if(lockReconnect) return;
        lockReconnect = true;
        setTimeout(function ({     //沒連接上會一直重連,設置延遲避免請求過多
            createWebSocket(url);
            lockReconnect = false;
        }, 2000);
    }

    //心跳檢測
    var heartCheck = {
        timeout300000,        //5分鐘發(fā)一次心跳
        timeoutObj: null,
        serverTimeoutObjnull,
        resetfunction(){
            clearTimeout(this.timeoutObj);
            clearTimeout(this.serverTimeoutObj);
            return this;
        },
        startfunction(){
            var self = this;
            this.timeoutObj = setTimeout(function(){
                //這里發(fā)送一個心跳,后端收到后,返回一個心跳消息,
                //onmessage拿到返回的心跳就說明連接正常
                ws.send("ping");
                console.log("ping!")
                self.serverTimeoutObj = setTimeout(function(){//如果超過一定時間還沒重置,說明后端主動斷開了
                    ws.close();     //如果onclose會執(zhí)行reconnect,我們執(zhí)行ws.close()就行了.如果直接執(zhí)行reconnect 會觸發(fā)onclose導致重連兩次
                }, self.timeout)
            }, this.timeout)
        }
  }

js部分使用的是原生H5編寫的,如果為了更好的兼容瀏覽器,也可以使用SockJS,有興趣小伙伴們可以自行百度。

接下來我們就手動的優(yōu)化代碼,實現(xiàn)WebSocket對分布式架構(gòu)的支持。

03


解決方案的思考

現(xiàn)在我們已經(jīng)了解單體應用下的代碼結(jié)構(gòu),也清楚了WebSocket在分布式環(huán)境下面臨的問題,那么是時候思考一下如何能夠解決這個問題了。

我們先來看一看發(fā)生這個問題的根本原因是什么。

簡單思考一下就能明白,單體應用下只有一臺服務器,所有的客戶端連接的都是這一臺消息服務器,所以當發(fā)布消息者發(fā)送消息時,所有的客戶端其實已經(jīng)全部與這臺服務器建立了連接,直接群發(fā)消息就可以了。

換成分布式系統(tǒng)后,假如我們有兩臺消息服務器,那么客戶端通過Nginx負載均衡后,就會有一部分連接到其中一臺服務器,另一部分連接到另一臺服務器,所以發(fā)布消息者發(fā)送消息時,只會發(fā)送到其中的一臺服務器上,而這臺消息服務器就可以執(zhí)行群發(fā)操作,但問題是,另一臺服務器并不知道這件事,也就無法發(fā)送消息了。

現(xiàn)在我們知道了根本原因是生產(chǎn)消息時,只有一臺消息服務器能夠感知到,所以我們只要讓另一臺消息服務器也能感知到就可以了,這樣感知到之后,它就可以群發(fā)消息給連接到它上邊的客戶端了。

那么什么方法可以實現(xiàn)這種功能呢,王子很快想到了引入消息中間件,并使用它的發(fā)布訂閱模式來通知所有消息服務器就可以了。

04


引入RabbitMQ解決分布式下的WebSocket問題

在消息中間件的選擇上,王子選擇了RabbitMQ,原因是它的搭建比較簡單,功能也很強大,而且我們只是用到它群發(fā)消息的功能。

RabbitMQ有一個廣播模式(fanout),我們使用的就是這種模式。

首先我們寫一個RabbitMQ的連接類:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitMQUtil {
    private static Connection connection;

    /**
     * 與rabbitmq建立連接
     * @return
     */

    public static Connection getConnection() {
        if (connection != null&&connection.isOpen()) {
            return connection;
        }

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("192.168.220.110"); // 用的是虛擬IP地址
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");

        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return connection;
    }
}

這個類沒什么說的,就是獲取MQ連接的一個工廠類。

然后按照我們的思路,就是每次服務器啟動的時候,都會創(chuàng)建一個MQ的消費者監(jiān)聽MQ的消息,王子這里測試使用的是Servlet的監(jiān)聽器,如下:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;


public class InitListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        WebSocket.init();
    }

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {

    }
}

記得要在Web.xml中配置監(jiān)聽器信息

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns./xml/ns/javaee"
         xmlns:xsi="http://www./2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns./xml/ns/javaee http://xmlns./xml/ns/javaee/web-app_4_0.xsd"
         version="4.0">

    <listener>
        <listener-class>InitListener</listener-class>
    </listener>
</web-app>

WebSocket中增加init方法,作為MQ消費者部分

public  static void init() {
        try {
            Connection connection = RabbitMQUtil.getConnection();
            Channel channel = connection.createChannel();
            //交換機聲明(參數(shù)為:交換機名稱;交換機類型)
            channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
            //獲取一個臨時隊列
            String queueName = channel.queueDeclare().getQueue();
            //隊列與交換機綁定(參數(shù)為:隊列名稱;交換機名稱;routingKey忽略)
            channel.queueBind(queueName,"fanoutLogs","");


            //這里重寫了DefaultConsumer的handleDelivery方法,因為發(fā)送的時候?qū)ο⑦M行了getByte(),在這里要重新組裝成String
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    String message = new String(body,"UTF-8");
                    System.out.println(message);
            //這里可以使用WebSocket通過消息內(nèi)容發(fā)送消息給對應的客戶端
                }
            };

            //聲明隊列中被消費掉的消息(參數(shù)為:隊列名稱;消息是否自動確認;consumer主體)
            channel.basicConsume(queueName,true,consumer);
            //這里不能關閉連接,調(diào)用了消費方法后,消費者會一直連接著rabbitMQ等待消費
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

同時在接收到消息時,不是直接通過WebSocket發(fā)送消息給對應客戶端,而是發(fā)送消息給MQ,這樣如果消息服務器有多個,就都會從MQ中獲得消息,之后通過獲取的消息內(nèi)容再使用WebSocket推送給對應的客戶端就可以了。

WebSocket的onMessage方法增加內(nèi)容如下:

      try {
            //嘗試獲取一個連接
            Connection connection = RabbitMQUtil.getConnection();
            //嘗試創(chuàng)建一個channel
            Channel channel = connection.createChannel();
            //聲明交換機(參數(shù)為:交換機名稱; 交換機類型,廣播模式)
            channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
            //消息發(fā)布(參數(shù)為:交換機名稱; routingKey,忽略。在廣播模式中,生產(chǎn)者聲明交換機的名稱和類型即可)
            channel.basicPublish("fanoutLogs",""null,msg.getBytes("UTF-8"));
            System.out.println("發(fā)布消息");
            channel.close();
        } catch (IOException |TimeoutException e) {
            e.printStackTrace();
        }

增加后刪除掉原來的Websocket推送部分代碼。

這樣一整套的解決方案就完成了。

05


總結(jié)

到這里,我們就解決了分布式下WebSocket的推送消息問題。

我們主要是引入了RabbitMQ,通過RabbitMQ的發(fā)布訂閱模式,讓每個消息服務器啟動的時候都去訂閱消息,而無論哪臺消息服務器在發(fā)送消息的時候都會發(fā)送給MQ,這樣每臺消息服務器就都會感知到發(fā)送消息的事件,從而再通過Websocket發(fā)送給客戶端。

大體流程就是這樣,那么小伙伴們有沒有想過,如果RabbitMQ掛掉了幾分鐘,之后重啟了,消費者是否可以重新連接到RabbitMQ?是否還能正常接收消息呢?

生產(chǎn)環(huán)境下,這個問題是必須考慮的。

這里王子已經(jīng)測試過,消費者是支持自動重連的,所以我們可以放心的使用這套架構(gòu)來解決此問題。

本文到這里就結(jié)束了,歡迎各位小伙伴留言討論,一起學習,一起進步。

    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    日韩精品在线观看一区| 色婷婷亚洲精品综合网| 国产精品一区二区视频| 亚洲人午夜精品射精日韩 | 国产农村妇女成人精品| 欧美亚洲美女资源国产| 91欧美日韩中在线视频| 欧美黑人在线精品极品| 99精品国产一区二区青青| 日韩aa一区二区三区| 深夜日本福利在线观看| 一区中文字幕人妻少妇| 极品熟女一区二区三区| 不卡一区二区在线视频| 国产永久免费高清在线精品| av中文字幕一区二区三区在线| 色婷婷久久五月中文字幕| 亚洲五月婷婷中文字幕| 中文字日产幕码三区国产| 欧美国产日韩在线综合| 五月激情综合在线视频| 男女激情视频在线免费观看| 久一视频这里只有精品| 亚洲精品国产精品日韩| 欧美日韩久久精品一区二区| 国产日本欧美韩国在线| 午夜福利92在线观看| 黄色片一区二区在线观看| 日韩成人h视频在线观看| 欧美三级大黄片免费看| 日韩精品毛片视频免费看| 午夜资源在线观看免费高清| 国产成人在线一区二区三区| 免费观看潮喷到高潮大叫| 久草视频在线视频在线观看| 国产av乱了乱了一区二区三区| 国产传媒精品视频一区| 欧美激情床戏一区二区三| 亚洲精品国产福利在线| 国产剧情欧美日韩中文在线| 夫妻性生活一级黄色录像|