一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

NSQ源碼剖析之nsqd

 vclyin 2018-08-22

NSQ簡介

NSQ 是實時的分布式消息處理平臺,其設計的目的是用來大規(guī)模地處理每天數(shù)以十億計級別的消息。NSQ 具有分布式和去中心化拓撲結(jié)構(gòu),該結(jié)構(gòu)具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征,是一個成熟的、已在大規(guī)模生成環(huán)境下應用的產(chǎn)品。

NSQ 由 3 個守護進程組成:
nsqd 是接收、保存和傳送消息到客戶端的守護進程。
nsqlookupd 是管理的拓撲信息,維護著所有nsqd的狀態(tài),并提供了最終一致發(fā)現(xiàn)服務的守護進程。
nsqadmin 是一個 Web UI 來實時監(jiān)控集群和執(zhí)行各種管理任務。
NSQ結(jié)構(gòu)圖

這篇文章介紹主要介紹nsqd的實現(xiàn)。

Topic與Channel

Topic與Channel是NSQ中重要的兩個概念。
生產(chǎn)者將消息寫到Topic中,一個Topic下可以有多個Channel,每個Channel都是Topic的完整副本。
消費者從Channel處訂閱消息,如果有多個消費者訂閱同一個Channel,Channel中的消息將被傳遞到一個隨機的消費者。

圖片標題

  1. type NSQD struct {
  2. //一個nsqd實例可以有多個Topic
  3. topicMap map[string]*Topic
  4. }
  5. type Topic struct {
  6. name string
  7. //一個Topic實例下有多個Channel
  8. channelMap map[string]*Channel
  9. memoryMsgChan chan *Message
  10. }
  11. //golang中g(shù)oroutine之間的是通過chan來通信的,如果想要往該topic發(fā)布消息,只需要將消息寫到Topic.memoryMsgChan中
  12. //Topic創(chuàng)建成功后會開啟一個新的goroutine(messagePump)負責監(jiān)聽Topic.memoryMsgChan,當有新消息時會將將消息復制N份發(fā)送到該Topic下的所有Channel中
  13. func NewTopic(topicName string) *Topic {
  14. t := &Topic{
  15. name: topicName,
  16. channelMap: make(map[string]*Channel),//該Topic下的所有Channel
  17. memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize),
  18. exitChan: make(chan int),
  19. }
  20. //開啟一個goroutine負責監(jiān)聽寫到該Topic的消息
  21. t.waitGroup.Wrap(func() { t.messagePump() })
  22. return t
  23. }
  24. func (t *Topic) messagePump() {
  25. var msg *Message
  26. var chans []*Channel
  27. var memoryMsgChan chan *Message
  28. //取出該Topic下所有的Channel
  29. for _, c := range t.channelMap {
  30. chans = append(chans, c)
  31. }
  32. for {
  33. //從memoryMsgChan中取出一個消息,并將消息復制N份,發(fā)送到N個Channel中
  34. select {
  35. case msg = <-memoryMsgChan:
  36. case <-t.exitChan:
  37. return
  38. }
  39. for i, channel := range chans {
  40. chanMsg := NewMessage(msg.ID, msg.Body)
  41. chanMsg.Timestamp = msg.Timestamp
  42. err := channel.PutMessage(chanMsg)//消息寫入到channel的Channel.memoryMsgChan中
  43. }
  44. }
  45. }
  46. //Channel.memoryMsgChan負責接收寫到該Channel的所有消息
  47. //創(chuàng)建創(chuàng)建Channel時會開啟一個新的goroutine(messagePump)負責監(jiān)聽Channel.memoryMsgChan,當有消息時會將該消息寫到Channel.clientMsgChan中,訂閱該channel的consumer都會試圖從clientMsgChan中取消息,一條消息只能被一個consumer搶到
  48. //Channel還負責消息的可靠傳遞,當消息發(fā)往consumer時,Channel會記錄下該消息的發(fā)送時間,如果在一定時間內(nèi)(msg-timeout參數(shù))沒有接受到consumer對該消息的確認,Channel會將該消息重新寫到Channel.memoryMsgChan中,再次發(fā)送給客戶端。
  49. type Channel struct {
  50. name string //channel的名稱
  51. memoryMsgChan chan *Message
  52. clientMsgChan chan *Message
  53. clients map[int64]Consumer
  54. }
  55. func NewChannel(topicName string, channelName string)
  56. c := &Channel{
  57. topicName: topicName,
  58. name: channelName,
  59. memoryMsgChan: make(chan *Message, ctx.nsqd.opts.MemQueueSize),
  60. clientMsgChan: make(chan *Message),
  61. exitChan: make(chan int),
  62. }
  63. go c.messagePump()
  64. return c
  65. }
  66. //往channel中寫入消息。
  67. func (c *Channel) put(m *Message) error {
  68. select {
  69. case c.memoryMsgChan <- m:
  70. }
  71. return nil
  72. }
  73. func (c *Channel) messagePump() {
  74. var msg *Message
  75. for {
  76. select {
  77. case msg = <-c.memoryMsgChan:
  78. case <-c.exitChan:
  79. goto exit
  80. }
  81. c.clientMsgChan <- msg //多個消費者會同時爭搶clientMsgChan中得消息,但只有一個消費者爭搶到
  82. }
  83. exit:
  84. close(c.clientMsgChan)
  85. }

