一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

抽一根煙的時間學會.NET Core 操作RabbitMQ

 風聲之家 2021-04-03

原創(chuàng) 青城 青城同學 1周前

什么是RabbitMQ?

圖片

RabbitMQ是由erlang語言開發(fā)的一個基于AMQP(Advanced Message Queuing Protocol)協(xié)議的企業(yè)級消息隊列中間件??蓪崿F(xiàn)隊列,訂閱/發(fā)布,路由,通配符等工作模式。

為什么要使用RabbitMQ?

  • 異步處理:比如發(fā)送郵件,發(fā)送短信等不需要等待處理結(jié)果的操作

  • 應用解耦:比如下單成功后,通知倉庫發(fā)貨,不需要等待倉庫回應,通過消息隊列去通知倉庫,降低應用間耦合程序,可并行開發(fā)兩個功能模塊

  • 流量削鋒:在搶購或者其他的活動頁,服務處于爆發(fā)式請求狀態(tài),如果直連數(shù)據(jù)庫,數(shù)據(jù)庫容易被拖垮。搶購商品也容易出現(xiàn)庫存超賣的情況。通過隊列可有效解決該問題。

  • 日志處理:在單機中,日志直接寫入到文件目錄中,但是在分布式應用中,日志需要有統(tǒng)一的處理機制,可通過消息隊列統(tǒng)一由某個消費端做處理。

  • 消息通信:如生產(chǎn)端和消費端可通過隊列進行異步通信

如何安裝RabbitMQ?

Windows端

  1. 安裝erlang語言運行環(huán)境https:///download/otp_win64_23.2.exe下載后直接下一步即可

  2. 安裝RabbitMQhttps://www./install-windows.html#installer直接點擊安裝下一步即可按章

  3. 安裝RabbitMQ的Web管理平臺

RabbitMQ的管理平臺是通過插件的形式使用,需要手動啟用管理平臺在Windows下,RabbitMQ默認被安裝到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。打開sbin ,在cmd或者powershell中執(zhí)行rabbitmq-plugins.bat enable rabbitmq_management

安裝完成后,瀏覽器打開 http://localhost:15672/#/ 即可看到RabbitMQ的管理界面。輸入默認賬號密碼 guest 成功登錄。

Linux環(huán)境安裝

  1. Ubuntu:https://www./install-debian.html

  2. Centos:https://www./install-rpm.html

RabbitMQ的基本概念了解一下?

生產(chǎn)者

發(fā)送消息的端

消費者

獲取消息并處理的端

Connection

一個終端連接。每一個Connection都可以在RabbitMQ后臺看到

Channel

Channel是建立在Connection上的一個虛擬通信管道。一般情況下,往消息隊列中寫入多條消息,為了不每條消息都建立一個TCP連接,所以RabbitMQ的做法是多條消息可以公用一個Connection,大大提高MQ的負載能力。

Exchange

Exchange是一個虛擬交換機。每一條消息都必須要通過交換機才能能進入對應的隊列,可以理解為網(wǎng)絡設(shè)備中的交換機,是一個意思。

Queue

Queue是一個存儲消息的內(nèi)部對象,所有的Rabbit MQ消息都存儲在Queue中。生產(chǎn)者所生產(chǎn)的消息會存儲在Queue中,消費者獲取的消息也是從Queue中獲取。

如何在.NET Core中使用RabbitMQ?

nuget安裝

dotnet add package RabbitMQ.Client

創(chuàng)建生產(chǎn)者

const string QUEUENAME = "HELLO_MQ";
          //創(chuàng)建連接對象工廠
          var factory = new ConnectionFactory()
          {
              UserName = "guest",
              Password = "guest",
              HostName = "localhost",
              Port = 5672, //RabbitMQ默認的端口
          };

          while (true)
          {
              using var conn = factory.CreateConnection();
              var chanel = conn.CreateModel();

              chanel.QueueDeclare(QUEUENAME, true, false, false);
              Console.WriteLine("輸入生產(chǎn)內(nèi)容:");
              var input = Console.ReadLine();
              chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
          }

