

Super Hardcore | The Event Mechanism in Kubernetes

 黃爸爸好 2020-03-07

Running kubectl describe [resource] shows the Events in its output, and we often rely on events when diagnosing problems: from the events we can trace a Pod's entire lifecycle, which makes them a data source for service observability. Events clearly play a pivotal role in Kubernetes.

(Figure: sample event output)

Events are not something only the kubelet produces: all the event-handling machinery is packaged in client-go/tools/record, so we are free to write our own custom events, as the sketch below shows.
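To make that concrete, here is a minimal, self-contained sketch of writing a custom event with client-go (v1.17-era API, before client calls took a context argument). The kubeconfig path, component name, and pod name are placeholders; the pod name simply reuses the one from the YAML example later in this article, and error handling is trimmed:

package main

import (
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/record"
)

func main() {
    // Build a clientset from the local kubeconfig (path is a placeholder).
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    client := kubernetes.NewForConfigOrDie(config)

    // Same wiring the kubelet uses: a broadcaster that sinks to the
    // apiserver, plus a recorder stamped with our own component name.
    broadcaster := record.NewBroadcaster()
    defer broadcaster.Shutdown()
    broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
        Interface: client.CoreV1().Events(""),
    })
    recorder := broadcaster.NewRecorder(scheme.Scheme,
        v1.EventSource{Component: "my-demo-component"})

    // Record a custom event against an existing pod (name/namespace are placeholders).
    pod, err := client.CoreV1().Pods("default").Get("example-foo-d75d8587c-xsf64", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    recorder.Event(pod, v1.EventTypeNormal, "CustomReason", "a custom event written from our own code")

    // Event delivery is asynchronous (a goroutine plus a channel, as we
    // will see below), so give the broadcaster a moment to flush.
    time.Sleep(time.Second)
}

Run against a live cluster, the new event then shows up under kubectl describe pod and kubectl get event just like kubelet-generated ones.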

Now let's lift the veil on events, step by step.

Event Definition

An event is itself a resource object, persisted to etcd through the apiserver, so we can also list event objects with the kubectl get event command.

Here is the YAML of one such event:

apiVersion: v1
count: 1
eventTime: null
firstTimestamp: '2020-03-02T13:08:22Z'
involvedObject:
  apiVersion: v1
  kind: Pod
  name: example-foo-d75d8587c-xsf64
  namespace: default
  resourceVersion: '429837'
  uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: '2020-03-02T13:08:22Z'
message: Pod sandbox changed, it will be killed and re-created.
metadata:
  creationTimestamp: '2020-03-02T13:08:30Z'
  name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
  namespace: default
  resourceVersion: '479466'
  selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
  uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: ''
reportingInstance: ''
source:
  component: kubelet
  host: minikube
type: Normal


Key fields:

  • involvedObject: the resource object that triggered the event
  • lastTimestamp: the last time this event occurred
  • message: a human-readable description of the event
  • metadata: the event's own metadata, such as name and namespace
  • reason: the reason for the event
  • source: the component that reported the event, for example the kubelet on a given node
  • type: the event type, Normal or Warning

The full field definition of Event can be found here: types.go#L5078
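For orientation, here is an abridged view of that struct (json tags and the newer fields such as EventTime, Series, Action, and Related are omitted here):

// Event is a report of an event somewhere in the cluster. (abridged)
type Event struct {
    metav1.TypeMeta
    metav1.ObjectMeta

    InvolvedObject ObjectReference // the object this event is about
    Reason         string          // short, machine-understandable reason
    Message        string          // human-readable description
    Source         EventSource     // component/host that reported it
    FirstTimestamp metav1.Time     // first time this event was recorded
    LastTimestamp  metav1.Time     // most recent occurrence
    Count          int32           // number of occurrences (for de-duplication)
    Type           string          // Normal or Warning
}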

Next, let's see how an event actually gets written.

Writing Events

1. We'll use the kubelet as our example of how events are written.

2. The analysis is based on the Kubernetes 1.17.3 code base.

Let's start with a picture of the overall processing flow:

(Figure: event processing flow: EventRecorder writes into a channel, and the EventBroadcaster fans events out to the StartLogging and StartRecordingToSink handlers)

Creating the client that handles events:
kubelet/app/server.go#L461

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
    if kubeDeps.Recorder != nil {
        return
    }
    // Create the event broadcaster.
    eventBroadcaster := record.NewBroadcaster()
    // Create the EventRecorder.
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
    // Send events to the log output.
    eventBroadcaster.StartLogging(klog.V(3).Infof)
    if kubeDeps.EventClient != nil {
        klog.V(4).Infof("Sending events to api server.")
        // Send events to the apiserver.
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        klog.Warning("No api server defined - no events will be sent to API server.")
    }
}

makeEventRecorder creates an EventBroadcaster, the event broadcaster, whose StartLogging and StartRecordingToSink register the two event handlers that send events to the log and to the apiserver respectively.
NewRecorder then creates the EventRecorder instance, which provides Event, Eventf, and similar methods for recording events.
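A typical call through this recorder looks like the following snippet; the pod variable, reason, and error here are illustrative placeholders, not kubelet source:

// Emit a Warning event about a pod; the event lands in the pod's namespace.
recorder.Eventf(pod, v1.EventTypeWarning, "FailedMount",
    "Unable to attach or mount volumes: %v", err)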

EventBroadcaster

Let's look at the EventBroadcaster interface definition: event.go#L113

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
    StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
    StartRecordingToSink(sink EventSink) watch.Interface
    StartLogging(logf func(format string, args ...interface{})) watch.Interface
    NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder

    Shutdown()
}

The concrete type implementing all of these methods is the eventBroadcasterImpl struct.

StartLogging and StartRecordingToSink are what actually consume the events, while EventRecorder performs the writes; the channel between them forms a classic producer-consumer model.
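Both registrations are thin wrappers, as the lightly abridged v1.17-era client-go source shows; each one just hands StartEventWatcher a different handler:

func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
    return e.StartEventWatcher(
        func(e *v1.Event) {
            logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
        })
}

func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
    eventCorrelator := NewEventCorrelatorWithOptions(e.options)
    return e.StartEventWatcher(
        func(event *v1.Event) {
            recordToSink(sink, event, eventCorrelator, e.sleepDuration)
        })
}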

EventRecorder

First, the EventRecorder interface definition: event.go#L88. It provides the following four methods:

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
    // Event constructs an event from the given information and puts it in the queue for sending.
    // 'object' is the object this event is about. Event will make a reference-- or you may also
    // pass a reference to the object directly.
    // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
    // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
    // should be in UpperCamelCase format (starting with a capital letter). 'reason' will be used
    // to automate handling of events, so imagine people writing switch statements to handle them.
    // You want to make that easy.
    // 'message' is intended to be human readable.
    //
    // The resulting event will be created in the same namespace as the reference object.
    Event(object runtime.Object, eventtype, reason, message string)

    // Eventf is just like Event, but with Sprintf for the message field.
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

    // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
    PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

    // AnnotatedEventf is just like eventf, but with annotations attached
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

Key parameters:

  • object: corresponds to involvedObject in the event resource definition
  • eventtype: corresponds to type in the event resource definition; either Normal or Warning
  • reason: the reason for the event
  • message: the event message

Let's trace what happens when we call Event(object runtime.Object, eventtype, reason, message string).
All of these recording methods eventually funnel into the generateEvent method: event.go#L316

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
    // ...
    event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
    event.Source = recorder.source
    go func() {
        // NOTE: events should be a non-blocking operation
        defer utilruntime.HandleCrash()
        recorder.Action(watch.Added, event)
    }()
}

The event finally enters processing via recorder.Action, called inside a goroutine; this guarantees that every event-recording call is non-blocking.
makeEvent itself just constructs the event object, generating the event name from the InvolvedObject's name plus a timestamp:

Note: for events produced by non-namespaced resources, the event's namespace is default.

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
    t := metav1.Time{Time: recorder.clock.Now()}
    namespace := ref.Namespace
    if namespace == "" {
        namespace = metav1.NamespaceDefault
    }
    return &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
            Namespace:   namespace,
            Annotations: annotations,
        },
        InvolvedObject: *ref,
        Reason:         reason,
        Message:        message,
        FirstTimestamp: t,
        LastTimestamp:  t,
        Count:          1,
        Type:           eventtype,
    }
}

Following the Action method one step further, apimachinery/blob/master/pkg/watch/mux.go#L188:23:

// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
    m.incoming <- Event{action, obj}
}

So the event is written into a channel.
Note: this Action method lives in the apimachinery package. recorderImpl embeds *watch.Broadcaster as an anonymous field, and NewRecorder assigns it the broadcaster owned by eventBroadcasterImpl, so the recorder and the event broadcaster share one and the same Broadcaster.
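The struct in question (from client-go/tools/record/event.go, v1.17-era, lightly abridged) looks like this:

type recorderImpl struct {
    scheme *runtime.Scheme
    source v1.EventSource
    *watch.Broadcaster // anonymous field: the same Broadcaster held by eventBroadcasterImpl
    clock  clock.Clock
}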

At this point the picture is clear: events end up in the Broadcaster's incoming channel. Next, let's see how they are consumed.

Consuming Events

The StartLogging and StartRecordingToSink calls made inside makeEventRecorder are what consume the events:

  • StartLogging writes each event straight to the log
  • StartRecordingToSink writes each event to the apiserver

Internally, both methods call StartEventWatcher, passing in an eventHandler that processes each event:

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher := e.Watch()
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() {
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)
        }
    }()
    return watcher
}

The events come out of watcher.ResultChan: in a goroutine, func (m *Broadcaster) loop() calls func (m *Broadcaster) distribute(event Event), which writes each event into the result channel of every broadcasterWatcher.
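The fan-out itself, abridged from apimachinery's mux.go (v1.17-era; the internal run-function marker handling is elided):

func (m *Broadcaster) loop() {
    // Drain the incoming channel and fan each event out to every watcher.
    for event := range m.incoming {
        m.distribute(event)
    }
    m.closeAll()
}

// distribute sends event to all watchers. With DropIfChannelFull, a slow
// watcher whose result channel is full simply misses the event instead of
// blocking every other watcher.
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <-w.stopped:
            default: // channel full: drop the event for this watcher
            }
        }
    } else {
        for _, w := range m.watchers {
            select {
            case w.result <- event:
            case <-w.stopped:
            }
        }
    }
}

Note that record.NewBroadcaster creates its Broadcaster with the drop-if-full behavior, so an overwhelmed consumer loses events rather than stalling the producers.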

Let's focus on the eventHandler that StartRecordingToSink registers, the recordToSink method:

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
    // Make a copy before modification, because there could be multiple listeners.
    // Events are safe to copy like this.
    eventCopy := *event
    event = &eventCopy
    result, err := eventCorrelator.EventCorrelate(event)
    if err != nil {
        utilruntime.HandleError(err)
    }
    if result.Skip {
        return
    }
    tries := 0
    for {
        if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
            break
        }
        tries++
        if tries >= maxTriesPerEvent {
            klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
            break
        }
        // Randomize the first sleep so that various clients won't all be
        // synced up if the master goes down.
        // (The jitter prevents every client from resending its events at
        // the same instant after an apiserver restart.)
        if tries == 1 {
            time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
        } else {
            time.Sleep(sleepDuration)
        }
    }
}

Each event first goes through eventCorrelator.EventCorrelate(event) for preprocessing, whose main job is to aggregate similar events (too many raw events would put pressure on etcd and the apiserver, and would also make a pod's event list very hard to read).

The for loop that follows is the retry logic, capped at 12 attempts (maxTriesPerEvent); the recordEvent call is what actually writes the event to the apiserver.
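A hedged sketch of what recordEvent does (heavily abridged; the real function in client-go/tools/record/event.go also classifies errors to decide retryability, and isKeyNotFoundError here stands in for its actual helper):

func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
    var newEvent *v1.Event
    var err error
    // An event the correlator has seen before (Count > 1) is PATCHed so the
    // existing object's Count/LastTimestamp are updated in place...
    if updateExistingEvent {
        newEvent, err = sink.Patch(event, patch)
    }
    // ...while a brand-new event (or a patch target that no longer exists)
    // is POSTed as a fresh object.
    if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
        event.ResourceVersion = ""
        newEvent, err = sink.Create(event)
    }
    if err == nil {
        eventCorrelator.UpdateState(newEvent)
        return true // success: stop retrying
    }
    // Error classification elided: terminal errors return true (give up),
    // transient ones return false so recordToSink retries after a sleep.
    return false
}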

Event Processing

Let's look at the EventCorrelate method:

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
    if newEvent == nil {
        return nil, fmt.Errorf("event is nil")
    }
    aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
    observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
    if c.filterFunc(observedEvent) {
        return &EventCorrelateResult{Skip: true}, nil
    }
    return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

It calls three methods in turn: aggregator.EventAggregate, logger.eventObserve, and filterFunc. They do the following:

  1. aggregator.EventAggregate: aggregates events. If 10 similar events have appeared within the last 10 minutes (events whose key fields, other than message and the timestamps, are all identical), the aggregator merges them and sets the message to (combined from similar events) + event.Message.
  2. logger.eventObserve: for identical events, and for the similar events the aggregator has merged, it records how many times the event occurred by incrementing the Count field.
  3. filterFunc: implements a token-bucket rate limiter; events beyond the configured rate are dropped, which keeps the apiserver safe (see the sketch after this list).
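A minimal sketch of that token-bucket idea, using client-go's flowcontrol package (the constants mirror what I believe the defaults in client-go/tools/record/events_cache.go are, a burst of 25 with roughly one token refilled every 5 minutes, but treat them as illustrative):

package main

import (
    "fmt"

    "k8s.io/client-go/util/flowcontrol"
)

const (
    spamBurst = 25          // bucket size: allows an initial burst of events
    spamQPS   = 1.0 / 300.0 // refill rate: one token every 5 minutes
)

func main() {
    // The real filter keeps one such limiter per event source+object key.
    limiter := flowcontrol.NewTokenBucketRateLimiter(spamQPS, spamBurst)

    dropped := 0
    for i := 0; i < 30; i++ {
        // TryAccept takes a token without blocking; when the bucket is
        // empty the event is simply skipped, protecting the apiserver.
        if !limiter.TryAccept() {
            dropped++
        }
    }
    fmt.Printf("dropped %d of 30 events\n", dropped) // prints: dropped 5 of 30 events
}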

Let's look mainly at the aggregator.EventAggregate method:

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
    now := metav1.NewTime(e.clock.Now())
    var record aggregateRecord
    // eventKey is the full cache key for this event
    // (eventKey combines every field except the timestamps)
    eventKey := getEventKey(newEvent)
    // aggregateKey is for the aggregate event, if one is needed.
    // (aggregateKey combines every field except message and timestamps;
    // localKey is just the message)
    aggregateKey, localKey := e.keyFunc(newEvent)

    // Do we have a record of similar events in our cache?
    e.Lock()
    defer e.Unlock()
    // Look the aggregateKey up in the cache; identical or similar events
    // will already have been stored there.
    value, found := e.cache.Get(aggregateKey)
    if found {
        record = value.(aggregateRecord)
    }

    // If the last event is more than 10 minutes old, start over with a
    // fresh localKeys set (the set holds the messages).
    maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
    interval := now.Time.Sub(record.lastTimestamp.Time)
    if interval > maxInterval {
        record = aggregateRecord{localKeys: sets.NewString()}
    }

    // Write the new event into the aggregation record and put it on the cache
    // (insert localKey, i.e. the message, into the set; an identical
    // message simply overwrites the existing entry)
    record.localKeys.Insert(localKey)
    record.lastTimestamp = now
    e.cache.Add(aggregateKey, record)

    // If we are not yet over the threshold for unique events, don't correlate them
    // (i.e. fewer than 10 similar events in the localKeys set)
    if uint(record.localKeys.Len()) < e.maxEvents {
        return newEvent, eventKey
    }

    // do not grow our local key set any larger than max
    record.localKeys.PopAny()

    // create a new aggregate event, and return the aggregateKey as the cache key
    // (so that it can be overwritten.)
    eventCopy := &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
            Namespace: newEvent.Namespace,
        },
        Count:          1,
        FirstTimestamp: now,
        InvolvedObject: newEvent.InvolvedObject,
        LastTimestamp:  now,
        // messageFunc prefixes the message with "(combined from similar events)"
        Message: e.messageFunc(newEvent),
        Type:    newEvent.Type,
        Reason:  newEvent.Reason,
        Source:  newEvent.Source,
    }
    return eventCopy, aggregateKey
}

So EventAggregate uses the cache and the localKeys set to decide whether events are similar: once 10 similar events have appeared within 10 minutes, they are merged and the message gets the prefix. From then on, logger.eventObserve accumulates the Count; if even the message is identical, it is a straightforward count++.

Summary

That wraps up the whole flow of event processing. To recap (compare with the diagram earlier in the article):

  1. An EventRecorder object is created; its Event and related methods construct the event object.
  2. The constructed event is sent into the EventBroadcaster's channel.
  3. A background goroutine in the EventBroadcaster pulls events off the channel and broadcasts them to the pre-registered handlers.
  4. The logging handler simply prints each event it receives.
  5. The EventSink handler runs each event through preprocessing and then sends it to the apiserver.
  6. Preprocessing consists of three steps: rate limiting, aggregation, and counting.
  7. The apiserver receives the event and persists it in etcd.

Looking back over the whole pipeline, events are not guaranteed to be written 100% of the time (as the preprocessing shows). This is a deliberate trade-off to protect the etcd backend: events are produced extremely frequently across a cluster, especially when services are unstable, and compared with Deployments, Pods, and other resources they are simply less critical. Hence the compromise.

References:

  • https:///2017/06/22/kubelet-source-code-analysis-part4-event/
  • https://github.com/kubernetes/kubernetes/blob/v1.17.3/staging/src/k8s.io/client-go/tools/record
