一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

分布式WebSocket集群解決方案

 雪山紅柳 2020-01-10

問題起因

最近做項目時遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請求,以及集群中WebSocket Session共享的問題。

期間我經(jīng)過了幾天的研究,總結(jié)出了幾個實現(xiàn)分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結(jié)出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。

以下是我的場景描述

  • 資源:4臺服務(wù)器。其中只有一臺服務(wù)器具備ssl認證域名,一臺redis+mysql服務(wù)器,兩臺應(yīng)用服務(wù)器(集群)
  • 應(yīng)用發(fā)布限制條件:由于場景需要,應(yīng)用場所需要ssl認證的域名才能發(fā)布。因此ssl認證的域名服務(wù)器用來當(dāng)api網(wǎng)關(guān),負責(zé)https請求與wss(安全認證的ws)連接。俗稱https卸載,用戶請求https域名服務(wù)器(eg:https:///xxx),但真實訪問到的是http+ip地址的形式。只要網(wǎng)關(guān)配置高,能handle多個應(yīng)用
  • 需求:用戶登錄應(yīng)用,需要與服務(wù)器建立wss連接,不同角色之間可以單發(fā)消息,也可以群發(fā)消息
  • 集群中的應(yīng)用服務(wù)類型:每個集群實例都負責(zé)http無狀態(tài)請求服務(wù)與ws長連接服務(wù)

系統(tǒng)架構(gòu)圖

clipboard.png

在我的實現(xiàn)里,每個應(yīng)用服務(wù)器都負責(zé)http and ws請求,其實也可以將ws請求建立的聊天模型單獨成立為一個模塊。從分布式的角度來看,這兩種實現(xiàn)類型差不多,但從實現(xiàn)方便性來說,一個應(yīng)用服務(wù)http+ws請求的方式更為方便。下文會有解釋

本文涉及的技術(shù)棧

  • Eureka 服務(wù)發(fā)現(xiàn)與注冊
  • Redis Session共享
  • Redis 消息訂閱
  • Spring Boot
  • Zuul 網(wǎng)關(guān)
  • Spring Cloud Gateway 網(wǎng)關(guān)
  • Spring WebSocket 處理長連接
  • Ribbon 負載均衡
  • Netty 多協(xié)議NIO網(wǎng)絡(luò)通信框架
  • Consistent Hash 一致性哈希算法

相信能走到這一步的人都了解過我上面列舉的技術(shù)棧了,如果還沒有,可以先去網(wǎng)上找找入門教程了解一下。下面的內(nèi)容都與上述技術(shù)相關(guān),題主默認大家都了解過了...
這里是描述一致性Hash算法最易懂的文章傳送門

技術(shù)可行性分析

下面我將描述session特性,以及根據(jù)這些特性列舉出n個解決分布式架構(gòu)中處理ws請求的集群方案

WebSocketSession與HttpSession
在Spring所集成的WebSocket里面,每個ws連接都有一個對應(yīng)的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接之后可以通過類似這樣的方式進行與客戶端的通信:

protected void handleTextMessage(WebSocketSession session, TextMessage message) {
   System.out.println("服務(wù)器接收到的消息: "+ message );
   //send message to client
   session.sendMessage(new TextMessage("message"));
}

那么問題來了:ws的session無法序列化到redis,因此在集群中,我們無法將所有WebSocketSession都緩存到redis進行session共享。每臺服務(wù)器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前沒有websocket session共享的方案,因此走redis websocket session共享這條路是行不通的。
有的人可能會想:我可不可以將sessin關(guān)鍵信息緩存到redis,集群中的服務(wù)器從redis拿取session關(guān)鍵信息然后重新構(gòu)建websocket session...我只想說這種方法如果有人能試出來,請告訴我一聲...

以上便是websocket session與http session共享的區(qū)別,總的來說就是http session共享已經(jīng)有解決方案了,而且很簡單,只要引入相關(guān)依賴:spring-session-data-redisspring-boot-starter-redis,大家可以從網(wǎng)上找個demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底層實現(xiàn)的方式,我們無法做到真正的websocket session共享。

解決方案的演變

Netty與Spring WebSocket

剛開始的時候,我嘗試著用netty實現(xiàn)了websocket服務(wù)端的搭建。在netty里面,并沒有websocket session這樣的概念,與其類似的是channel,每一個客戶端連接都代表一個channel。前端的ws請求通過netty監(jiān)聽的端口,走websocket協(xié)議進行ws握手連接之后,通過一些列的handler(責(zé)鏈模式)進行消息處理。與websocket session類似地,服務(wù)端在連接建立后有一個channel,我們可以通過channel進行與客戶端的通信

   /**
    * TODO 根據(jù)服務(wù)器傳進來的id,分配到不同的group
    */
   private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

   @Override
   protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
       //retain增加引用計數(shù),防止接下來的調(diào)用引用失效
       System.out.println("服務(wù)器接收到來自 " + ctx.channel().id() + " 的消息: " + msg.text());
       //將消息發(fā)送給group里面的所有channel,也就是發(fā)送消息給客戶端
       GROUP.writeAndFlush(msg.retain());
   }