要理解Topic Channel中各種chan的作用,關(guān)鍵是要理解golang中如何在并發(fā)環(huán)境下如何操作一個結(jié)構(gòu)體(多個goroutine同時操作topic),與C/C++多線程操作同一個結(jié)構(gòu)體時加鎖(mutex,rwmutex)不同,go語言中一般是為這個結(jié)構(gòu)體(topic,channel)開啟一個主goroutine(messagePump函數(shù)),所有對該結(jié)構(gòu)體的改變的操作都應是該主goroutine完成的,也就不存在并發(fā)的問題了,其它goroutine如果想要改變這個結(jié)構(gòu)體則應該向結(jié)構(gòu)體提供的chan中發(fā)送消息(msgchan)或者通知(exitchan,updatechan),主goroutine會一直監(jiān)聽所有的chan,當有消息或者通知到來時做相應的處理。

數(shù)據(jù)的持久化

了解數(shù)據(jù)的持久化之前,我們先來看兩個問題?
1. 往Topic中寫入消息就是將消息發(fā)送到Topic.memoryMsgChan中,但是memoryMsgChan是一個固定內(nèi)存大小的內(nèi)存隊列,如果隊列滿了怎么辦呢?會阻塞嗎?
2. 如果消息都存放在memoryMsgChan這個內(nèi)存隊列中,程序退出了消息就全部丟失了嗎?

NSQ是如何解決的呢,nsq在創(chuàng)建Topic、Channel的時候都會創(chuàng)建一個DiskQueue,DiskQueue負責向磁盤文件中寫入消息、從磁盤文件中讀取消息,是NSQ實現(xiàn)數(shù)據(jù)持久化的最重要結(jié)構(gòu)。
以Topic為例,如果向Topic.memoryMsgChan寫入消息但是memoryMsgChan已滿時,nsq會將消息寫到topic.DiskQueue中,DiskQueue會負責將消息內(nèi)存同步到磁盤上。
如果從Topic.memoryMsgChan中讀取消息時,但是memoryMsgChan并沒有消息時,就從topic.DiskQueue中取出同步到磁盤文件中的消息。

  1. func NewTopic(topicName string,ctx *context) *Topic {
  2. ... //其它初始化代碼
  3. // ctx.nsqd.opts都是一些程序啟動時的命令行參數(shù)
  4. t.backend = newDiskQueue(topicName,
  5. ctx.nsqd.opts.DataPath,
  6. ctx.nsqd.opts.MaxBytesPerFile,
  7. ctx.nsqd.opts.SyncEvery,
  8. ctx.nsqd.opts.SyncTimeout,
  9. ctx.nsqd.opts.Logger)
  10. return t
  11. }
  12. //將消息寫到topic的channel中,如果topic的memoryMsgChan已滿則將topic寫到磁盤文件中
  13. func (t *Topic) put(m *Message) error {
  14. select {
  15. case t.memoryMsgChan <- m:
  16. default:
  17. //從buffer池中取出一個buffer接口,將消息寫到buffer中,再將buffer寫到topic.backend的wirteChan中
  18. //buffer池是為了避免重復的創(chuàng)建銷毀buffer對象
  19. b := bufferPoolGet()
  20. t.backend.WriteChan <- b
  21. bufferPoolPut(b)
  22. }
  23. return nil
  24. }
  25. func (t *Topic) messagePump() {
  26. ...//參見上文代碼
  27. for {
  28. //從memoryMsgChan及DiskQueue.ReadChan中取消息
  29. select {
  30. case msg = <-memoryMsgChan:
  31. case buf = <- t.backend.ReadChan():
  32. msg, _ = decodeMessage(buf)
  33. case <-t.exitChan:
  34. return
  35. }
  36. ... //將msg復制N份,發(fā)送到topic下的N個Channel中
  37. }
  38. }

