作者:dcguo,騰訊 CSIG 電子簽開放平臺(tái)中心 分享 Golang 并發(fā)基礎(chǔ)庫(kù),擴(kuò)展以及三方庫(kù)的一些常見問(wèn)題、使用介紹和技巧,以及對(duì)一些并發(fā)庫(kù)的選擇和優(yōu)化探討。
go 原生/擴(kuò)展庫(kù)提倡的原則 不要通過(guò)共享內(nèi)存進(jìn)行通信;相反,通過(guò)通信來(lái)共享內(nèi)存。 Goroutine goroutine 并發(fā)模型 調(diào)度器主要結(jié)構(gòu) 主要調(diào)度器結(jié)構(gòu)是 M,P,G - M,內(nèi)核級(jí)別線程,goroutine 基于 M 之上,代表執(zhí)行者,底層線程,物理線程
- P,處理器,用來(lái)執(zhí)行 goroutine,因此維護(hù)了一個(gè) goroutine 隊(duì)列,里面存儲(chǔ)了所有要執(zhí)行的 goroutine,將等待執(zhí)行的 G 與 M 對(duì)接,它的數(shù)目也代表了真正的并發(fā)度( 即有多少個(gè) goroutine 可以同時(shí)進(jìn)行 );
- G,goroutine 實(shí)現(xiàn)的核心結(jié)構(gòu),相當(dāng)于輕量級(jí)線程,里面包含了 goroutine 需要的棧,程序計(jì)數(shù)器,以及所在 M 的信息
P 的數(shù)量由環(huán)境變量中的 GOMAXPROCS 決定,通常來(lái)說(shuō)和核心數(shù)對(duì)應(yīng)。 映射關(guān)系用戶空間線程和內(nèi)核空間線程映射關(guān)系有如下三種: - N:1
- 1:1
- M:N
調(diào)度圖關(guān)系如圖,灰色的 G 則是暫時(shí)還未運(yùn)行的,處于就緒態(tài),等待被調(diào)度,這個(gè)隊(duì)列被 P 維護(hù) 注: 簡(jiǎn)單調(diào)度圖如上,有關(guān)于 P 再多個(gè) M 中切換,公共 goroutine 隊(duì)列,M 從線程緩存中創(chuàng)建等步驟沒有體現(xiàn),復(fù)雜過(guò)程可以參考文章簡(jiǎn)單了解 goroutine 如何實(shí)現(xiàn)。 goroutine 使用go list.Sort()
func Announce(message string, delay time.Duration) {go func() {time.Sleep(delay)fmt.println(message)}()}
channelchannel 特性 創(chuàng)建 // 創(chuàng)建 channela := make(chan int)b := make(chan int, 10)// 單向 channelc := make(chan<- int)d := make(<-chan int)
存入/讀取/關(guān)閉tip: v, ok := <-a // 檢查是否成功關(guān)閉(ok = false:已關(guān)閉)
channel 使用/基礎(chǔ)ci := make(chan int)cj := make(chan int, 0)cs := make(chan *os.File, 100)
c := make(chan int)go func() {list.Sort()c <- 1}()doSomethingForValue<- c
func Server(queue chan *Request) {for req := range queue {sem <- 1go func() {process(req)<- sem}()}}
func Server(queue chan *Requet) {for req := range queue {sem <- 1go func(req *Request) {process(req)<- sem}(req)}}
func Serve(queue chan *Request) {for req := range queue {req := reqsem <- 1go func() {process(req)<-sem}()}}
channel 使用/技巧等待一個(gè)事件,也可以通過(guò) close 一個(gè) channel 就足夠了。c := make(chan bool)go func() { // close 的 channel 會(huì)讀到一個(gè)零值 close(c)}()<-c
阻塞程序開源項(xiàng)目【是一個(gè)支持集群的 im 及實(shí)時(shí)推送服務(wù)】里面的基準(zhǔn)測(cè)試的案例 取最快結(jié)果func main() { ret := make(chan string, 3) for i := 0; i < cap(ret); i++ { go call(ret) } fmt.Println(<-ret)}func call(ret chan<- string) { // do something // ... ret <- 'result'}
協(xié)同多個(gè) goroutines注: 協(xié)同多個(gè) goroutines 方案很多,這里只展示 channel 的一種。 limits := make(chan struct{}, 2)for i := 0; i < 10; i++ { go func() { // 緩沖區(qū)滿了就會(huì)阻塞在這 limits <- struct{}{} do() <-limits }()}
搭配 select 操作for { select { case a := <- testChanA: // todo a case b, ok := testChanB: // todo b, 通過(guò) ok 判斷 tesChanB 的關(guān)閉情況 default: // 默認(rèn)分支 }}
main go routinue 確認(rèn) worker goroutinue 真正退出的方式func worker(testChan chan bool) { for { select { // todo some // case ... case <- testChan: testChan <- true return } }}func main() { testChan := make(chan bool) go worker(testChan) testChan <- true <- testChan}
關(guān)閉的 channel 不會(huì)被阻塞testChan := make(chan bool)close(testChan)zeroValue := <- testChanfmt.Println(zeroValue) // falsetestChan <- true // panic: send on closed channel
注: 如果是 buffered channel, 即使被 close, 也可以讀到之前存入的值,讀取完畢后開始讀零值,寫入則會(huì)觸發(fā) panic nil channel 讀取和存入都不會(huì)阻塞,close 會(huì) panic略 range 遍歷 channelfor rangec := make(chan int, 20)go func() { for i := 0; i < 10; i++ { c <- i } close(c)}()// 當(dāng) c 被關(guān)閉后,取完里面的元素就會(huì)跳出循環(huán)for x := range c { fmt.Println(x)}
例: 唯一 idfunc newUniqueIdService() <-chan string { id := make(chan string) go func() { var counter int64 = 0 for { id <- fmt.Sprintf('%x', counter) counter += 1 } }() return id}func newUniqueIdServerMain() { id := newUniqueIdService() for i := 0; i < 10; i++ { fmt.Println(<- id) }}
帶緩沖隊(duì)列構(gòu)造略 超時(shí) timeout 和心跳 heart beat- 超時(shí)控制
func main() {done := do()select {case <-done:// logiccase <-time.After(3 * time.Second):// timeout}}
demo 開源 im/goim 項(xiàng)目中的應(yīng)用 2.心跳 done := make(chan bool)defer func() {close(done)}()ticker := time.NewTicker(10 * time.Second)go func() {for {select {case <-done:ticker.Stop()returncase <-ticker.C:message.Touch()}}}()}
多個(gè) goroutine 同步響應(yīng)func main() { c := make(chan struct{}) for i := 0; i < 5; i++ { go do(c) } close(c)}func do(c <-chan struct{}) { // 會(huì)阻塞直到收到 close <-c fmt.Println('hello')}
利用 channel 阻塞的特性和帶緩沖的 channel 來(lái)實(shí)現(xiàn)控制并發(fā)數(shù)量func channel() { count := 10 // 最大并發(fā) sum := 100 // 總數(shù) c := make(chan struct{}, count) sc := make(chan struct{}, sum) defer close(c) defer close(sc) for i:=0; i<sum; i++ { c <- struct{} go func(j int) { fmt.Println(j) <- c // 執(zhí)行完畢,釋放資源 sc <- struct {}{} // 記錄到執(zhí)行總數(shù) } } for i:=sum; i>0; i++ { <- sc }}
go 并發(fā)編程(基礎(chǔ)庫(kù))這塊東西為什么放到 channel 之后,因?yàn)檫@里包含了一些低級(jí)庫(kù),實(shí)際業(yè)務(wù)代碼中除了 context 之外用到都較少(比如一些鎖 mutex,或者一些原子庫(kù) atomic),實(shí)際并發(fā)編程代碼中可以用 channel 就用 channel,這也是 go 一直比較推崇得做法 Share memory by communicating; don’t communicate by sharing memory
Mutex/RWMutex鎖,使用簡(jiǎn)單,保護(hù)臨界區(qū)數(shù)據(jù) 使用的時(shí)候注意鎖粒度,每次加鎖后都要記得解鎖
Mutex demo package mainimport ( 'fmt' 'sync' 'time')func main() { var mutex sync.Mutex wait := sync.WaitGroup{} now := time.Now() for i := 1; i <= 3; i++ { wait.Add(1) go func(i int) { mutex.Lock() time.Sleep(time.Second) mutex.Unlock() defer wait.Done() }(i) } wait.Wait() duration := time.Since(now) fmt.Print(duration)}
結(jié)果: 可以看到整個(gè)執(zhí)行持續(xù)了 3 s 多,內(nèi)部多個(gè)協(xié)程已經(jīng)被 “鎖” 住了。 RWMutex demo 注意: 這東西可以并發(fā)讀,不可以并發(fā)讀寫/并發(fā)寫寫,不過(guò)現(xiàn)在即便場(chǎng)景是讀多寫少也很少用到這,一般集群環(huán)境都得分布式鎖了。 package mainimport ( 'fmt' 'sync' 'time')var m *sync.RWMutexfunc init() { m = new(sync.RWMutex)}func main() { go read() go read() go write() time.Sleep(time.Second * 3)}func read() { m.RLock() fmt.Println('startR') time.Sleep(time.Second) fmt.Println('endR') m.RUnlock()}func write() { m.Lock() fmt.Println('startW') time.Sleep(time.Second) fmt.Println('endW') m.Unlock()}
輸出: Atomic可以對(duì)簡(jiǎn)單類型進(jìn)行原子操作 int32 int64 uint32 uint64 uintptr unsafe.Pointer 可以進(jìn)行得原子操作如下 增/減 比較并且交換 假定被操作的值未曾被改變, 并一旦確定這個(gè)假設(shè)的真實(shí)性就立即進(jìn)行值替換 載入 為了原子的讀取某個(gè)值(防止寫操作未完成就發(fā)生了一個(gè)讀操作) 存儲(chǔ) 原子的值存儲(chǔ)函數(shù) 交換 原子交換
demo:增 package mainimport ( 'fmt' 'sync' 'sync/atomic')func main() { var sum uint64 var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func() { for c := 0; c < 100; c++ { atomic.AddUint64(&sum, 1) } defer wg.Done() }() } wg.Wait() fmt.Println(sum)}
結(jié)果: WaitGroup/ErrGroupwaitGroup 是一個(gè) waitGroup 對(duì)象可以等待一組 goroutinue 結(jié)束,但是他對(duì)錯(cuò)誤傳遞,goroutinue 出錯(cuò)時(shí)不再等待其他 goroutinue(減少資源浪費(fèi)) 都不能很好的解決,那么 errGroup 可以解決這部分問(wèn)題
注意 - errGroup 中如果多個(gè) goroutinue 錯(cuò)誤,只會(huì)獲取第一個(gè)出錯(cuò)的 goroutinue 的錯(cuò)誤信息,后面的則不會(huì)被感知到;
- errGroup 里面沒有做 panic 處理,代碼要保持健壯
demo: errGroup package mainimport ( 'golang.org/x/sync/errgroup' 'log' 'net/http')func main() { var g errgroup.Group var urls = []string{ 'https://github.com/', 'errUrl', } for _, url := range urls { url := url g.Go(func() error { resp, err := http.Get(url) if err == nil { _ = resp.Body.Close() } return err }) } err := g.Wait() if err != nil { log.Fatal('getErr', err) return }}
結(jié)果: once保證了傳入的函數(shù)只會(huì)執(zhí)行一次,這常用在單例模式,配置文件加載,初始化這些場(chǎng)景下。
demo: times := 10 var ( o sync.Once wg sync.WaitGroup ) wg.Add(times) for i := 0; i < times; i++ { go func(i int) { defer wg.Done() o.Do(func() { fmt.Println(i) }) }(i) } wg.Wait()
結(jié)果: Contextgo 開發(fā)已經(jīng)對(duì)他了解了太多 可以再多個(gè) goroutinue 設(shè)置截止日期,同步信號(hào),傳遞相關(guān)請(qǐng)求值
對(duì)他的說(shuō)明文章太多了,詳細(xì)可以跳轉(zhuǎn)看這篇 一文理解 golang context 這邊列一個(gè)遇到得問(wèn)題: - grpc 多服務(wù)調(diào)用,級(jí)聯(lián) cancelA -> B -> CA 調(diào)用 B,B 調(diào)用 C,當(dāng) A 不依賴 B 請(qǐng)求 C 得結(jié)果時(shí),B 請(qǐng)求 C 之后直接返回 A,那么 A,B 間 context 被 cancel,而 C 得 context 也是繼承于前面,C 請(qǐng)求直接掛掉,只需要重新搞個(gè) context 向下傳就好,記得帶上 reqId, logId 等必要信息。
并行- 某些計(jì)算可以再 CPU 之間并行化,如果計(jì)算可以被劃分為不同的可獨(dú)立執(zhí)行的部分,那么他就是可并行化的,任務(wù)可以通過(guò)一個(gè) channel 發(fā)送結(jié)束信號(hào)。假如我們可以再數(shù)組上進(jìn)行一個(gè)比較耗時(shí)的操作,操作的值在每個(gè)數(shù)據(jù)上獨(dú)立,如下:type vector []float64
func (v vector) DoSome(i, n int, u Vector, c chan int) { for ; i < n; i ++ { v[i] += u.Op(v[i]) } c <- 1 }
我們可以再每個(gè) CPU 上進(jìn)行循環(huán)無(wú)關(guān)的迭代計(jì)算,我們僅需要?jiǎng)?chuàng)建完所有的 goroutine 后,從 channel 中讀取結(jié)束信號(hào)進(jìn)行計(jì)數(shù)即可。 并發(fā)編程/工作流方案擴(kuò)展這部分如需自己開發(fā),內(nèi)容其實(shí)可以分為兩部分能力去做 并發(fā)編程增強(qiáng)方案 工作流解決方案 需要去解決一些基礎(chǔ)問(wèn)題 并發(fā)編程: 啟動(dòng) goroutine 時(shí),增加防止程序 panic 能力 去封裝一些更簡(jiǎn)單的錯(cuò)誤處理方案,比如支持多個(gè)錯(cuò)誤返回 限定任務(wù)的 goroutine 數(shù)量 工作流: 在每個(gè)工作流執(zhí)行到下一步前先去判斷上一步的結(jié)果 工作流內(nèi)嵌入一些攔截器
singlelFlight(go 官方擴(kuò)展同步包)一般系統(tǒng)重要的查詢?cè)黾恿司彺婧?,如果遇到緩存擊穿,那么可以通過(guò)任務(wù)計(jì)劃,加索等方式去解決這個(gè)問(wèn)題,singleflight 這個(gè)庫(kù)也可以很不錯(cuò)的應(yīng)對(duì)這種問(wèn)題。 它可以獲取第一次請(qǐng)求得結(jié)果去返回給相同得請(qǐng)求 核心方法 Do 執(zhí)行和返回給定函數(shù)的值,確保某一個(gè)時(shí)間只有一個(gè)方法被執(zhí)行。 如果一個(gè)重復(fù)的請(qǐng)求進(jìn)入,則重復(fù)的請(qǐng)求會(huì)等待前一個(gè)執(zhí)行完畢并獲取相同的數(shù)據(jù),返回值 shared 標(biāo)識(shí)返回值 v 是否是傳遞給重復(fù)的調(diào)用請(qǐng)求。 一句話形容他的功能,它可以用來(lái)歸并請(qǐng)求,但是最好加上超時(shí)重試等機(jī)制,防止第一個(gè) 執(zhí)行 得請(qǐng)求出現(xiàn)超時(shí)等異常情況導(dǎo)致同時(shí)間大量請(qǐng)求不可用。 場(chǎng)景: 數(shù)據(jù)變化量小(key 變化不頻繁,重復(fù)率高),但是請(qǐng)求量大的場(chǎng)景
demo package mainimport ( 'golang.org/x/sync/singleflight' 'log' 'math/rand' 'sync' 'time')var ( g singleflight.Group)const ( funcKey = 'key' times = 5 randomNum = 100)func init() { rand.Seed(time.Now().UnixNano())}func main() { var wg sync.WaitGroup wg.Add(times) for i := 0; i < times; i++ { go func() { defer wg.Done() num, err := run(funcKey) if err != nil { log.Fatal(err) return } log.Println(num) }() } wg.Wait()}func run(key string) (num int, err error) { v, err, isShare := g.Do(key, func() (interface{}, error) { time.Sleep(time.Second * 5) num = rand.Intn(randomNum) //[0,100) return num, nil }) if err != nil { log.Fatal(err) return 0, err } data := v.(int) log.Println(isShare) return data, nil}
連續(xù)執(zhí)行 3 次,返回結(jié)果如下,全部取了共享得結(jié)果: 但是注釋掉 time.Sleep(time.Second * 5) 再嘗試一次看看。 這次全部取得真實(shí)值 實(shí)踐: 伙伴部門高峰期可以減少 20% 的 Redis 調(diào)用, 大大減少了 Redis 的負(fù)載 實(shí)踐開發(fā)案例注: 下面用到的方案因?yàn)殚_發(fā)時(shí)間較早,并不一定是以上多種方案中最優(yōu)的,選擇有很多種,使用那種方案只有有所考慮可以自圓其說(shuō)即可。 建議: 項(xiàng)目中逐漸形成統(tǒng)一解決方案,從混亂到統(tǒng)一,逐漸小團(tuán)隊(duì)內(nèi)對(duì)此類邏輯形成統(tǒng)一的一個(gè)解決標(biāo)準(zhǔn),而不是大家對(duì)需求之外的控制代碼寫出各式各樣的控制邏輯。
批量校驗(yàn)批量校驗(yàn)接口限頻單賬戶最高 100qps/s,整個(gè)系統(tǒng)多個(gè)校驗(yàn)場(chǎng)景公用一個(gè)賬戶限頻需要限制批量校驗(yàn)最高為 50~80 qps/s(需要預(yù)留令牌供其他場(chǎng)景使用,否則頻繁調(diào)用批量接口時(shí)候其他場(chǎng)景均會(huì)失敗限頻)。 1.使用 go routine 來(lái)并發(fā)進(jìn)行三要素校驗(yàn),因?yàn)?go routinue,所以每次開啟 50 ~ 80 go routine 同時(shí)進(jìn)行單次三要素校驗(yàn); 2.每輪校驗(yàn)耗時(shí) 1s,如果所有 routinue 校驗(yàn)后與校驗(yàn)開始時(shí)間間隔不滿一秒,則需要主動(dòng)程序睡眠至 1s,然后開始下輪校驗(yàn); 3.因?yàn)橹皇切r?yàn)場(chǎng)景,如果某次校驗(yàn)失敗,最容易的原因其實(shí)是校驗(yàn)方異常,或者被其他校驗(yàn)場(chǎng)景再當(dāng)前 1s 內(nèi)消耗過(guò)多令牌;那么整個(gè)批量接口返回 err,運(yùn)營(yíng)同學(xué)重新發(fā)起就好。 代碼需要進(jìn)行的優(yōu)化點(diǎn): - 加鎖(推薦使用,最多不到 100 的競(jìng)爭(zhēng)者數(shù)目,使用鎖性能影響微乎其微);
- 給每個(gè)傳入 routine 的 element 數(shù)組包裝,增加一個(gè) key 屬性,每個(gè)返回的 result 包含 key 通過(guò) key 映射可以得到需要的一個(gè)順序。
1.sleep 1s 這個(gè)操作可以從調(diào)用前開始計(jì)時(shí),調(diào)用完成后不滿 1s 補(bǔ)充至 1s,而不是每次最長(zhǎng)調(diào)用時(shí)間 elapsedTime + 1s; 2.通道中獲取的三要素校驗(yàn)結(jié)果順序和入?yún)?shù)據(jù)數(shù)組順序不對(duì)應(yīng),這里通過(guò)兩種方案: 3.分組調(diào)用 getElementResponseConcurrent 方法時(shí),傳入切片可以省略部分計(jì)算,直接使用切片表達(dá)式。 elementNum := len(elements)m := elementNum / 80n := elementNum % 80if m < 1 {if results, err := getElementResponseConcurrent(ctx, elements, conn, caller); err != nil {return nil, err} else {response.Results = resultsreturn response, nil}} else {results := make([]int64, 0)if n != 0 {m = m + 1}var result []int64for i := 1; i <= m; i++ {if i == m {result, err = getElementResponseConcurrent(ctx, elements[(i-1)*80:(i-1)*80+n], conn, caller)} else {result, err = getElementResponseConcurrent(ctx, elements[(i-1)*80:i*80], conn, caller)}if err != nil {return nil, err}results = append(results, result...)}response.Results = results}// getElementResponseConcurrentfunc getElementResponseConcurrent(ctx context.Context, elements []*api.ThreeElements, conn *grpc.ClientConn,caller *api.Caller) ([]int64, error) {results := make([]int64, 0)var chResult = make(chan int64)chanErr := make(chan error)defer close(chanErr)wg := sync.WaitGroup{}faceIdClient := api.NewFaceIdClient(conn)for _, element := range elements {wg.Add(1)go func(element *api.ThreeElements) {param := element.ParamverificationRequest := &api.CheckMobileVerificationRequest{Caller: caller,Param: param,}if verification, err := faceIdClient.CheckMobileVerification(ctx, verificationRequest); err != nil {chanErr <- errreturn} else {result := verification.ResultchanErr <- nil chResult <- result}defer wg.Done()}(element)}for i := 0; i < len(elements); i++ {if err := <-chanErr; err != nil {return nil, err}var result = <-chResultresults = append(results, result)}wg.Wait()time.Sleep(time.Second)return results, nil}
歷史數(shù)據(jù)批量標(biāo)簽場(chǎng)景: 產(chǎn)品上線一年,逐步開始做數(shù)據(jù)分析和統(tǒng)計(jì)需求提供給運(yùn)營(yíng)使用,接入 Tdw 之前是直接采用接口讀歷史表進(jìn)行的數(shù)據(jù)分析,涉及全量用戶的分析給用戶記錄打標(biāo)簽,數(shù)據(jù)效率較低,所以采用并發(fā)分組方法,考慮協(xié)程比較輕量,從開始上線時(shí)間節(jié)點(diǎn)截止當(dāng)前時(shí)間分共 100 組,代碼較為簡(jiǎn)單。 問(wèn)題: 本次接口不是上線最終版,核心分析方法僅測(cè)試環(huán)境少量數(shù)據(jù)就會(huì)有 N 多條慢查詢,所以這塊還需要去對(duì)整體資源業(yè)務(wù)背景問(wèn)題去考慮,防止線上數(shù)據(jù)量較大還有慢查詢出現(xiàn) cpu 打滿。
func (s ServiceOnceJob) CompensatingHistoricalLabel(ctx context.Context,request *api.CompensatingHistoricalLabelRequest) (response *api.CompensatingHistoricalLabelResponse, err error) {if request.Key != interfaceKey {return nil, transform.Simple('err')}ctx, cancelFunc := context.WithCancel(ctx)var (wg = new(sync.WaitGroup)userRegisterDb = new(datareportdb.DataReportUserRegisteredRecords)startNum = int64(0))wg.Add(1)countHistory, err := userRegisterDb.GetUserRegisteredCountForHistory(ctx, historyStartTime, historyEndTime)if err != nil {return nil, err}div := decimal.NewFromFloat(float64(countHistory)).Div(decimal.NewFromFloat(float64(theNumberOfConcurrent)))f, _ := div.Float64()num := int64(math.Ceil(f))for i := 0; i < theNumberOfConcurrent; i++ {go func(startNum int64) {defer wg.Done()for {select {case <- ctx.Done():returndefault:userDataArr, err := userRegisterDb.GetUserRegisteredDataHistory(ctx, startNum, num)if err != nil {cancelFunc()}for _, userData := range userDataArr {if err := analyseUserAction(userData); err != nil {cancelFunc()}}}}}(startNum)startNum = startNum + num}wg.Wait()return response, nil}
批量發(fā)起/批量簽署實(shí)現(xiàn)思路和上面其實(shí)差不多,都是需要支持批量的特性,基本上現(xiàn)在業(yè)務(wù)中統(tǒng)一使用多協(xié)程處理。 思考golang 協(xié)程很牛 x,協(xié)程的數(shù)目最大到底多大合適,有什么衡量指標(biāo)么?- 衡量指標(biāo),協(xié)程數(shù)目衡量
基本上可以這樣理解這件事 - 不要一個(gè)請(qǐng)求 spawn 出太多請(qǐng)求,指數(shù)級(jí)增長(zhǎng)。這一點(diǎn),在第二點(diǎn)會(huì)受到加強(qiáng);
- 當(dāng)你生成 goroutines,需要明確他們何時(shí)退出以及是否退出,良好管理每個(gè) goroutines盡量保持并發(fā)代碼足夠簡(jiǎn)單,這樣 grroutines 得生命周期就很明顯了,如果沒做到,那么要記錄下異常 goroutine 退出的時(shí)間和原因;
- 數(shù)目的話應(yīng)該需要多少搞多少,擴(kuò)增服務(wù)而不是限制,限制一般或多或少都會(huì)不合理,不僅 delay 更會(huì)造成擁堵
- 注意 協(xié)程泄露 問(wèn)題,關(guān)注服務(wù)的指標(biāo)。
使用鎖時(shí)候正確釋放鎖的方式- 任何情況使用鎖一定要切記鎖的釋放,任何情況!任何情況!任何情況!即便是 panic 時(shí)也要記得鎖的釋放,否則可以有下面的情況
- 代碼庫(kù)提供給他人使用,出現(xiàn) panic 時(shí)候被外部 recover,這時(shí)候就會(huì)導(dǎo)致鎖沒釋放。
goroutine 泄露預(yù)防與排查一個(gè) goroutine 啟動(dòng)后沒有正常退出,而是直到整個(gè)服務(wù)結(jié)束才退出,這種情況下,goroutine 無(wú)法釋放,內(nèi)存會(huì)飆高,嚴(yán)重可能會(huì)導(dǎo)致服務(wù)不可用
goroutine 的退出其實(shí)只有以下幾種方式可以做到 - main 函數(shù)退出
- context 通知退出
- goroutine panic 退出
- goroutine 正常執(zhí)行完畢退出
大多數(shù)引起 goroutine 泄露的原因基本上都是如下情況 - channel 阻塞,導(dǎo)致協(xié)程永遠(yuǎn)沒有機(jī)會(huì)退出
- 異常的程序邏輯(比如循環(huán)沒有退出條件)
杜絕: - 想要杜絕這種出現(xiàn)泄露的情況,需要清楚的了解 channel 再 goroutine 中的使用,循環(huán)是否有正確的跳出邏輯
排查: - go pprof 工具
- runtime.NumGoroutine() 判斷實(shí)時(shí)協(xié)程數(shù)
- 第三方庫(kù)
案例: package mainimport ( 'fmt' 'net/http' _ 'net/http/pprof' 'runtime' 'time')func toLeak() { c := make(chan int) go func() { <-c }()}func main() { go toLeak() go func() { _ = http.ListenAndServe('0.0.0.0:8080', nil) }() c := time.Tick(time.Second) for range c { fmt.Printf('goroutine [nums]: %d\n', runtime.NumGoroutine()) }}
輸出: pprof: - http://127.0.0.1:8080/debug/pprof/goroutine?debug=1
復(fù)雜情況也可以用其他的可視化工具: - go tool pprof -http=:8001 http://127.0.0.1:8080/debug/pprof/goroutine?debug=1
父協(xié)程捕獲子協(xié)程 panic使用方便,支持鏈?zhǔn)秸{(diào)用
父協(xié)程捕獲子協(xié)程 panic 有鎖的地方就去用 channel 優(yōu)化有鎖的地方就去用 channel 優(yōu)化,這句話可能有點(diǎn)絕對(duì),肯定不是所有場(chǎng)景都可以做到,但是大多數(shù)場(chǎng)景絕 X 是可以的,干掉鎖去使用 channel 優(yōu)化代碼進(jìn)行解耦絕對(duì)是一個(gè)有趣的事情。
分享一個(gè)很不錯(cuò)的優(yōu)化 demo: 場(chǎng)景: - 一個(gè)簡(jiǎn)單的即時(shí)聊天室,支持連接成功的用戶收發(fā)消息,使用 socket;
- 客戶端發(fā)送消息到服務(wù)端,服務(wù)端可以發(fā)送消息到每一個(gè)客戶端。
分析: - 需要一個(gè)鏈接池保存每一個(gè)客戶端;
- 客戶端發(fā)送消息到服務(wù)端,服務(wù)端遍歷鏈接池發(fā)送給各個(gè)客戶端
- 用戶斷開鏈接,需要移除鏈接池的對(duì)應(yīng)鏈接,否則會(huì)發(fā)送發(fā)錯(cuò);
- 遍歷發(fā)送消息,需要再 goroutine 中發(fā)送,不應(yīng)該被阻塞。
問(wèn)題: - 上述有個(gè)針對(duì)鏈接池的并發(fā)操作
解決 增加鎖機(jī)制,解決針對(duì)鏈接池的并發(fā)問(wèn)題發(fā)送消息也需要去加鎖因?yàn)橐乐钩霈F(xiàn) panic: concurrent write to websocket connection 假設(shè)網(wǎng)絡(luò)延時(shí),用戶新增時(shí)候還有消息再發(fā)送中,新加入的用戶就無(wú)法獲得鎖了,后面其他的相關(guān)操作都會(huì)被阻塞導(dǎo)致問(wèn)題。 使用 channel 優(yōu)化: - 引入 channel新增客戶端集合,包含三個(gè)通道
- 鏈接新增通道 registerChan,鏈接移除通道 unregisterChan,發(fā)送消息通道 messageChan。
2.使用通道 - 移除鏈接,鏈接丟入 unregisterChan;
- 消息發(fā)送,消息丟入 messageChan;
3.通道消息方法,代碼來(lái)自于開源項(xiàng)目 簡(jiǎn)單聊天架構(gòu)演變: // 處理所有管道任務(wù)func (room *Room) ProcessTask() {log := zap.S()log.Info('啟動(dòng)處理任務(wù)')for {select {case c := <-room.register:log.Info('當(dāng)前有客戶端進(jìn)行注冊(cè)')room.clientsPool[c] = truecase c := <-room.unregister:log.Info('當(dāng)前有客戶端離開')if room.clientsPool[c] {close(c.send)delete(room.clientsPool, c)}case m := <-room.send:for c := range room.clientsPool {select {case c.send <- m:default:break}}}}}
結(jié)果: 成功使用 channel 替換了鎖。 參考- 父協(xié)程捕獲子協(xié)程 panic
- 啟發(fā)代碼 1: 微服務(wù)框架啟發(fā)代碼 2: 同步/異步工具包
- goroutine 如何實(shí)現(xiàn)
- 從簡(jiǎn)單的即時(shí)聊天來(lái)看架構(gòu)演變(simple-chatroom)
|