一区二区三区日韩精品-日韩经典一区二区三区-五月激情综合丁香婷婷-欧美精品中文字幕专区

分享

Goroutine + Channel 實(shí)踐 | Golang中文社區(qū)(Go語言構(gòu)建) | Go語言中文網(wǎng) | Go語言學(xué)習(xí)園地

 dazheng 2015-06-09

背景

在最近開發(fā)的項(xiàng)目中,后端需要編寫許多提供HTTP接口的API,另外技術(shù)選型相對寬松,因此選擇Golang + Beego框架進(jìn)行開發(fā)。之所以選擇Golang,主要是考慮到開發(fā)的模塊,都需要接受瞬時(shí)大并發(fā)、請求需要經(jīng)歷多個(gè)步驟、處理時(shí)間較長、無法同步立即返回結(jié)果的場景,Golang的goroutine以及channel所提供的語言層級的特性,正好可以滿足這方面的需要。

goroutine不同于thread,threads是操作系統(tǒng)中的對于一個(gè)獨(dú)立運(yùn)行實(shí)例的描述,不同操作系統(tǒng),對于thread的實(shí)現(xiàn)也不盡相同;但是,操作系統(tǒng)并不知道goroutine的存在,goroutine的調(diào)度是有Golang運(yùn)行時(shí)進(jìn)行管理的。啟動(dòng)thread雖然比process所需的資源要少,但是多個(gè)thread之間的上下文切換仍然是需要大量的工作的(寄存器/Program Count/Stack Pointer/...),Golang有自己的調(diào)度器,許多goroutine的數(shù)據(jù)都是共享的,因此goroutine之間的切換會(huì)快很多,啟動(dòng)goroutine所耗費(fèi)的資源也很少,一個(gè)Golang程序同時(shí)存在幾百個(gè)goroutine是很正常的。

channel,即“管道”,是用來傳遞數(shù)據(jù)(叫消息更為合適)的一個(gè)數(shù)據(jù)結(jié)構(gòu),即可以從channel里面塞數(shù)據(jù),也可以從中獲取數(shù)據(jù)。channel本身并沒有什么神奇的地方,但是channel加上了goroutine,就形成了一種既簡單又強(qiáng)大的請求處理模型,即N個(gè)工作goroutine將處理的中間結(jié)果或者最終結(jié)果放入一個(gè)channel,另外有M個(gè)工作goroutine從這個(gè)channel拿數(shù)據(jù),再進(jìn)行進(jìn)一步加工,通過組合這種過程,從而勝任各種復(fù)雜的業(yè)務(wù)模型。

模型

自己在實(shí)踐的過程中,產(chǎn)生了幾種通過goroutine + channel實(shí)現(xiàn)的工作模型,本文分別對這些模型進(jìn)行介紹。

V0.1: go關(guān)鍵字

直接加上go關(guān)鍵字,就可以讓一個(gè)函數(shù)脫離原先的主函數(shù)獨(dú)立運(yùn)行,即主函數(shù)直接繼續(xù)進(jìn)行剩下的操作,而不需要等待某個(gè)十分耗時(shí)的操作完成。比如我們在寫一個(gè)服務(wù)模塊,接收到前端請求之后,然后去做一個(gè)比較耗時(shí)的任務(wù)。比如下面這個(gè):

func (m *SomeController) PorcessSomeTask() {
    var task models.Task
    if err := task.Parse(m.Ctx.Request); err != nil {
        m.Data["json"] = err 
        m.ServeJson()
        return
    }
    task.Process()
    m.ServeJson()

如果Process函數(shù)需要耗費(fèi)大量時(shí)間的話,這個(gè)請求就會(huì)被block住。有時(shí)候,前端只需要發(fā)出一個(gè)請求給后端,并且不需要后端立即所處響應(yīng)。遇到這樣的需求,直接在耗時(shí)的函數(shù)前面加上go關(guān)鍵字就可以將請求之間返回給前端了,保證了體驗(yàn)。

func (m *SomeController) PorcessSomeTask() {
    var task models.Task
    if err := task.Parse(m.Ctx.Request); err != nil {
        m.Data["json"] = err 
        m.ServeJson()
        return
    }
    go task.Process()
    m.ServeJson()

不過,這種做法也是有許多限制的。比如:

  • 只能在前端不需要立即得到后端處理的結(jié)果的情況下
  • 這種請求的頻率不應(yīng)該很大,因?yàn)槟壳暗淖龇]有控制并發(fā)

V0.2: 并發(fā)控制

上一個(gè)方案有一個(gè)缺點(diǎn)就是無法控制并發(fā),如果這一類請求同一個(gè)時(shí)間段有很多的話,每一個(gè)請求都啟動(dòng)一個(gè)goroutine,如果每個(gè)goroutine中還需要使用其他系統(tǒng)資源,消耗將是不可控的。