在循環(huán)中,輸入一個值,按下enter,即可推送一條消息到隊列。

也可以直接在RabbitMQ的管理后臺查看圖片

圖片

圖片

可以看到我們發(fā)送的消息已經(jīng)被RabbitMQ存儲在Queue中了。只等某個幸運的消費者前來消費。

創(chuàng)建消費者

const string QUEUENAME = "HELLO_MQ";
          var factory = new ConnectionFactory()
          {
              UserName = "guest",
              Password = "guest",
              HostName = "localhost",
              Port = 5672,
          };

          var conn = factory.CreateConnection();
          var chanel = conn.CreateModel();
          chanel.QueueDeclare(QUEUENAME, true, false, false);
          EventingBasicConsumer consumer = new EventingBasicConsumer(chanel);
          consumer.Received += (a, e) =>
          {
              Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
              chanel.BasicAck(e.DeliveryTag, true); //收到回復后,RabbitMQ會直接在隊列中刪除這條消息
          };
          chanel.BasicConsume(QUEUENAME, false, consumer);

          Console.WriteLine("啟動成功");
          Console.ReadLine();

啟動成功后,consumer的Received方法,會收到一條來自MQ的消息,圖片

如果處理完成后,不調(diào)用chennel的BasicAck方法,那么這條消息依然會存在,下次有消費者出現(xiàn),會再次推送給消費者。

簡單的RabbitMQ Hello World到這里就算完成了。接下來就是稍微高級一點的應用

RabbitMQ的工作模式

Work Queue 工作隊列模式

圖片

工作隊列模式的意思就是一個生產(chǎn)者對應多個消費者。RabbitMQ會使用輪詢?nèi)ソo每個消費者發(fā)送消息。

publish/subscribe

發(fā)布訂閱模式是屬于用的比較多的一種。

圖片

發(fā)布訂閱,是由交換機發(fā)布消息給多個隊列。多個隊列再對應多個消費者。

發(fā)布訂閱模式對應的交換機類型的fanout。

消費者

A

const string QUEUENAME = "HELLO_MQ_B";
          const string TESTEXCHANGE = "TESTEXCHANGE";
          var factory = new ConnectionFactory()
          {
              UserName = "guest",
              Password = "guest",
              HostName = "localhost",
              Port = 5672,
          };

          var conn = factory.CreateConnection();
          var channel = conn.CreateModel();
          //定義隊列
          channel.QueueDeclare(QUEUENAME, true, false, false);
          //定義交換機
          channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
          //綁定隊列到交換機
          channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (a, e) =>
          {
              Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
              channel.BasicAck(e.DeliveryTag, true); //收到回復后,RabbitMQ會直接在隊列中刪除這條消息
          };
          channel.BasicConsume(QUEUENAME, false, consumer);

          Console.WriteLine("啟動成功");
          Console.ReadLine();

B

const string QUEUENAME = "HELLO_MQ";
          const string TESTEXCHANGE = "TESTEXCHANGE";
          var factory = new ConnectionFactory()
          {
              UserName = "guest",
              Password = "guest",
              HostName = "localhost",
              Port = 5672,
          };

          var conn = factory.CreateConnection();
          var channel = conn.CreateModel();
          //定義隊列
          channel.QueueDeclare(QUEUENAME, true, false, false);
          //定義交換機
          channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
          //綁定隊列到交換機
          channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (a, e) =>
          {
              Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
              channel.BasicAck(e.DeliveryTag, true); //收到回復后,RabbitMQ會直接在隊列中刪除這條消息
          };
          channel.BasicConsume(QUEUENAME, false, consumer);

          Console.WriteLine("啟動成功");
          Console.ReadLine();

生產(chǎn)者

