本文對(duì)rabbitmq基礎(chǔ)介紹,完全是為了下一篇rabbitmq性能測(cè)試做準(zhǔn)備,讓讀者去了解我們需要測(cè)試的是什么樣一個(gè)“東西”。 引言你是否遇到過(guò)兩個(gè)(多個(gè))系統(tǒng)間需要通過(guò)定時(shí)任務(wù)來(lái)同步某些數(shù)據(jù)?你是否在為異構(gòu)系統(tǒng)的不同進(jìn)程間相互調(diào)用、通訊的問(wèn)題而苦惱、掙扎?如果是,那么恭喜你,消息服務(wù)讓你可以很輕松地解決這些問(wèn)題。 RabbitMQ簡(jiǎn)介AMQP,即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然。 ConnectionFactory、Connection、ChannelConnectionFactory、Connection、Channel都是RabbitMQ對(duì)外提供的API中最基本的對(duì)象。Connection是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。ConnectionFactory為Connection的制造工廠。 QueueQueue(隊(duì)列)是RabbitMQ的內(nèi)部對(duì)象,用于存儲(chǔ)消息,用下圖表示。 RabbitMQ中的消息都只能存儲(chǔ)在Queue中,生產(chǎn)者(下圖中的P)生產(chǎn)消息并最終投遞到Queue中,消費(fèi)者(下圖中的C)可以從Queue中獲取消息并消費(fèi)。 多個(gè)消費(fèi)者可以訂閱同一個(gè)Queue,這時(shí)Queue中的消息會(huì)被平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理。 Message acknowledgment在實(shí)際應(yīng)用中,可能會(huì)發(fā)生消費(fèi)者收到Queue中的消息,但沒(méi)有處理完成就宕機(jī)(或出現(xiàn)其他意外)的情況,這種情況下就可能會(huì)導(dǎo)致消息丟失。為了避免這種情況發(fā)生,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ,RabbitMQ收到消息回執(zhí)(Message acknowledgment)后才將該消息從Queue中移除;如果RabbitMQ沒(méi)有收到回執(zhí)并檢測(cè)到消費(fèi)者的RabbitMQ連接斷開(kāi),則RabbitMQ會(huì)將該消息發(fā)送給其他消費(fèi)者(如果存在多個(gè)消費(fèi)者)進(jìn)行處理。這里不存在timeout概念,一個(gè)消費(fèi)者處理消息時(shí)間再長(zhǎng)也不會(huì)導(dǎo)致該消息被發(fā)送給其他消費(fèi)者,除非它的RabbitMQ連接斷開(kāi)。 Message durability如果我們希望即使在RabbitMQ服務(wù)重啟的情況下,也不會(huì)丟失消息,我們可以將Queue與Message都設(shè)置為可持久化的(durable),這樣可以保證絕大部分情況下我們的RabbitMQ消息不會(huì)丟失。但依然解決不了小概率丟失事件的發(fā)生(比如RabbitMQ服務(wù)器已經(jīng)接收到生產(chǎn)者的消息,但還沒(méi)來(lái)得及持久化該消息時(shí)RabbitMQ服務(wù)器就斷電了),如果我們需要對(duì)這種小概率事件也要管理起來(lái),那么我們要用到事務(wù)。由于這里僅為RabbitMQ的簡(jiǎn)單介紹,所以這里將不講解RabbitMQ相關(guān)的事務(wù)。 Prefetch count前面我們講到如果有多個(gè)消費(fèi)者同時(shí)訂閱同一個(gè)Queue中的消息,Queue中的消息會(huì)被平攤給多個(gè)消費(fèi)者。這時(shí)如果每個(gè)消息的處理時(shí)間不同,就有可能會(huì)導(dǎo)致某些消費(fèi)者一直在忙,而另外一些消費(fèi)者很快就處理完手頭工作并一直空閑的情況。我們可以通過(guò)設(shè)置prefetchCount來(lái)限制Queue每次發(fā)送給每個(gè)消費(fèi)者的消息數(shù),比如我們?cè)O(shè)置prefetchCount=1,則Queue每次給每個(gè)消費(fèi)者發(fā)送一條消息;消費(fèi)者處理完這條消息后Queue會(huì)再給該消費(fèi)者發(fā)送一條消息。 Exchange在上一節(jié)我們看到生產(chǎn)者將消息投遞到Queue中,實(shí)際上這在RabbitMQ中這種事情永遠(yuǎn)都不會(huì)發(fā)生。實(shí)際的情況是,生產(chǎn)者將消息發(fā)送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)。 Exchange是按照什么邏輯將消息路由到Queue的?這個(gè)將在Binding一節(jié)介紹。 routing key生產(chǎn)者在將消息發(fā)送給Exchange的時(shí)候,一般會(huì)指定一個(gè)routing key,來(lái)指定這個(gè)消息的路由規(guī)則,而這個(gè)routing key需要與Exchange Type及binding key聯(lián)合使用才能最終生效。 BindingRabbitMQ中通過(guò)Binding將Exchange與Queue關(guān)聯(lián)起來(lái),這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。 Binding key在綁定(Binding)Exchange與Queue的同時(shí),一般會(huì)指定一個(gè)binding key;消費(fèi)者將消息發(fā)送給Exchange時(shí),一般會(huì)指定一個(gè)routing key;當(dāng)binding key與routing key相匹配時(shí),消息將會(huì)被路由到對(duì)應(yīng)的Queue中。這個(gè)將在Exchange Types章節(jié)會(huì)列舉實(shí)際的例子加以說(shuō)明。 Exchange TypesRabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規(guī)范里還提到兩種Exchange Type,分別為system與自定義,這里不予以描述),下面分別進(jìn)行介紹。 fanoutfanout類(lèi)型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中。 上圖中,生產(chǎn)者(P)發(fā)送到Exchange(X)的所有消息都會(huì)路由到圖中的兩個(gè)Queue,并最終被兩個(gè)消費(fèi)者(C1與C2)消費(fèi)。 directdirect類(lèi)型的Exchange路由規(guī)則也很簡(jiǎn)單,它會(huì)把消息路由到那些binding key與routing key完全匹配的Queue中。 以上圖的配置為例,我們以routingKey=”error”發(fā)送消息到Exchange,則消息會(huì)路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動(dòng)生成的Queue名稱(chēng))和Queue2(amqp.gen-Agl…);如果我們以routingKey=”info”或routingKey=”warning”來(lái)發(fā)送消息,則消息只會(huì)路由到Queue2。如果我們以其他routingKey發(fā)送消息,則消息不會(huì)路由到這兩個(gè)Queue中。 topic前面講到direct類(lèi)型的Exchange路由規(guī)則是完全匹配binding key與routing key,但這種嚴(yán)格的匹配方式在很多情況下不能滿(mǎn)足實(shí)際業(yè)務(wù)需求。topic類(lèi)型的Exchange在匹配規(guī)則上進(jìn)行了擴(kuò)展,它與direct類(lèi)型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規(guī)則有些不同,它約定:
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會(huì)同時(shí)路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會(huì)路由到Q1,routingKey=”lazy.brown.fox”的消息會(huì)路由到Q2,routingKey=”lazy.pink.rabbit”的消息會(huì)路由到Q2(只會(huì)投遞給Q2一次,雖然這個(gè)routingKey與Q2的兩個(gè)bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會(huì)被丟棄,因?yàn)樗鼈儧](méi)有匹配任何bindingKey。 headersheaders類(lèi)型的Exchange不依賴(lài)于routing key與binding key的匹配規(guī)則來(lái)路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。 RPCMQ本身是基于異步的消息處理,前面的示例中所有的生產(chǎn)者(P)將消息發(fā)送到RabbitMQ后不會(huì)知道消費(fèi)者(C)處理成功或者失敗(甚至連有沒(méi)有消費(fèi)者來(lái)處理這條消息都不知道)。
總結(jié)本文介紹了RabbitMQ中個(gè)人認(rèn)為最重要的概念,充分利用RabbitMQ提供的這些功能就可以處理我們絕大部分的異步業(yè)務(wù)了。 http://rabbitmq-into-chinese./zh_CN/latest/tutorials_with_python/[6]RPC/#_5 http://blog.csdn.net/anzhsoft/article/details/19563091 一個(gè)客戶(hù)端可以和多個(gè)exchange相連 consumer指定的時(shí)候只要指定QueueName就OK了 這個(gè)系統(tǒng)架構(gòu)圖版權(quán)屬于sunjun041640。 RabbitMQ Server: 也叫broker server,它不是運(yùn)送食物的卡車(chē),而是一種傳輸服務(wù)。原話(huà)是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是維護(hù)一條從Producer到Consumer的路線(xiàn),保證數(shù)據(jù)能夠按照指定的方式進(jìn)行傳輸。但是這個(gè)保證也不是100%的保證,但是對(duì)于普通的應(yīng)用來(lái)說(shuō)這已經(jīng)足夠了。當(dāng)然對(duì)于商業(yè)系統(tǒng)來(lái)說(shuō),可以再做一層數(shù)據(jù)一致性的guard,就可以徹底保證系統(tǒng)的一致性了。 Client A & B: 也叫Producer,數(shù)據(jù)的發(fā)送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個(gè)Message有兩個(gè)部分:payload(有效載荷)和label(標(biāo)簽)。payload顧名思義就是傳輸?shù)臄?shù)據(jù)。label是exchange的名字或者說(shuō)是一個(gè)tag,它描述了payload,而且RabbitMQ也是通過(guò)這個(gè)label來(lái)決定把這個(gè)Message發(fā)給哪個(gè)Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個(gè)label的規(guī)則。 Client 1,2,3:也叫Consumer,數(shù)據(jù)的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個(gè)有名字的郵箱。當(dāng)有Message到達(dá)某個(gè)郵箱后,RabbitMQ把它發(fā)送給它的某個(gè)訂閱者即Consumer。當(dāng)然可能會(huì)把同一個(gè)Message發(fā)送給很多的Consumer。在這個(gè)Message中,只有payload,label已經(jīng)被刪掉了。對(duì)于Consumer來(lái)說(shuō),它是不知道誰(shuí)發(fā)送的這個(gè)信息的。就是協(xié)議本身不支持。但是當(dāng)然了如果Producer發(fā)送的payload包含了Producer的信息就另當(dāng)別論了。 對(duì)于一個(gè)數(shù)據(jù)從Producer到Consumer的正確傳遞,還有三個(gè)概念需要明確:exchanges, queues and bindings。 Exchanges are where producers publish their messages. Queuesare where the messages end up and are received by consumers Bindings are how the messages get routed from the exchange to particular queues. 還有幾個(gè)概念是上述圖中沒(méi)有標(biāo)明的,那就是Connection(連接),Channel(通道,頻道)。 Connection: 就是一個(gè)TCP的連接。Producer和Consumer都是通過(guò)TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個(gè)TCP連接。 Channels: 虛擬連接。它建立在上述的TCP連接中。數(shù)據(jù)流動(dòng)都是在Channel中進(jìn)行的。也就是說(shuō),一般情況是程序起始建立TCP連接,第二步就是建立這個(gè)Channel。 那么,為什么使用Channel,而不是直接使用TCP連接? 對(duì)于OS來(lái)說(shuō),建立和關(guān)閉TCP連接是有代價(jià)的,頻繁的建立關(guān)閉TCP連接對(duì)于系統(tǒng)的性能有很大的影響,而且TCP的連接數(shù)也有限制,這也限制了系統(tǒng)處理高并發(fā)的能力。但是,在TCP連接中建立Channel是沒(méi)有上述代價(jià)的。對(duì)于Producer或者Consumer來(lái)說(shuō),可以并發(fā)的使用多個(gè)Channel進(jìn)行Publish或者Receive。有實(shí)驗(yàn)表明,1s的數(shù)據(jù)可以Publish10K的數(shù)據(jù)包。當(dāng)然對(duì)于不同的硬件環(huán)境,不同的數(shù)據(jù)包大小這個(gè)數(shù)據(jù)肯定不一樣,但是我只想說(shuō)明,對(duì)于普通的Consumer或者Producer來(lái)說(shuō),這已經(jīng)足夠了。如果不夠用,你考慮的應(yīng)該是如何細(xì)化split你的設(shè)計(jì)。 對(duì)于API的使用可以參考官方的java文檔,(里面的思想無(wú)論是java、cpp還是python都相似) http://m.oschina.net/blog/186202 從AMQP協(xié)議可以看出,MessageQueue、Exchange和Binding構(gòu)成了AMQP協(xié)議的核心,下面我們就圍繞這三個(gè)主要組件 從應(yīng)用使用的角度全面的介紹如何利用Rabbit MQ構(gòu)建消息隊(duì)列以及使用過(guò)程中的注意事項(xiàng)。
在Rabbit MQ中,無(wú)論是生產(chǎn)者發(fā)送消息還是消費(fèi)者接受消息,都首先需要聲明一個(gè)MessageQueue。這就存在一個(gè)問(wèn)題,是生產(chǎn)者聲明還是消費(fèi)者聲明呢?要解決這個(gè)問(wèn)題,首先需要明確: a)消費(fèi)者是無(wú)法訂閱或者獲取不存在的MessageQueue中信息。 b)消息被Exchange接受以后,如果沒(méi)有匹配的Queue,則會(huì)被丟棄。 在明白了上述兩點(diǎn)以后,就容易理解如果是消費(fèi)者去聲明Queue,就有可能會(huì)出現(xiàn)在聲明Queue之前,生產(chǎn)者已發(fā)送的消息被丟棄的隱患。如果應(yīng)用能夠通過(guò)消息重發(fā)的機(jī)制允許消息丟失,則使用此方案沒(méi)有任何問(wèn)題。但是如果不能接受該方案,這就需要無(wú)論是生產(chǎn)者還是消費(fèi)者,在發(fā)送或者接受消息前,都需要去嘗試建立消息隊(duì)列。這里有一點(diǎn)需要明確,如果客戶(hù)端嘗試建立一個(gè)已經(jīng)存在的消息隊(duì)列,Rabbit MQ不會(huì)做任何事情,并返回客戶(hù)端建立成功的。 如果一個(gè)消費(fèi)者在一個(gè)信道中正在監(jiān)聽(tīng)某一個(gè)隊(duì)列的消息,Rabbit MQ是不允許該消費(fèi)者在同一個(gè)channel去聲明其他隊(duì)列的。Rabbit MQ中,可以通過(guò)queue.declare命令聲明一個(gè)隊(duì)列,可以設(shè)置該隊(duì)列以下屬性: a) Exclusive:排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次聲明它的連接可見(jiàn),并在連接斷開(kāi)時(shí)自動(dòng)刪除。這里需要注意三點(diǎn):其一,排他隊(duì)列是基于連接可見(jiàn)的,同一連接的不同信道是可以同時(shí)訪問(wèn)同一個(gè)連接創(chuàng)建的排他隊(duì)列的。其二,“首次”,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的,這個(gè)與普通隊(duì)列不同。其三,即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶(hù)端退出,該排他隊(duì)列都會(huì)被自動(dòng)刪除的。這種隊(duì)列適用于只限于一個(gè)客戶(hù)端發(fā)送讀取消息的應(yīng)用場(chǎng)景。 b) Auto-delete:自動(dòng)刪除,如果該隊(duì)列沒(méi)有任何訂閱的消費(fèi)者的話(huà),該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列。 c) Durable:持久化,這個(gè)會(huì)在后面作為專(zhuān)門(mén)一個(gè)章節(jié)討論。 d) 其他選項(xiàng),例如如果用戶(hù)僅僅想查詢(xún)某一個(gè)隊(duì)列是否已存在,如果不存在,不想建立該隊(duì)列,仍然可以調(diào)用queue.declare,只不過(guò)需要將參數(shù)passive設(shè)為true,傳給queue.declare,如果該隊(duì)列已存在,則會(huì)返回true;如果不存在,則會(huì)返回Error,但是不會(huì)創(chuàng)建新的隊(duì)列。
在AMQP模型中,Exchange是接受生產(chǎn)者消息并將消息路由到消息隊(duì)列的關(guān)鍵組件。ExchangeType和Binding決定了消息的路由規(guī)則。所以生產(chǎn)者想要發(fā)送消息,首先必須要聲明一個(gè)Exchange和該Exchange對(duì)應(yīng)的Binding。可以通過(guò) ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,聲明一個(gè)Exchange需要三個(gè)參數(shù):ExchangeName,ExchangeType和Durable。ExchangeName是該Exchange的名字,該屬性在創(chuàng)建Binding和生產(chǎn)者通過(guò)publish推送消息時(shí)需要指定。ExchangeType,指Exchange的類(lèi)型,在RabbitMQ中,有三種類(lèi)型的Exchange:direct ,fanout和topic,不同的Exchange會(huì)表現(xiàn)出不同路由行為。Durable是該Exchange的持久化屬性,這個(gè)會(huì)在消息持久化章節(jié)討論。聲明一個(gè)Binding需要提供一個(gè)QueueName,ExchangeName和BindingKey。下面我們就分析一下不同的ExchangeType表現(xiàn)出的不同路由規(guī)則。 生產(chǎn)者在發(fā)送消息時(shí),都需要指定一個(gè)RoutingKey和Exchange,Exchange在接到該RoutingKey以后,會(huì)判斷該ExchangeType: a) 如果是Direct類(lèi)型,則會(huì)將消息中的RoutingKey與該Exchange關(guān)聯(lián)的所有Binding中的BindingKey進(jìn)行比較,如果相等,則發(fā)送到該Binding對(duì)應(yīng)的Queue中。
b) 如果是 Fanout 類(lèi)型,則會(huì)將消息發(fā)送給所有與該 Exchange 定義過(guò) Binding 的所有 Queues 中去,其實(shí)是一種廣播行為。
c)如果是Topic類(lèi)型,則會(huì)按照正則表達(dá)式,對(duì)RoutingKey與BindingKey進(jìn)行匹配,如果匹配成功,則發(fā)送到對(duì)應(yīng)的Queue中。
在RabbitMQ中消費(fèi)者有2種方式獲取隊(duì)列中的消息: a) 一種是通過(guò)basic.consume命令,訂閱某一個(gè)隊(duì)列中的消息,channel會(huì)自動(dòng)在處理完上一條消息之后,接收下一條消息。(同一個(gè)channel消息處理是串行的)。除非關(guān)閉channel或者取消訂閱,否則客戶(hù)端將會(huì)一直接收隊(duì)列的消息。 b) 另外一種方式是通過(guò)basic.get命令主動(dòng)獲取隊(duì)列中的消息,但是絕對(duì)不可以通過(guò)循環(huán)調(diào)用basic.get來(lái)代替basic.consume,這是因?yàn)閎asic.get RabbitMQ在實(shí)際執(zhí)行的時(shí)候,是首先consume某一個(gè)隊(duì)列,然后檢索第一條消息,然后再取消訂閱。如果是高吞吐率的消費(fèi)者,最好還是建議使用basic.consume。 如果有多個(gè)消費(fèi)者同時(shí)訂閱同一個(gè)隊(duì)列的話(huà),RabbitMQ是采用循環(huán)的方式分發(fā)消息的,每一條消息只能被一個(gè)訂閱者接收。例如,有隊(duì)列Queue,其中ClientA和ClientB都Consume了該隊(duì)列,MessageA到達(dá)隊(duì)列后,被分派到ClientA,ClientA回復(fù)服務(wù)器收到響應(yīng),服務(wù)器刪除MessageA;再有一條消息MessageB抵達(dá)隊(duì)列,服務(wù)器根據(jù)“循環(huán)推送”原則,將消息會(huì)發(fā)給ClientB,然后收到ClientB的確認(rèn)后,刪除MessageB;等到再下一條消息時(shí),服務(wù)器會(huì)再將消息發(fā)送給ClientA。 這里我們可以看出,消費(fèi)者再接到消息以后,都需要給服務(wù)器發(fā)送一條確認(rèn)命令,這個(gè)即可以在handleDelivery里顯示的調(diào)用basic.ack實(shí)現(xiàn),也可以在Consume某個(gè)隊(duì)列的時(shí)候,設(shè)置autoACK屬性為true實(shí)現(xiàn)。這個(gè)ACK僅僅是通知服務(wù)器可以安全的刪除該消息,而不是通知生產(chǎn)者,與RPC不同。 如果消費(fèi)者在接到消息以后還沒(méi)來(lái)得及返回ACK就斷開(kāi)了連接,消息服務(wù)器會(huì)重傳該消息給下一個(gè)訂閱者,如果沒(méi)有訂閱者就會(huì)存儲(chǔ)該消息。 既然RabbitMQ提供了ACK某一個(gè)消息的命令,當(dāng)然也提供了Reject某一個(gè)消息的命令。當(dāng)客戶(hù)端發(fā)生錯(cuò)誤,調(diào)用basic.reject命令拒絕某一個(gè)消息時(shí),可以設(shè)置一個(gè)requeue的屬性,如果為true,則消息服務(wù)器會(huì)重傳該消息給下一個(gè)訂閱者;如果為false,則會(huì)直接刪除該消息。當(dāng)然,也可以通過(guò)ack,讓消息服務(wù)器直接刪除該消息并且不會(huì)重傳。
Rabbit MQ默認(rèn)是不持久隊(duì)列、Exchange、Binding以及隊(duì)列中的消息的,這意味著一旦消息服務(wù)器重啟,所有已聲明的隊(duì)列,Exchange,Binding以及隊(duì)列中的消息都會(huì)丟失。通過(guò)設(shè)置Exchange和MessageQueue的durable屬性為true,可以使得隊(duì)列和Exchange持久化,但是這還不能使得隊(duì)列中的消息持久化,這需要生產(chǎn)者在發(fā)送消息的時(shí)候,將delivery mode設(shè)置為2,只有這3個(gè)全部設(shè)置完成后,才能保證服務(wù)器重啟不會(huì)對(duì)現(xiàn)有的隊(duì)列造成影響。這里需要注意的是,只有durable為true的Exchange和durable為ture的Queues才能綁定,否則在綁定時(shí),RabbitMQ都會(huì)拋錯(cuò)的。持久化會(huì)對(duì)RabbitMQ的性能造成比較大的影響,可能會(huì)下降10倍不止。
對(duì)事務(wù)的支持是AMQP協(xié)議的一個(gè)重要特性。假設(shè)當(dāng)生產(chǎn)者將一個(gè)持久化消息發(fā)送給服務(wù)器時(shí),因?yàn)閏onsume命令本身沒(méi)有任何Response返回,所以即使服務(wù)器崩潰,沒(méi)有持久化該消息,生產(chǎn)者也無(wú)法獲知該消息已經(jīng)丟失。如果此時(shí)使用事務(wù),即通過(guò)txSelect()開(kāi)啟一個(gè)事務(wù),然后發(fā)送消息給服務(wù)器,然后通過(guò)txCommit()提交該事務(wù),即可以保證,如果txCommit()提交了,則該消息一定會(huì)持久化,如果txCommit()還未提交即服務(wù)器崩潰,則該消息不會(huì)服務(wù)器就收。當(dāng)然Rabbit MQ也提供了txRollback()命令用于回滾某一個(gè)事務(wù)。
使用事務(wù)固然可以保證只有提交的事務(wù),才會(huì)被服務(wù)器執(zhí)行。但是這樣同時(shí)也將客戶(hù)端與消息服務(wù)器同步起來(lái),這背離了消息隊(duì)列解耦的本質(zhì)。Rabbit MQ提供了一個(gè)更加輕量級(jí)的機(jī)制來(lái)保證生產(chǎn)者可以感知服務(wù)器消息是否已被路由到正確的隊(duì)列中——Confirm。如果設(shè)置channel為confirm狀態(tài),則通過(guò)該channel發(fā)送的消息都會(huì)被分配一個(gè)唯一的ID,然后一旦該消息被正確的路由到匹配的隊(duì)列中后,服務(wù)器會(huì)返回給生產(chǎn)者一個(gè)Confirm,該Confirm包含該消息的ID,這樣生產(chǎn)者就會(huì)知道該消息已被正確分發(fā)。對(duì)于持久化消息,只有該消息被持久化后,才會(huì)返回Confirm。Confirm機(jī)制的最大優(yōu)點(diǎn)在于異步,生產(chǎn)者在發(fā)送消息以后,即可繼續(xù)執(zhí)行其他任務(wù)。而服務(wù)器返回Confirm后,會(huì)觸發(fā)生產(chǎn)者的回調(diào)函數(shù),生產(chǎn)者在回調(diào)函數(shù)中處理Confirm信息。如果消息服務(wù)器發(fā)生異常,導(dǎo)致該消息丟失,會(huì)返回給生產(chǎn)者一個(gè)nack,表示消息已經(jīng)丟失,這樣生產(chǎn)者就可以通過(guò)重發(fā)消息,保證消息不丟失。Confirm機(jī)制在性能上要比事務(wù)優(yōu)越很多。但是Confirm機(jī)制,無(wú)法進(jìn)行回滾,就是一旦服務(wù)器崩潰,生產(chǎn)者無(wú)法得到Confirm信息,生產(chǎn)者其實(shí)本身也不知道該消息吃否已經(jīng)被持久化,只有繼續(xù)重發(fā)來(lái)保證消息不丟失,但是如果原先已經(jīng)持久化的消息,并不會(huì)被回滾,這樣隊(duì)列中就會(huì)存在兩條相同的消息,系統(tǒng)需要支持去重。
Broker:簡(jiǎn)單來(lái)說(shuō)就是消息隊(duì)列服務(wù)器實(shí)體。 消息隊(duì)列的使用過(guò)程大概如下: (1)客戶(hù)端連接到消息隊(duì)列服務(wù)器,打開(kāi)一個(gè)channel。 1. 安裝erlang: 下載所需的源碼:wget http:///download/otp_src_R13B04.tar.gz wget http:///download/otp_src_R13B04.tar.gz 然后./configure && make && make install 注:在configure之后發(fā)現(xiàn)有以下提示信息: (1) odbc : ODBC library – link check failed 需要yum -y install unixODBC unixODBC-devel (2)“ wxWidgets not found, wx will NOT be usable”及“fop is missing.”這兩個(gè)可以忽略。 安裝完畢,在命令行鍵入erl,將會(huì)出現(xiàn)如下命令行: Erlang R13B04 (erts-5.7.5) [source] [smp:2:2] [rq:2][async-threads:0] [hipe] [kernel-poll:false] (abort with ^G) 2.安裝RabbitMQ RabbitMQ的安裝有很多版本,我們使用Generic Unix版本。 在http://www./install-generic-unix.html中下載http://www./releases/rabbitmq-server/v3.4.3/rabbitmq-server-generic-unix-3.4.3.tar.gz并解壓到Linux中的任意目錄(/mysoftware/rabbitmq_server-3.4.3/),解壓后即可使用。 3.RabbitMQ開(kāi)啟與關(guān)閉 進(jìn)入rabbitmq安裝目錄下的sbin目錄,分別執(zhí)行以下命令:
開(kāi)啟RabbitMQ服務(wù):. ./rabbitmq-server -detached (-detached為可選參數(shù),表示后臺(tái)開(kāi)啟) 開(kāi)啟RabbitMQ管理工具,通過(guò)瀏覽器訪問(wèn)http://localhost:15672使用:. ./rabbitmq-plugins enable rabbitmq_management (3.3.1版本以后默認(rèn)不允許用guest遠(yuǎn)程管理??梢允謩?dòng)創(chuàng)建管理員賬號(hào)來(lái)遠(yuǎn)程管理,具體參見(jiàn)4.4) 關(guān)閉RabbitMQ服務(wù): ./rabbitmqctl stop 注意:如果是登錄遠(yuǎn)程管理界面(比如物理機(jī)訪問(wèn)虛擬機(jī)的管理界面時(shí)),除了關(guān)閉防火墻開(kāi)通相應(yīng)允許的端口以外,也不能用guest/guest訪問(wèn),而應(yīng)該新創(chuàng)建用戶(hù)并賦予管理員權(quán)限才可訪問(wèn) 增加用戶(hù):rabbitmqctl add_user ddd ddd 設(shè)置管理員權(quán)限: rabbitmqctl set_user_tags diego administrator 這樣才可以訪問(wèn)web頁(yè)面提供的管理頁(yè)面 ./rabbitmqctl set_permissions -p / diego '.*' '.*' '.*' 給用戶(hù)diego加上有訪問(wèn)虛擬空間/的權(quán)限 4.設(shè)置RabbitMQ管理權(quán)限 Rabbitmq服務(wù)器的主要通過(guò)rabbitmqctl和rabbimq-plugins兩個(gè)工具來(lái)管理,以下是一些常用功能。 1). 服務(wù)器啟動(dòng)與關(guān)閉 啟動(dòng): rabbitmq-server –detached 關(guān)閉:rabbitmqctl stop 若單機(jī)有多個(gè)實(shí)例,則在rabbitmqctlh后加–n 指定名稱(chēng) 2). 插件管理 開(kāi)啟某個(gè)插件:rabbitmq-pluginsenable xxx 關(guān)閉某個(gè)插件:rabbitmq-pluginsdisablexxx 注意:重啟服務(wù)器后生效。 3).virtual_host管理 新建virtual_host: rabbitmqctladd_vhost xxx 撤銷(xiāo)virtual_host:rabbitmqctl delete_vhost xxx 4). 用戶(hù)管理 新建用戶(hù):rabbitmqctl add_user xxx pwd 刪除用戶(hù): rabbitmqctl delete_user xxx 改密碼: rabbimqctl change_password {username} {newpassword} 設(shè)置用戶(hù)角色:rabbitmqctl set_user_tags {username} {tag ...} Tag可以為 administrator,monitoring, management 5). 權(quán)限管理 權(quán)限設(shè)置:set_permissions [-pvhostpath] {user} {conf} {write} {read} Vhostpath Vhost路徑 user 用戶(hù)名 Conf 一個(gè)正則表達(dá)式match哪些配置資源能夠被該用戶(hù)訪問(wèn)。 Write 一個(gè)正則表達(dá)式match哪些配置資源能夠被該用戶(hù)讀。 Read 一個(gè)正則表達(dá)式match哪些配置資源能夠被該用戶(hù)訪問(wèn)。 6). 獲取服務(wù)器狀態(tài)信息 服務(wù)器狀態(tài):rabbitmqctl status 隊(duì)列信息:rabbitmqctl list_queues[-p vhostpath] [queueinfoitem ...] Queueinfoitem可以為:name,durable,auto_delete,arguments,messages_ready, messages_unacknowledged,messages,consumers,memory Exchange信息:rabbitmqctllist_exchanges[-p vhostpath] [exchangeinfoitem ...] Exchangeinfoitem有:name,type,durable,auto_delete,internal,arguments. Binding信息:rabbitmqctllist_bindings[-p vhostpath] [bindinginfoitem ...] Bindinginfoitem有:source_name,source_kind,destination_name,destination_kind,routing_key,arguments Connection信息:rabbitmqctllist_connections [connectioninfoitem ...] Connectioninfoitem有:recv_oct,recv_cnt,send_oct,send_cnt,send_pend等。 Channel信息:rabbitmqctl list_channels[channelinfoitem ...] Channelinfoitem有consumer_count,messages_unacknowledged,messages_uncommitted,acks_uncommitted,messages_unconfirmed,prefetch_count,client_flow_blocked rabbitmq 實(shí)現(xiàn)原理AMQP(高級(jí)消息隊(duì)列協(xié)議 Advanced Message Queue Protocol) AMQP當(dāng)中有四個(gè)概念非常重要: 虛擬主機(jī)(virtual host),交換機(jī)(exchange),隊(duì)列(queue)和綁定(binding)。一個(gè)虛擬主機(jī)持有一組交換機(jī)、隊(duì)列和綁定。為什么需要多個(gè)虛擬主機(jī)呢?很簡(jiǎn)單,RabbitMQ當(dāng)中,用戶(hù)只能在虛擬主機(jī)的粒度進(jìn)行權(quán)限控制。因此,如果需要禁止A組訪問(wèn)B組的交換機(jī)/隊(duì)列/綁定,必須為A和B分別創(chuàng) 建一個(gè)虛擬主機(jī)。每一個(gè)RabbitMQ服務(wù)器都有一個(gè)默認(rèn)的虛擬主機(jī)“/”。 Producer 要產(chǎn)生消息必須要?jiǎng)?chuàng)建一個(gè) Exchange ,Exchange 用于轉(zhuǎn)發(fā)消息,但是它不會(huì)做存儲(chǔ),如果沒(méi)有 Queue bind 到 Exchange 的話(huà),它會(huì)直接丟棄掉 Producer 發(fā)送過(guò)來(lái)的消息,當(dāng)然如果消息總是發(fā)送過(guò)去就被直接丟棄那就沒(méi)有什么意思了,一個(gè) Consumer 想要接受消息的話(huà),就要?jiǎng)?chuàng)建一個(gè) Queue ,并把這個(gè) Queue bind 到指定的 Exchange 上,然后 Exchange 會(huì)把消息轉(zhuǎn)發(fā)到 Queue 那里,Queue 會(huì)負(fù)責(zé)存儲(chǔ)消息,Consumer 可以通過(guò)主動(dòng) Pop 或者是 Subscribe 之后被動(dòng)回調(diào)的方式來(lái)從 Queue 中取得消息。 Exchange,Queue,RoutingKey 藍(lán)色-- Client(相對(duì)于Rabbitmq Server來(lái)說(shuō)) 綠色--Exchange 紅色—Queue - 交換器(Exchange),它是發(fā)送消息的實(shí)體。 - 隊(duì)列(Queue),這是接收消息的實(shí)體。 - 綁定器(Bind),將交換器和隊(duì)列連接起來(lái),并且封裝消息的路由信息。 Exchange指向Queue的黑色線(xiàn)—RoutingKey,可以將它簡(jiǎn)單的理解為一條連接Exchange和Queue的路線(xiàn) Exchange和Queue都需要通過(guò)channel來(lái)進(jìn)行定義,而RoutingKey則只需要在binding時(shí)取個(gè)名字就行了。 這一塊的理解是不正確的, Exchange Queue RoutingKey關(guān)系說(shuō)明:
由結(jié)果可以看出,由Exchange,Queue,RoutingKey三個(gè)才能決定一個(gè)從Exchange到Queue的唯一的線(xiàn)路。 左邊的Client向右邊的Client發(fā)送消息,流程: 1, 獲取Conection 2, 獲取Channel 3, 定義Exchange,Queue 4, 使用一個(gè)RoutingKey將Queue Binding到一個(gè)Exchange上 5, 通過(guò)指定一個(gè)Exchange和一個(gè)RoutingKey來(lái)將消息發(fā)送到對(duì)應(yīng)的Queue上, 6, 接收方在接收時(shí)也是獲取connection,接著獲取channel,然后指定一個(gè)Queue直接到它關(guān)心的Queue上取消息,它對(duì)Exchange,RoutingKey及如何binding都不關(guān)心,到對(duì)應(yīng)的Queue上去取消息就OK了(這個(gè)是重點(diǎn),接收方,只關(guān)心queue的名字即可,其他的都不關(guān)心?。。。?!) 換句話(huà)說(shuō): 發(fā)送放關(guān)心:ExchangName, QueueName,BindingKey 接收方關(guān)心:QuueuName 一個(gè)Client發(fā)送消息,哪些Client可以收到消息,其核心就在于Exchange,RoutingKey,Queue的關(guān)系上。
我們可以這樣理解,RoutingKey就像是個(gè)中間表,將兩個(gè)表的數(shù)據(jù)進(jìn)行多對(duì)多關(guān)聯(lián),只不過(guò)對(duì)于相同的Exchange和Queue,可以使用不同的RoutingKey重復(fù)關(guān)聯(lián)多次。 |
|
來(lái)自: 后知后覺(jué)1003 > 《technology》