遇到這種情況,一個(gè)解決方案是:將請求都轉(zhuǎn)發(fā)給一個(gè)channel,然后初始化多個(gè)goroutine讀取這個(gè)channel中的內(nèi)容,并進(jìn)行處理。假設(shè)我們可以新建一個(gè)全局的channel

var TASK_CHANNEL = make(chan models.Task)

然后,啟動(dòng)多個(gè)goroutine:

for i := 0; i < WORKER_NUM; i ++ {
    go func() {
        for {
            select {
            case task := <- TASK_CHANNEL:
                task.Process()
            }
        }
    } ()
}

服務(wù)端接收到請求之后,將任務(wù)傳入channel中即可:

func (m *SomeController) PorcessSomeTask() {
    var task models.Task
    if err := task.Parse(m.Ctx.Request); err != nil {
        m.Data["json"] = err 
        m.ServeJson()
        return
    }
    //go task.Process()
    TASK_CHANNEL <- task
    m.ServeJson()
}

這樣一來,這個(gè)操作的并發(fā)度就可以通過WORKER_NUM來控制了。

V0.3: 處理channel滿的情況

不過,上面方案有一個(gè)bug:那就是channel初始化時(shí)是沒有設(shè)置長度的,因此當(dāng)所有WORKER_NUM個(gè)goroutine都正在處理請求時(shí),再有請求過來的話,仍然會(huì)出現(xiàn)被block的情況,而且會(huì)比沒有經(jīng)過優(yōu)化的方案還要慢(因?yàn)樾枰饶骋粋€(gè)goroutine結(jié)束時(shí)才能處理它)。因此,需要在channel初始化時(shí)增加一個(gè)長度:

var TASK_CHANNEL = make(chan models.Task, TASK_CHANNEL_LEN)

這樣一來,我們將TASK_CHANNEL_LEN設(shè)置得足夠大,請求就可以同時(shí)接收TASK_CHANNEL_LEN個(gè)請求而不用擔(dān)心被block。不過,這其實(shí)還是有問題的:那如果真的同時(shí)有大于TASK_CHANNEL_LEN個(gè)請求過來呢?一方面,這就應(yīng)該算是架構(gòu)方面的問題了,可以通過對模塊進(jìn)行擴(kuò)容等操作進(jìn)行解決。另一方面,模塊本身也要考慮如何進(jìn)行“優(yōu)雅降級了”。遇到這種情況,我們應(yīng)該希望模塊能夠及時(shí)告知調(diào)用方,“我已經(jīng)達(dá)到處理極限了,無法給你處理請求了”。其實(shí),這種需求,可以很簡單的在Golang中實(shí)現(xiàn):如果channel發(fā)送以及接收操作在select語句中執(zhí)行并且發(fā)生阻塞,default語句就會(huì)立即執(zhí)行。

select {
case TASK_CHANNEL <- task:
    //do nothing
default:
    //warnning!
    return fmt.Errorf("TASK_CHANNEL is full!")
}
//...

V0.4: 接收發(fā)送給channel之后返回的結(jié)果

如果處理程序比較復(fù)雜的時(shí)候,通常都會(huì)出現(xiàn)在一個(gè)goroutine中,還會(huì)發(fā)送一些中間處理的結(jié)果發(fā)送給其他goroutine去做,經(jīng)過多道“工序”才能最終將結(jié)果產(chǎn)出。

那么,我們既需要把某一個(gè)中間結(jié)果發(fā)送給某個(gè)channel,也要能獲取到處理這次請求的結(jié)果。解決的方法是:將一個(gè)channel實(shí)例包含在請求中,goroutine處理完成后將結(jié)果寫回這個(gè)channel。

type TaskResponse struct {
    //...
}

type Task struct {
    TaskParameter   SomeStruct
    ResChan         *chan TaskResponse
}

//...

task := Task {
    TaskParameter   : xxx,
    ResChan         : make(chan TaskResponse),
}

TASK_CHANNEL <- task
res := <- task.ResChan
//...

(這邊可能會(huì)有疑問:為什么不把一個(gè)復(fù)雜的任務(wù)都放在一個(gè)goroutine中依次的執(zhí)行呢?是因?yàn)檫@里需要考慮到不同子任務(wù),所消耗的系統(tǒng)資源不盡相同,有些是CPU集中的,有些是IO集中的,所以需要對這些子任務(wù)設(shè)置不同的并發(fā)數(shù),因此需要經(jīng)由不同的channel + goroutine去完成。)

V0.5: 等待一組goroutine的返回

將任務(wù)經(jīng)過分組,交由不同的goroutine進(jìn)行處理,最終再將每個(gè)goroutine處理的結(jié)果進(jìn)行合并,這個(gè)是比較常見的處理流程。這里需要用到WaitGroup來對一組goroutine進(jìn)行同步。一般的處理流程如下:

var wg sync.WaitGroup
for i := 0; i < someLen; i ++ {
    wg.Add(1)
    go func(t Task) {
        defer wg.Done()
        //對某一段子任務(wù)進(jìn)行處理
    } (tasks[i])
}