那么,服務(wù)端用netty還是用spring websocket?以下我將從幾個方面列舉這兩種實現(xiàn)方式的優(yōu)缺點

  • 使用netty實現(xiàn)websocket

    玩過netty的人都知道netty是的線程模型是nio模型,并發(fā)量非常高,spring5之前的網(wǎng)絡(luò)線程模型是servlet實現(xiàn)的,而servlet不是nio模型,所以在spring5之后,spring的底層網(wǎng)絡(luò)實現(xiàn)采用了netty。如果我們單獨使用netty來開發(fā)websocket服務(wù)端,速度快是絕對的,但是可能會遇到下列問題:
    1.與系統(tǒng)的其他應(yīng)用集成不方便,在rpc調(diào)用的時候,無法享受springcloud里feign服務(wù)調(diào)用的便利性
    2.業(yè)務(wù)邏輯可能要重復(fù)實現(xiàn)
    3.使用netty可能需要重復(fù)造輪子
    4.怎么連接上服務(wù)注冊中心,也是一件麻煩的事情
    5.restful服務(wù)與ws服務(wù)需要分開實現(xiàn),如果在netty上實現(xiàn)restful服務(wù),有多麻煩可想而知,用spring一站式restful開發(fā)相信很多人都習(xí)慣了。

  • 使用spring websocket實現(xiàn)ws服務(wù)

    spring websocket已經(jīng)被springboot很好地集成了,所以在springboot上開發(fā)ws服務(wù)非常方便,做法非常簡單
    第一步:添加依賴

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    第二步:添加配置類

    @Configuration
    public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/")
            .setAllowedOrigins("*");
    }
    
    @Bean
     public WebSocketHandler myHandler() {
         return new MessageHandler();
     }
    }

    第三步:實現(xiàn)消息監(jiān)聽類

    @Component
    @SuppressWarnings("unchecked")
    public class MessageHandler extends TextWebSocketHandler {
       private List<WebSocketSession> clients = new ArrayList<>();
    
       @Override
       public void afterConnectionEstablished(WebSocketSession session) {
           clients.add(session);
           System.out.println("uri :" + session.getUri());
           System.out.println("連接建立: " + session.getId());
           System.out.println("current seesion: " + clients.size());
       }
    
       @Override
       public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
           clients.remove(session);
           System.out.println("斷開連接: " + session.getId());
       }
    
       @Override
       protected void handleTextMessage(WebSocketSession session, TextMessage message) {
           String payload = message.getPayload();
           Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);
           System.out.println("接受到的數(shù)據(jù)" + map);
           clients.forEach(s -> {
               try {
                   System.out.println("發(fā)送消息給: " + session.getId());
                   s.sendMessage(new TextMessage("服務(wù)器返回收到的信息," + payload));
               } catch (Exception e) {
                   e.printStackTrace();
               }
           });
       }
    }

    從這個demo中,使用spring websocket實現(xiàn)ws服務(wù)的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實現(xiàn)ws服務(wù)。
    因此我的應(yīng)用服務(wù)架構(gòu)是這樣子的:一個應(yīng)用既負責(zé)restful服務(wù),也負責(zé)ws服務(wù)。沒有將ws服務(wù)模塊拆分是因為拆分出去要使用feign來進行服務(wù)調(diào)用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務(wù)間的io調(diào)用,所以就沒有這么做了。

從zuul技術(shù)轉(zhuǎn)型到spring cloud gateway

要實現(xiàn)websocket集群,我們必不可免地得從zuul轉(zhuǎn)型到spring cloud gateway。原因如下:

zuul1.0版本不支持websocket轉(zhuǎn)發(fā),zuul 2.0開始支持websocket,zuul2.0幾個月前開源了,但是2.0版本沒有被spring boot集成,而且文檔不健全。因此轉(zhuǎn)型是必須的,同時轉(zhuǎn)型也很容易實現(xiàn)。