const string QUEUENAME = "HELLO_MQ";
          const string QUEUENAME_B = "HELLO_MQ_B";
          const string TESTEXCHANGE = "TESTEXCHANGE";

          //創(chuàng)建連接對象工廠
          var factory = new ConnectionFactory()
          {
              UserName = "guest",
              Password = "guest",
              HostName = "localhost",
              Port = 5672, //RabbitMQ默認的端口
          };
          using var conn = factory.CreateConnection();
          while (true)
          {

              var channel = conn.CreateModel();
              //定義交換機
              channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
              Console.WriteLine("輸入生產(chǎn)內(nèi)容:");
              var input = Console.ReadLine();
              channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
          }

在生產(chǎn)者運行成功后,RabbitMQ后臺會出現(xiàn)一個交換機,點擊交換機會看到交換機下綁定了兩個隊列

圖片圖片

從生產(chǎn)者發(fā)送消息到隊列,兩個消費者會同時收到消息圖片

routing模式

圖片

routing模式對應的交換機類型是direct,和發(fā)布訂閱模式的區(qū)別在于:routing模式下,可以指定一個routingkey,用于區(qū)分消息生產(chǎn)者

  var channel = conn.CreateModel();
              //定義交換機
              channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
              //綁定隊列到交換機
              Console.WriteLine("輸入生產(chǎn)內(nèi)容:");
              var input = Console.ReadLine();
              channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));

消費者 A

//定義隊列
          channel.QueueDeclare(QUEUENAME, true, false, false);
          //定義交換機
          channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
          //綁定隊列到交換機
          channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");

消費者 B

//定義隊列
          channel.QueueDeclare(QUEUENAME, true, false, false);
          //定義交換機
          channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
          //綁定隊列到交換機
          channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");

綁定成功后,發(fā)送消息,消費者A可以收到消息,消費者B無法收到消息。

如果遇到指定routingKey生產(chǎn)一條消息,結(jié)果 AB消費者都收到的情況。建議在RabbitMQ后臺的交換機下看一下綁定的Queue是否重復綁定了多個routingKey.

圖片

topic通配符模式

圖片

在通配符模式下,RabbitMQ使用模糊匹配來決定把消息推送給哪個生產(chǎn)者。通配符有兩個符號來匹配routingKey

  1. *匹配一個字符 如:*.qq.com 可匹配 1.qq.com

  2. #匹配一個或者多個字符。如:*.qq.com 可匹配 1.qq.com或者1111.qq.com

其他的操作基本和routing模式一樣。

header模式

header模式是把routingkey放到header中.取消掉了routingKey。并使用一個字典傳遞 K、V的方式來匹配。比如同時要給用戶發(fā)送郵件和短信,可直接通過header的鍵值對來匹配綁定的值,把消息傳遞給發(fā)短信和郵件的生產(chǎn)者.

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    欧美韩日在线观看一区| 国产高清三级视频在线观看| 亚洲欧美视频欧美视频| 扒开腿狂躁女人爽出白浆av| 91久久精品在这里色伊人| 真实偷拍一区二区免费视频| 欧美日韩国产福利在线观看| 91精品国产综合久久福利| 天堂av一区一区一区| 欧美日韩精品一区二区三区不卡| 亚洲一区二区三区福利视频| 日韩精品视频高清在线观看| 老司机这里只有精品视频| 亚洲淫片一区二区三区| 亚洲最新中文字幕在线视频| 黄色片一区二区三区高清| 午夜精品麻豆视频91| 精品少妇一区二区视频| 国产日本欧美韩国在线| 国产精品一区二区日韩新区| 人妻人妻人人妻人人澡| 午夜福利激情性生活免费视频| 麻豆印象传媒在线观看| 国产精品九九九一区二区| 青青操日老女人的穴穴| 亚洲一区二区三区有码| 日韩毛片视频免费观看| 欧美午夜一级艳片免费看| 国产午夜精品亚洲精品国产| 九九热在线视频观看最新| 国产av天堂一区二区三区粉嫩| 人妻少妇系列中文字幕| 不卡一区二区在线视频| 国产精品超碰在线观看| 国产午夜精品福利免费不| 真实偷拍一区二区免费视频| 日本加勒比不卡二三四区| 大香蕉伊人一区二区三区| 国产一区二区熟女精品免费| 一区二区三区18禁看| 欧美日韩精品综合一区|