wg.Wait()
//處理剩下的工作

V0.6: 超時(shí)機(jī)制

即使是復(fù)雜、耗時(shí)的任務(wù),也必須設(shè)置超時(shí)時(shí)間。一方面可能是業(yè)務(wù)對此有時(shí)限要求(用戶必須在XX分鐘內(nèi)看到結(jié)果),另一方面模塊本身也不能都消耗在一直無法結(jié)束的任務(wù)上,使得其他請求無法得到正常處理。因此,也需要對處理流程增加超時(shí)機(jī)制。

我一般設(shè)置超時(shí)的方案是:和之前提到的“接收發(fā)送給channel之后返回的結(jié)果”結(jié)合起來,在等待返回channel的外層添加select,并在其中通過time.After()來判斷超時(shí)。

task := Task {
    TaskParameter   : xxx,
    ResChan         : make(chan TaskResponse),
}

select {
case res := <- task.ResChan:
    //...
case <- time.After(PROCESS_MAX_TIME):
    //處理超時(shí)
}

V0.7: 廣播機(jī)制

既然有了超時(shí)機(jī)制,那也需要一種機(jī)制來告知其他goroutine結(jié)束手上正在做的事情并退出。很明顯,還是需要利用channel來進(jìn)行交流,第一個(gè)想到的肯定就是向某一個(gè)chan發(fā)送一個(gè)struct即可。比如執(zhí)行任務(wù)的goroutine在參數(shù)中,增加一個(gè)chan struct{}類型的參數(shù),當(dāng)接收到該channel的消息時(shí),就退出任務(wù)。但是,還需要解決兩個(gè)問題:

  1. 怎樣能在執(zhí)行任務(wù)的同時(shí)去接收這個(gè)消息呢?
  2. 如何通知所有的goroutine?

對于第一個(gè)問題,比較優(yōu)雅的作法是:使用另外一個(gè)channel作為函數(shù)d輸出,再加上select,就可以一邊輸出結(jié)果,一邊接收退出信號了。

另一方面,對于同時(shí)有未知數(shù)目個(gè)執(zhí)行g(shù)oroutine的情況,一次次調(diào)用done <-struct{}{},顯然無法實(shí)現(xiàn)。這時(shí)候,就會(huì)用到golang對于channel的tricky用法:當(dāng)關(guān)閉一個(gè)channel時(shí),所有因?yàn)榻邮赵揷hannel而阻塞的語句會(huì)立即返回。示例代碼如下:

// 執(zhí)行方
func doTask(done <-chan struct{}, tasks <-chan Task) (chan Result) {
    out := make(chan Result)
    go func() {
        // close 是為了讓調(diào)用方的range能夠正常退出
        defer close(out)
        for t := range tasks {
            select {
            case result <-f(task):
            case <-done:
                return
            }
        }
    }()

    return out
}

// 調(diào)用方
func Process(tasks <-chan Task, num int) {
    done := make(chan struct{})
    out := doTask(done, tasks)

    go func() {
        <- time.After(MAX_TIME)
        //done <-struct{}{}

        //通知所有的執(zhí)行g(shù)oroutine退出
        close(done)
    }()

    // 因?yàn)間oroutine執(zhí)行完畢,或者超時(shí),導(dǎo)致out被close,range退出
    for res := range out {
        fmt.Println(res)
        //...
    }
}

參考

-- EOF --

    本站是提供個(gè)人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多

    偷拍偷窥女厕一区二区视频 | 午夜福利精品视频视频| 91国内视频一区二区三区| 日本高清一区免费不卡| 免费一区二区三区少妇| 在线观看国产午夜福利| 欧美亚洲国产日韩一区二区| 欧美一区二区三区在线播放| 日韩精品一级片免费看| 欧美一本在线免费观看| 国产熟女一区二区精品视频| 亚洲欧美日本国产有色| 久久精品国产亚洲av麻豆| 五月婷婷缴情七月丁香| 国产午夜精品久久福利 | 久久99夜色精品噜噜亚洲av| 人妻少妇系列中文字幕| 一本久道久久综合中文字幕| 国产午夜福利在线免费观看| 日本欧美视频在线观看免费| 亚洲淫片一区二区三区| 激情中文字幕在线观看 | 日韩欧美一区二区久久婷婷| 亚洲国产精品国自产拍社区| 婷婷亚洲综合五月天麻豆| 国产一区欧美一区二区| 日本加勒比在线观看不卡| 国产成人国产精品国产三级 | 成人免费视频免费观看| 四十女人口红哪个色好看| 五月婷婷缴情七月丁香| 一二区不卡不卡在线观看| 日本加勒比中文在线观看| 丰满少妇高潮一区二区| 精品国产一区二区欧美| 欧美人禽色视频免费看| 亚洲香艳网久久五月婷婷| 国产午夜精品福利免费不| 成人欧美一区二区三区视频| 少妇高潮呻吟浪语91| 亚洲中文在线观看小视频|