我們看到topic.backend(diskQueue)負責將消息寫到磁盤并從磁盤中讀取消息,diskQueue提供了兩個chan供外部使用:readChan與writeChan。
我們來看下diskQueue實現(xiàn)中的幾個要點。

  1. diskQueue在創(chuàng)建時會開啟一個goroutine,從磁盤文件中讀取消息寫到readChan中,外部goroutine可以從readChan中獲取消息;隨時監(jiān)聽writeChan,當有消息時從wirtechan中取出消息,寫到本地磁盤文件。
  2. diskQueue既要提供文件的讀服務又要提供文件的寫服務,所以要記錄下文件的讀位置(readIndex),寫位置(writeIndex)。每次從文件中讀取消息時使用file.Seek(readindex)定位到文件讀位置然后讀取消息信息,每次往文件中寫入消息時都要file.Seek(writeIndex)定位到寫位置再將消息寫入。
  3. readIndex,writeIndex很重要,程序退出時要將這些信息(meta data)寫到另外的磁盤文件(元信息文件)中,程序啟動時首先讀取元信息文件,在根據(jù)元信息文件中的readIndex writeIndex操作存儲信息的文件。
  4. 由于操作系統(tǒng)層也有緩存,調(diào)用file.Write()寫入的信息,也可能只是存在緩存中并沒有同步到磁盤,需要顯示調(diào)用file.sync()才可以強制要求操作系統(tǒng)把緩存同步到磁盤。可以通過指定創(chuàng)建diskQueue時傳入的syncEvery,syncTimeout來控制調(diào)用file.sync()的頻率。syncTimeout是指每隔syncTimeout秒調(diào)用一次file.sync(),syncEvery是指每當寫入syncEvery個消息后調(diào)用一次file.sync()。這兩個參數(shù)都可以在啟動nsqd程序時通過命令行指定。

網(wǎng)絡架構(gòu)

nsq是一個可靠的、高性能的服務端網(wǎng)絡程序,通過閱讀nsqd的源碼來學習如何搭建一個可靠的網(wǎng)絡服務端程序。

  1. //首先是監(jiān)聽端口,當有請求到來時開啟一個goroutine去處理該鏈接請求
  2. func TCPServer(listener net.Listener) {
  3. for {
  4. clientConn, err := listener.Accept()
  5. go Handle(clientConn)
  6. }
  7. }
  8. func Handle(clientConn net.Conn) {
  9. //客戶端首先需要發(fā)送一個四字節(jié)的協(xié)議編號,表示客戶端當前所使用的協(xié)議
  10. //這樣便于以后平滑的協(xié)議升級,服務端可以根據(jù)客戶端的協(xié)議編號做不同的處理
  11. buf := make([]byte, 4)
  12. _, err := io.ReadFull(clientConn, buf)
  13. protocolMagic := string(buf)
  14. var prot util.Protocol
  15. switch protocolMagic {
  16. case " V2":
  17. prot = &protocolV2{ctx: p.ctx}
  18. default:
  19. return
  20. }
  21. //成功建立連接,按照相應的協(xié)議編號去處理該鏈接
  22. err = prot.IOLoop(clientConn)
  23. return
  24. }
  25. }

客戶端已成功的與服務器建立鏈接了,每一個客戶端建立連接后,nsqd都會創(chuàng)建一個Client接口體,該結(jié)構(gòu)體內(nèi)保存一些client的狀態(tài)信息。
每一個Client都會有兩個goroutine,一個goroutine負責讀取客戶端主動發(fā)送的各種命令,解析命令,處理命令并將處理結(jié)果回復給客戶端。
另一個goutine負責定時發(fā)送心跳信息給客戶端,如果客戶端訂閱某個channel的話則將channel中的將消息通過網(wǎng)絡發(fā)送給客戶端。

