NSQ簡介
NSQ 是實時的分布式消息處理平臺,其設計的目的是用來大規(guī)模地處理每天數(shù)以十億計級別的消息。NSQ 具有分布式和去中心化拓撲結(jié)構(gòu),該結(jié)構(gòu)具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規(guī)模生成環(huán)境下應用的產(chǎn)品。
NSQ 由 3 個守護進程組成:
nsqd 是接收、保存和傳送消息到客戶端的守護進程。
nsqlookupd 是管理的拓撲信息,維護著所有nsqd的狀態(tài),并提供了最終一致發(fā)現(xiàn)服務的守護進程。
nsqadmin 是一個 Web UI 來實時監(jiān)控集群和執(zhí)行各種管理任務。
這篇文章介紹主要介紹nsqd的實現(xiàn)。
Topic與Channel
Topic與Channel是NSQ中重要的兩個概念。
生產(chǎn)者將消息寫到Topic中,一個Topic下可以有多個Channel,每個Channel都是Topic的完整副本。
消費者從Channel處訂閱消息,如果有多個消費者訂閱同一個Channel,Channel中的消息將被傳遞到一個隨機的消費者。
type NSQD struct { //一個nsqd實例可以有多個Topic topicMap map[string]*Topic }
type Topic struct { name string //一個Topic實例下有多個Channel channelMap map[string]*Channel memoryMsgChan chan *Message }
//golang中g(shù)oroutine之間的是通過chan來通信的,如果想要往該topic發(fā)布消息,只需要將消息寫到Topic.memoryMsgChan中 //Topic創(chuàng)建成功后會開啟一個新的goroutine(messagePump)負責監(jiān)聽Topic.memoryMsgChan,當有新消息時會將將消息復制N份發(fā)送到該Topic下的所有Channel中 func NewTopic(topicName string) *Topic { t := &Topic{ name: topicName, channelMap: make(map[string]*Channel),//該Topic下的所有Channel memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize), exitChan: make(chan int), } //開啟一個goroutine負責監(jiān)聽寫到該Topic的消息 t.waitGroup.Wrap(func() { t.messagePump() }) return t }
func (t *Topic) messagePump() { var msg *Message var chans []*Channel var memoryMsgChan chan *Message //取出該Topic下所有的Channel for _, c := range t.channelMap { chans = append(chans, c) } for { //從memoryMsgChan中取出一個消息,并將消息復制N份,發(fā)送到N個Channel中 select { case msg = <-memoryMsgChan: case <-t.exitChan: return }
for i, channel := range chans { chanMsg := NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp err := channel.PutMessage(chanMsg)//消息寫入到channel的Channel.memoryMsgChan中 } }
}
//Channel.memoryMsgChan負責接收寫到該Channel的所有消息 //創(chuàng)建創(chuàng)建Channel時會開啟一個新的goroutine(messagePump)負責監(jiān)聽Channel.memoryMsgChan,當有消息時會將該消息寫到Channel.clientMsgChan中,訂閱該channel的consumer都會試圖從clientMsgChan中取消息,一條消息只能被一個consumer搶到 //Channel還負責消息的可靠傳遞,當消息發(fā)往consumer時,Channel會記錄下該消息的發(fā)送時間,如果在一定時間內(nèi)(msg-timeout參數(shù))沒有接受到consumer對該消息的確認,Channel會將該消息重新寫到Channel.memoryMsgChan中,再次發(fā)送給客戶端。
type Channel struct { name string //channel的名稱 memoryMsgChan chan *Message clientMsgChan chan *Message clients map[int64]Consumer }
func NewChannel(topicName string, channelName string) c := &Channel{ topicName: topicName, name: channelName, memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize), clientMsgChan: make(chan *Message), exitChan: make(chan int), } go c.messagePump() return c } //往channel中寫入消息。 func (c *Channel) put(m *Message) error { select { case c.memoryMsgChan <- m: } return nil }
func (c *Channel) messagePump() { var msg *Message for { select { case msg = <-c.memoryMsgChan: case <-c.exitChan: goto exit } c.clientMsgChan <- msg //多個消費者會同時爭搶clientMsgChan中得消息,但只有一個消費者爭搶到 } exit: close(c.clientMsgChan) }
要理解Topic Channel中各種chan的作用,關(guān)鍵是要理解golang中如何在并發(fā)環(huán)境下如何操作一個結(jié)構(gòu)體(多個goroutine同時操作topic),與C/C++多線程操作同一個結(jié)構(gòu)體時加鎖(mutex,rwmutex)不同,go語言中一般是為這個結(jié)構(gòu)體(topic,channel)開啟一個主goroutine(messagePump函數(shù)),所有對該結(jié)構(gòu)體的改變的操作都應是該主goroutine完成的,也就不存在并發(fā)的問題了,其它goroutine如果想要改變這個結(jié)構(gòu)體則應該向結(jié)構(gòu)體提供的chan中發(fā)送消息(msgchan)或者通知(exitchan,updatechan),主goroutine會一直監(jiān)聽所有的chan,當有消息或者通知到來時做相應的處理。
數(shù)據(jù)的持久化
了解數(shù)據(jù)的持久化之前,我們先來看兩個問題?
1. 往Topic中寫入消息就是將消息發(fā)送到Topic.memoryMsgChan中,但是memoryMsgChan是一個固定內(nèi)存大小的內(nèi)存隊列,如果隊列滿了怎么辦呢?會阻塞嗎?
2. 如果消息都存放在memoryMsgChan這個內(nèi)存隊列中,程序退出了消息就全部丟失了嗎?
NSQ是如何解決的呢,nsq在創(chuàng)建Topic、Channel的時候都會創(chuàng)建一個DiskQueue,DiskQueue負責向磁盤文件中寫入消息、從磁盤文件中讀取消息,是NSQ實現(xiàn)數(shù)據(jù)持久化的最重要結(jié)構(gòu)。
以Topic為例,如果向Topic.memoryMsgChan寫入消息但是memoryMsgChan已滿時,nsq會將消息寫到topic.DiskQueue中,DiskQueue會負責將消息內(nèi)存同步到磁盤上。
如果從Topic.memoryMsgChan中讀取消息時,但是memoryMsgChan并沒有消息時,就從topic.DiskQueue中取出同步到磁盤文件中的消息。
func NewTopic(topicName string,ctx *context) *Topic { ... //其它初始化代碼 // ctx.nsqd.opts都是一些程序啟動時的命令行參數(shù) t.backend = newDiskQueue(topicName, ctx.nsqd.opts.DataPath, ctx.nsqd.opts.MaxBytesPerFile, ctx.nsqd.opts.SyncEvery, ctx.nsqd.opts.SyncTimeout, ctx.nsqd.opts.Logger) return t } //將消息寫到topic的channel中,如果topic的memoryMsgChan已滿則將topic寫到磁盤文件中 func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: //從buffer池中取出一個buffer接口,將消息寫到buffer中,再將buffer寫到topic.backend的wirteChan中 //buffer池是為了避免重復的創(chuàng)建銷毀buffer對象 b := bufferPoolGet() t.backend.WriteChan <- b bufferPoolPut(b) } return nil }
func (t *Topic) messagePump() { ...//參見上文代碼 for { //從memoryMsgChan及DiskQueue.ReadChan中取消息 select { case msg = <-memoryMsgChan: case buf = <- t.backend.ReadChan(): msg, _ = decodeMessage(buf) case <-t.exitChan: return } ... //將msg復制N份,發(fā)送到topic下的N個Channel中 } }
我們看到topic.backend(diskQueue)負責將消息寫到磁盤并從磁盤中讀取消息,diskQueue提供了兩個chan供外部使用:readChan與writeChan。
我們來看下diskQueue實現(xiàn)中的幾個要點。
- diskQueue在創(chuàng)建時會開啟一個goroutine,從磁盤文件中讀取消息寫到readChan中,外部goroutine可以從readChan中獲取消息;隨時監(jiān)聽writeChan,當有消息時從wirtechan中取出消息,寫到本地磁盤文件。
- diskQueue既要提供文件的讀服務又要提供文件的寫服務,所以要記錄下文件的讀位置(readIndex),寫位置(writeIndex)。每次從文件中讀取消息時使用file.Seek(readindex)定位到文件讀位置然后讀取消息信息,每次往文件中寫入消息時都要file.Seek(writeIndex)定位到寫位置再將消息寫入。
- readIndex,writeIndex很重要,程序退出時要將這些信息(meta data)寫到另外的磁盤文件(元信息文件)中,程序啟動時首先讀取元信息文件,在根據(jù)元信息文件中的readIndex writeIndex操作存儲信息的文件。
- 由于操作系統(tǒng)層也有緩存,調(diào)用file.Write()寫入的信息,也可能只是存在緩存中并沒有同步到磁盤,需要顯示調(diào)用file.sync()才可以強制要求操作系統(tǒng)把緩存同步到磁盤。可以通過指定創(chuàng)建diskQueue時傳入的syncEvery,syncTimeout來控制調(diào)用file.sync()的頻率。syncTimeout是指每隔syncTimeout秒調(diào)用一次file.sync(),syncEvery是指每當寫入syncEvery個消息后調(diào)用一次file.sync()。這兩個參數(shù)都可以在啟動nsqd程序時通過命令行指定。
網(wǎng)絡架構(gòu)
nsq是一個可靠的、高性能的服務端網(wǎng)絡程序,通過閱讀nsqd的源碼來學習如何搭建一個可靠的網(wǎng)絡服務端程序。
//首先是監(jiān)聽端口,當有請求到來時開啟一個goroutine去處理該鏈接請求 func TCPServer(listener net.Listener) { for { clientConn, err := listener.Accept() go Handle(clientConn) } }
func Handle(clientConn net.Conn) { //客戶端首先需要發(fā)送一個四字節(jié)的協(xié)議編號,表示客戶端當前所使用的協(xié)議 //這樣便于以后平滑的協(xié)議升級,服務端可以根據(jù)客戶端的協(xié)議編號做不同的處理 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) protocolMagic := string(buf)
var prot util.Protocol switch protocolMagic { case " V2": prot = &protocolV2{ctx: p.ctx} default: return } //成功建立連接,按照相應的協(xié)議編號去處理該鏈接 err = prot.IOLoop(clientConn) return } }
客戶端已成功的與服務器建立鏈接了,每一個客戶端建立連接后,nsqd都會創(chuàng)建一個Client接口體,該結(jié)構(gòu)體內(nèi)保存一些client的狀態(tài)信息。
每一個Client都會有兩個goroutine,一個goroutine負責讀取客戶端主動發(fā)送的各種命令,解析命令,處理命令并將處理結(jié)果回復給客戶端。
另一個goutine負責定時發(fā)送心跳信息給客戶端,如果客戶端訂閱某個channel的話則將channel中的將消息通過網(wǎng)絡發(fā)送給客戶端。
如果服務端不需要主動推送大量消息給客戶端,一個連接只需要開一個goroutine處理請求并發(fā)送回復就可以了,這是最簡單的方式。開啟兩個goroutine操作同一個conn的話就需要注意加鎖了。
func (p *protocolV2) IOLoop(conn net.Conn) error { //創(chuàng)建一個新的Client對象 clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx)
//開啟另一個goroutine,定時發(fā)送心跳信息,客戶端收到心跳信息后要回復。 //如果nsqd長時間未收到該連接的心跳回復說明連接已出問題,會斷開連接,這就是nsq的心跳實現(xiàn)機制 go p.messagePump(client)
for {
//如果超過client.HeartbeatInterval * 2時間間隔內(nèi)未收到客戶端發(fā)送的命令,說明連接處問題了,需要關(guān)閉此鏈接。 //正常情況下每隔HeartbeatInterval時間客戶端都會發(fā)送一個心跳回復。
client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
//nsq規(guī)定所有的命令以 “\n”結(jié)尾,命令與參數(shù)之間以空格分隔 line, err = client.Reader.ReadSlice('\n')
//params[0]為命令的類型,params[1:]為命令參數(shù) params := bytes.Split(line, separatorBytes)
//處理客戶端發(fā)送過來的命令 response, err := p.Exec(client, params) if err != nil { sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if _, ok := err.(*util.FatalClientErr); ok { break } continue } //將命令的處理結(jié)果發(fā)送給客戶端 if response != nil { err = p.Send(client, frameTypeResponse, response) } }
//連接出問題了,需要關(guān)閉連接 conn.Close() close(client.ExitChan) //關(guān)閉client的ExitChan
//client.Channel記錄的是該客戶端訂閱的Channel,客戶端關(guān)閉的時候需要從Channel中移除這個訂閱者。 if client.Channel != nil { client.Channel.RemoveClient(client.ID) } return err }
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) } return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
我們來看下NSQ中幾個比較重要的命令:
- NOP 心跳回復,沒有實際意義
- PUB 發(fā)布一個消息到 話題(topic)
PUB <topic_name>\n [ 四字節(jié)消息的大小 ][ 消息的內(nèi)容 ]
- SUB 訂閱話題(topic) /通道(channel)
SUB <topic_name> <channel_name>\n
- RDY 更新 RDY 狀態(tài) (表示客戶端已經(jīng)準備好接收N 消息)
RDY <count>\n
- FIN 完成一個消息 (表示成功處理)
FIN <message_id>\n
生產(chǎn)者產(chǎn)生消息的過程比較簡單,就是一個PUB命令,先讀取四字節(jié)的消息大小,然后根據(jù)消息大小讀取消息內(nèi)容,然后將內(nèi)容寫到topic.MessageChan中。
我們重點來看下消費者是如何從nsq中讀取消息的。
1. 消費者首先需要發(fā)送SUB命令,告訴nsqd它想訂閱哪個Channel,然后nsqd將該Client與Channel建立對應關(guān)系。
2. 消費者發(fā)送RDY命令,告訴服務端它以準備好接受count個消息,服務端則向消費者發(fā)送count個消息,如果消費者想繼續(xù)接受消息就需要不斷發(fā)送RDY命令告訴服務端自己準備好接受消息(類似TCP協(xié)議中滑動窗口的概念,消費者并不是按照順序一個個的消費消息,NSQD最多可以同時count個消息給消費者,每推送給消費者一個消息count數(shù)目減一,當消費者處理完消息回復FIN指令時count+1)。
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { topicName := string(params[1]) channelName := string(params[2])
topic := p.ctx.nsqd.GetTopic(topicName) channel := topic.GetChannel(channelName) //將Client與Channel建立關(guān)聯(lián)關(guān)系 channel.AddClient(client.ID, client) client.Channel = channel
// update message pump client.SubEventChan <- channel return okBytes, nil }
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
subEventChan := client.SubEventChan heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
for { //IsReadyForMessages就是檢查Client的RDY命令所設置的ReadyCount,判斷是否可以繼續(xù)向Client發(fā)送消息 if subChannel == nil || !client.IsReadyForMessages() { //客戶端還未做好準備則將clientMsgChan設置為nil clientMsgChan = nil } else { //客戶端做好準備,則試圖從訂閱的Channel的clientMsgChan中讀取消息 clientMsgChan = subChannel.clientMsgChan }
select { //接收到客戶端發(fā)送的RDY命令后,則會向ReadyStateChan中寫入消息,下面的case條件則可滿足,重新進入for循環(huán) case <-client.ReadyStateChan: //接收到客戶端發(fā)送的SUB命令后,會向subEventChan中寫入消息,subEventChan則被置為nil,所以一個客戶端只能訂閱一次Channel case subChannel = <-subEventChan: // you can't SUB anymore subEventChan = nil //發(fā)送心跳消息 case <-heartbeatChan: err = p.Send(client, frameTypeResponse, heartbeatBytes) //會有N個消費者共同監(jiān)聽channel.clientMsgChan,一條消息只能被一個消費者搶到 case msg, ok := <-clientMsgChan: if !ok { goto exit } //以消息的發(fā)送時間排序,將消息放在一個最小時間堆上,如果在規(guī)定時間內(nèi)收到對該消息的確認回復(FIN messageId)說明消息以被消費者成功處理,會將該消息從堆中刪除。 //如果超過一定時間沒有接受 FIN messageId,會從堆中取出該消息重新發(fā)送,所以nsq能確保一個消息至少被一個i消費處理。 subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() //通過網(wǎng)絡發(fā)送給消費者 err = p.SendMessage(client, msg, &buf) case <-client.ExitChan: goto exit } }
exit: heartbeatTicker.Stop() }
參考文獻
NSQ 指南
使用消息隊列的 10 個理由
關(guān)于go同步和異步模式的疑惑
轉(zhuǎn)自:http://shanks./post/NSQ%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90%E4%B9%8BNSQD
|