在gateway中,為了實現(xiàn)ssl認證和動態(tài)路由負載均衡,yml文件中以下的某些配置是必須的,在這里提前避免大家采坑
server:
  port: 443
  ssl:
    enabled: true
    key-store: classpath:xxx.jks
    key-store-password: xxxx
    key-store-type: JKS
    key-alias: alias
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      httpclient:
        ssl:
          handshake-timeout-millis: 10000
          close-notify-flush-timeout-millis: 3000
          close-notify-read-timeout-millis: 0
          useInsecureTrustManager: true
      discovery:
        locator:
          enabled: true
          lower-case-service-id: true
      routes:
      - id: dc
        uri: lb://dc
        predicates:
        - Path=/dc/**
      - id: wecheck
        uri: lb://wecheck
        predicates:
        - Path=/wecheck/**

如果要愉快地玩https卸載,我們還需要配置一個filter,否則請求網(wǎng)關(guān)時會出現(xiàn)錯誤not an SSL/TLS record

@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
  private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      URI originalUri = exchange.getRequest().getURI();
      ServerHttpRequest request = exchange.getRequest();
      ServerHttpRequest.Builder mutate = request.mutate();
      String forwardedUri = request.getURI().toString();
      if (forwardedUri != null && forwardedUri.startsWith("https")) {
          try {
              URI mutatedUri = new URI("http",
                      originalUri.getUserInfo(),
                      originalUri.getHost(),
                      originalUri.getPort(),
                      originalUri.getPath(),
                      originalUri.getQuery(),
                      originalUri.getFragment());
              mutate.uri(mutatedUri);
          } catch (Exception e) {
              throw new IllegalStateException(e.getMessage(), e);
          }
      }
      ServerHttpRequest build = mutate.build();
      ServerWebExchange webExchange = exchange.mutate().request(build).build();
      return chain.filter(webExchange);
  }

  @Override
  public int getOrder() {
      return HTTPS_TO_HTTP_FILTER_ORDER;
  }

}

這樣子我們就可以使用gateway來卸載https請求了,到目前為止,我們的基本框架已經(jīng)搭建完畢,網(wǎng)關(guān)既可以轉(zhuǎn)發(fā)https請求,也可以轉(zhuǎn)發(fā)wss請求。接下來就是用戶多對多之間session互通的通訊解決方案了。接下來,我將根據(jù)方案的優(yōu)雅性,從最不優(yōu)雅的方案開始講起。

session廣播

這是最簡單的websocket集群通訊解決方案。場景如下:
教師A想要群發(fā)消息給他的學(xué)生們

  1. 教師的消息請求發(fā)給網(wǎng)關(guān),內(nèi)容包含{我是教師A,我想把xxx消息發(fā)送我的學(xué)生們}
  2. 網(wǎng)關(guān)接收到消息,獲取集群所有ip地址,逐個調(diào)用教師的請求
  3. 集群中的每臺服務(wù)器獲取請求,根據(jù)教師A的信息查找本地有沒有與學(xué)生關(guān)聯(lián)的session,有則調(diào)用sendMessage方法,沒有則忽略請求

clipboard.png

session廣播實現(xiàn)很簡單,但是有一個致命缺陷:計算力浪費現(xiàn)象,當(dāng)服務(wù)器沒有消息接收者session的時候,相當(dāng)于浪費了一次循環(huán)遍歷的計算力,該方案在并發(fā)需求不高的情況下可以優(yōu)先考慮,實現(xiàn)很容易。

spring cloud中獲取服務(wù)集群中每臺服務(wù)器信息的方法如下
@Resource
private EurekaClient eurekaClient;

Application app = eurekaClient.getApplication("service-name");
//instanceInfo包括了一臺服務(wù)器ip,port等消息
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address: " + instanceInfo.getIPAddr());
服務(wù)器需要維護關(guān)系映射表,將用戶的id與session做映射,session建立時在映射表中添加映射關(guān)系,session斷開后要刪除映射表內(nèi)關(guān)聯(lián)關(guān)系

一致性哈希算法實現(xiàn)(本文的要點)

這種方法是本人認為最優(yōu)雅的實現(xiàn)方案,理解這種方案需要一定的時間,如果你耐心看下去,相信你一定會有所收獲。再強調(diào)一次,不了解一致性哈希算法的同學(xué)請先看這里,現(xiàn)先假設(shè)哈希環(huán)是順時針查找的。

首先,想要將一致性哈希算法的思想應(yīng)用到我們的websocket集群,我們需要解決以下新問題:

  1. 集群節(jié)點DOWN,會影響到哈希環(huán)映射到狀態(tài)是DOWN的節(jié)點。
  2. 集群節(jié)點UP,會影響到舊key映射不到對應(yīng)的節(jié)點。
  3. 哈希環(huán)讀寫共享。
在集群中,總會出現(xiàn)服務(wù)UP/DOWN的問題。

針對節(jié)點DOWN的問題分析如下:

一個服務(wù)器DOWN的時候,其擁有的websocket session會自動關(guān)閉連接,并且前端會收到通知。此時會影響到哈希環(huán)的映射錯誤。我們只需要當(dāng)監(jiān)聽到服務(wù)器DOWN的時候,刪除哈希環(huán)上面對應(yīng)的實際結(jié)點和虛結(jié)點,避免讓網(wǎng)關(guān)轉(zhuǎn)發(fā)到狀態(tài)是DOWN的服務(wù)器上。
實現(xiàn)方法:在eureka治理中心監(jiān)聽集群服務(wù)DOWN事件,并及時更新哈希環(huán)。

針對節(jié)點UP的問題分析如下:

現(xiàn)假設(shè)集群中有服務(wù)CacheB上線了,該服務(wù)器的ip地址剛好被映射到key1和cacheA之間。那么key1對應(yīng)的用戶每次要發(fā)消息時都跑去CacheB發(fā)送消息,結(jié)果明顯是發(fā)送不了消息,因為CacheB沒有key1對應(yīng)的session。

clipboard.png

此時我們有兩種解決方案。
方案A簡單,動作大:
eureka監(jiān)聽到節(jié)點UP事件之后,根據(jù)現(xiàn)有集群信息,更新哈希環(huán)。并且斷開所有session連接,讓客戶端重新連接,此時客戶端會連接到更新后的哈希環(huán)節(jié)點,以此避免消息無法送達的情況。
方案B復(fù)雜,動作小:
我們先看看沒有虛擬節(jié)點的情況,假設(shè)CacheCCacheA之間上線了服務(wù)器CacheB。所有映射在CacheCCacheB的用戶發(fā)消息時都會去CacheB里面找session發(fā)消息。也就是說CacheB一但上線,便會影響到CacheCCacheB之間的用戶發(fā)送消息。所以我們只需要將CacheA斷開CacheCCacheB的用戶所對應(yīng)的session,讓客戶端重連。

clipboard.png

接下來是有虛擬節(jié)點的情況,假設(shè)淺色的節(jié)點是虛擬節(jié)點。我們用長括號來代表某段區(qū)域映射的結(jié)果屬于某個Cache。首先是C節(jié)點未上線的情況。圖大家應(yīng)該都懂吧,所有B的虛擬節(jié)點都會指向真實的B節(jié)點,所以所有B節(jié)點逆時針那一部分都會映射到B(因為我們規(guī)定哈希環(huán)順時針查找)。

clipboard.png

接下來是C節(jié)點上線的情況,可以看到某些區(qū)域被C占領(lǐng)了。

clipboard.png

由以上情況我們可以知道:節(jié)點上線,會有許多對應(yīng)虛擬節(jié)點也同時上線,因此我們需要將多段范圍key對應(yīng)的session斷開連接(上圖紅色的部分)。具體算法有點復(fù)雜,實現(xiàn)的方式因人而異,大家可以嘗試一下自己實現(xiàn)算法。

哈希環(huán)應(yīng)該放在哪里?

  1. gateway本地創(chuàng)建并維護哈希環(huán)。當(dāng)ws請求進來的時候,本地獲取哈希環(huán)并獲取映射服務(wù)器信息,轉(zhuǎn)發(fā)ws請求。這種方法看上去不錯,但實際上是不太可取的,回想一下上面服務(wù)器DOWN的時候只能通過eureka監(jiān)聽,那么eureka監(jiān)聽到DOWN事件之后,需要通過io來通知gateway刪除對應(yīng)節(jié)點嗎?顯然太麻煩了,將eureka的職責(zé)分散到gateway,不建議這么做。
  2. eureka創(chuàng)建,并放到redis共享讀寫。這個方案可行,當(dāng)eureka監(jiān)聽到服務(wù)DOWN的時候,修改哈希環(huán)并推送到redis上。為了請求響應(yīng)時間盡量地短,我們不可以讓gateway每次轉(zhuǎn)發(fā)ws請求的時候都去redis取一次哈希環(huán)。哈希環(huán)修改的概率的確很低,gateway只需要應(yīng)用redis的消息訂閱模式,訂閱哈希環(huán)修改事件便可以解決此問題。
至此我們的spring websocket集群已經(jīng)搭建的差不多了,最重要的地方還是一致性哈希算法。現(xiàn)在有最后一個技術(shù)瓶頸,網(wǎng)關(guān)如何根據(jù)ws請求轉(zhuǎn)發(fā)到指定的集群服務(wù)器上?答案在負載均衡。spring cloud gateway或zuul都默認集成了ribbon作為負載均衡,我們只需要根據(jù)建立ws請求時客戶端發(fā)來的user id,重寫ribbon負載均衡算法,根據(jù)user id進行hash,并在哈希環(huán)上尋找ip,并將ws請求轉(zhuǎn)發(fā)到該ip便完事了。流程如下圖所示:

clipboard.png

接下來用戶溝通的時候,只需要根據(jù)id進行hash,在哈希環(huán)上獲取對應(yīng)ip,便可以知道與該用戶建立ws連接時的session存在哪臺服務(wù)器上了!

spring cloud Finchley.RELEASE 版本中ribbon未完善的地方

題主在實際操作的時候發(fā)現(xiàn)了ribbon兩個不完善的地方......

  1. 根據(jù)網(wǎng)上找的方法,繼承AbstractLoadBalancerRule重寫負載均衡策略之后,多個不同應(yīng)用的請求變得混亂。假如eureka上有兩個service A和B,重寫負載均衡策略之后,請求A或B的服務(wù),最終只會映射到其中一個服務(wù)上。非常奇怪!可能spring cloud gateway官網(wǎng)需要給出一個正確的重寫負載均衡策略的demo。
  2. 一致性哈希算法需要一個key,類似user id,根據(jù)key進行hash之后在哈希環(huán)上搜索并返回ip。但是ribbon沒有完善choose函數(shù)的key參數(shù),直接寫死了default!

clipboard.png

難道這樣子我們就沒有辦法了嗎?其實還有一個可行并且暫時可替代的辦法!
如下圖所示,客戶端發(fā)送一個普通的http請求(包含id參數(shù))給網(wǎng)關(guān),網(wǎng)關(guān)根據(jù)id進行hash,在哈希環(huán)中尋找ip地址,將ip地址返回給客戶端,客戶端再根據(jù)該ip地址進行ws請求。

clipboard.png

由于ribbon未完善key的處理,我們暫時無法在ribbon上實現(xiàn)一致性哈希算法。只能間接地通過客戶端發(fā)起兩次請求(一次http,一次ws)的方式來實現(xiàn)一致性哈希。希望不久之后ribbon能更新這個缺陷!讓我們的websocket集群實現(xiàn)得更優(yōu)雅一點。

后記

以上便是我這幾天探索的結(jié)果。期間遇到了許多問題,并逐一解決難題,列出兩個websocket集群解決方案。第一個是session廣播,第二個是一致性哈希。這兩種方案針對不同場景各有優(yōu)缺點,本文并未用到ActiveMQ,Karfa等消息隊列實現(xiàn)消息推送,只是想通過自己的想法,不依靠消息隊列來簡單地實現(xiàn)多用戶之間的長連接通訊。希望能為大家提供一條不同于尋常的思路。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    欧美日韩乱码一区二区三区| 国产一区麻豆水好多高潮| 久久黄片免费播放大全 | 日本少妇三级三级三级| 日本不卡视频在线观看| 国产水滴盗摄一区二区| 少妇激情在线免费观看| 免费一区二区三区少妇| 少妇一区二区三区精品| 国产亚洲精品一二三区| 久久精品福利在线观看| 亚洲国产成人精品福利| 麻豆国产精品一区二区三区| 国产真人无遮挡免费视频一区| 亚洲国产成人一区二区在线观看| 亚洲中文在线中文字幕91| 精品人妻少妇二区三区| 国产精品超碰在线观看| 一区二区三区在线不卡免费| 黄片免费在线观看日韩| 加勒比日本欧美在线观看| 欧美黄色黑人一区二区| 日本东京热加勒比一区二区| 欧美日韩国产精品自在自线| 亚洲av日韩av高潮无打码| 日韩精品一区二区毛片| 日韩精品视频一二三区| 中文字幕熟女人妻视频| 欧美性猛交内射老熟妇| 99久久精品国产麻豆| 亚洲精品国产美女久久久99| 国产精品福利精品福利| 欧美又大又黄刺激视频| 亚洲一区二区久久观看| 人妻偷人精品一区二区三区不卡| 亚洲午夜av一区二区| 午夜精品国产一区在线观看| 亚洲精品一二三区不卡| 亚洲专区中文字幕视频| 欧美日韩国产精品第五页| 亚洲免费观看一区二区三区|