如果服務端不需要主動推送大量消息給客戶端,一個連接只需要開一個goroutine處理請求并發(fā)送回復就可以了,這是最簡單的方式。開啟兩個goroutine操作同一個conn的話就需要注意加鎖了。

  1. func (p *protocolV2) IOLoop(conn net.Conn) error {
  2. //創(chuàng)建一個新的Client對象
  3. clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
  4. client := newClientV2(clientID, conn, p.ctx)
  5. //開啟另一個goroutine,定時發(fā)送心跳信息,客戶端收到心跳信息后要回復。
  6. //如果nsqd長時間未收到該連接的心跳回復說明連接已出問題,會斷開連接,這就是nsq的心跳實現(xiàn)機制
  7. go p.messagePump(client)
  8. for {
  9. //如果超過client.HeartbeatInterval * 2時間間隔內(nèi)未收到客戶端發(fā)送的命令,說明連接處問題了,需要關(guān)閉此鏈接。
  10. //正常情況下每隔HeartbeatInterval時間客戶端都會發(fā)送一個心跳回復。
  11. client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
  12. //nsq規(guī)定所有的命令以 “\n”結(jié)尾,命令與參數(shù)之間以空格分隔
  13. line, err = client.Reader.ReadSlice('\n')
  14. //params[0]為命令的類型,params[1:]為命令參數(shù)
  15. params := bytes.Split(line, separatorBytes)
  16. //處理客戶端發(fā)送過來的命令
  17. response, err := p.Exec(client, params)
  18. if err != nil {
  19. sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
  20. if _, ok := err.(*util.FatalClientErr); ok {
  21. break
  22. }
  23. continue
  24. }
  25. //將命令的處理結(jié)果發(fā)送給客戶端
  26. if response != nil {
  27. err = p.Send(client, frameTypeResponse, response)
  28. }
  29. }
  30. //連接出問題了,需要關(guān)閉連接
  31. conn.Close()
  32. close(client.ExitChan) //關(guān)閉client的ExitChan
  33. //client.Channel記錄的是該客戶端訂閱的Channel,客戶端關(guān)閉的時候需要從Channel中移除這個訂閱者。
  34. if client.Channel != nil {
  35. client.Channel.RemoveClient(client.ID)
  36. }
  37. return err
  38. }
  39. func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
  40. switch {
  41. case bytes.Equal(params[0], []byte("FIN")):
  42. return p.FIN(client, params)
  43. case bytes.Equal(params[0], []byte("RDY")):
  44. return p.RDY(client, params)
  45. case bytes.Equal(params[0], []byte("PUB")):
  46. return p.PUB(client, params)
  47. case bytes.Equal(params[0], []byte("NOP")):
  48. return p.NOP(client, params)
  49. case bytes.Equal(params[0], []byte("SUB")):
  50. return p.SUB(client, params)
  51. }
  52. return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
  53. }

我們來看下NSQ中幾個比較重要的命令:

  • NOP 心跳回復,沒有實際意義
  • PUB 發(fā)布一個消息到 話題(topic)
    PUB <topic_name>\n [ 四字節(jié)消息的大小 ][ 消息的內(nèi)容 ]
  • SUB 訂閱話題(topic) /通道(channel)
    SUB <topic_name> <channel_name>\n
  • RDY 更新 RDY 狀態(tài) (表示客戶端已經(jīng)準備好接收N 消息)
    RDY <count>\n
  • FIN 完成一個消息 (表示成功處理)
    FIN <message_id>\n

生產(chǎn)者產(chǎn)生消息的過程比較簡單,就是一個PUB命令,先讀取四字節(jié)的消息大小,然后根據(jù)消息大小讀取消息內(nèi)容,然后將內(nèi)容寫到topic.MessageChan中。
我們重點來看下消費者是如何從nsq中讀取消息的。
1. 消費者首先需要發(fā)送SUB命令,告訴nsqd它想訂閱哪個Channel,然后nsqd將該Client與Channel建立對應關(guān)系。
2. 消費者發(fā)送RDY命令,告訴服務端它以準備好接受count個消息,服務端則向消費者發(fā)送count個消息,如果消費者想繼續(xù)接受消息就需要不斷發(fā)送RDY命令告訴服務端自己準備好接受消息(類似TCP協(xié)議中滑動窗口的概念,消費者并不是按照順序一個個的消費消息,NSQD最多可以同時count個消息給消費者,每推送給消費者一個消息count數(shù)目減一,當消費者處理完消息回復FIN指令時count+1)。

  1. func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
  2. topicName := string(params[1])
  3. channelName := string(params[2])
  4. topic := p.ctx.nsqd.GetTopic(topicName)
  5. channel := topic.GetChannel(channelName)
  6. //將Client與Channel建立關(guān)聯(lián)關(guān)系
  7. channel.AddClient(client.ID, client)
  8. client.Channel = channel
  9. // update message pump
  10. client.SubEventChan <- channel
  11. return okBytes, nil
  12. }
  13. func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
  14. subEventChan := client.SubEventChan
  15. heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
  16. for {
  17. //IsReadyForMessages就是檢查Client的RDY命令所設置的ReadyCount,判斷是否可以繼續(xù)向Client發(fā)送消息
  18. if subChannel == nil || !client.IsReadyForMessages() {
  19. //客戶端還未做好準備則將clientMsgChan設置為nil
  20. clientMsgChan = nil
  21. } else {
  22. //客戶端做好準備,則試圖從訂閱的Channel的clientMsgChan中讀取消息
  23. clientMsgChan = subChannel.clientMsgChan
  24. }
  25. select {
  26. //接收到客戶端發(fā)送的RDY命令后,則會向ReadyStateChan中寫入消息,下面的case條件則可滿足,重新進入for循環(huán)
  27. case <-client.ReadyStateChan:
  28. //接收到客戶端發(fā)送的SUB命令后,會向subEventChan中寫入消息,subEventChan則被置為nil,所以一個客戶端只能訂閱一次Channel
  29. case subChannel = <-subEventChan:
  30. // you can't SUB anymore
  31. subEventChan = nil
  32. //發(fā)送心跳消息
  33. case <-heartbeatChan:
  34. err = p.Send(client, frameTypeResponse, heartbeatBytes)
  35. //會有N個消費者共同監(jiān)聽channel.clientMsgChan,一條消息只能被一個消費者搶到
  36. case msg, ok := <-clientMsgChan:
  37. if !ok {
  38. goto exit
  39. }
  40. //以消息的發(fā)送時間排序,將消息放在一個最小時間堆上,如果在規(guī)定時間內(nèi)收到對該消息的確認回復(FIN messageId)說明消息以被消費者成功處理,會將該消息從堆中刪除。
  41. //如果超過一定時間沒有接受 FIN messageId,會從堆中取出該消息重新發(fā)送,所以nsq能確保一個消息至少被一個i消費處理。
  42. subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
  43. client.SendingMessage()
  44. //通過網(wǎng)絡發(fā)送給消費者
  45. err = p.SendMessage(client, msg, &buf)
  46. case <-client.ExitChan:
  47. goto exit
  48. }
  49. }
  50. exit:
  51. heartbeatTicker.Stop()
  52. }

參考文獻

NSQ 指南
使用消息隊列的 10 個理由
關(guān)于go同步和異步模式的疑惑


轉(zhuǎn)自:http://shanks./post/NSQ%E6%BA%90%E7%A0%81%E5%89%96%E6%9E%90%E4%B9%8BNSQD

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    大香蕉伊人一区二区三区| 色婷婷视频免费在线观看| 欧美午夜视频免费观看| 欧美日本亚欧在线观看| 国产中文字幕一二三区| 中文字幕人妻综合一区二区| 欧美一级内射一色桃子| 91在线爽的少妇嗷嗷叫| 黄色国产一区二区三区| 国产成人亚洲欧美二区综| 国产精品国三级国产专不卡| 亚洲男人的天堂久久a| 国产中文字幕一区二区| 激情亚洲内射一区二区三区| 福利在线午夜绝顶三级| 丁香六月婷婷基地伊人| 日韩免费成人福利在线| 亚洲精品成人福利在线| 久久99这里只精品热在线| 欧美日韩人妻中文一区二区| 亚洲最新av在线观看| 精品推荐国产麻豆剧传媒| 国产传媒精品视频一区| 精品熟女少妇av免费久久野外| 日本大学生精油按摩在线观看| 欧美大胆美女a级视频| 欧美日本精品视频在线观看| 特黄大片性高水多欧美一级| 青青操成人免费在线视频| 亚洲一区二区三区福利视频| 成年男女午夜久久久精品| 在线免费观看一二区视频| 老熟女露脸一二三四区| 欧美午夜伦理在线观看| 黄男女激情一区二区三区| 国产水滴盗摄一区二区| 千仞雪下面好爽好紧好湿全文| 日韩人妻一区二区欧美| 乱女午夜精品一区二区三区 | 日韩欧美综合中文字幕| 亚洲淫片一区二区三区|