其實event也是一個資源對象,并且通過apiserver將event存儲在etcd中,所以我們也可以通過 kubectl get event 命令查看對應(yīng)的event對象。
以下是一個event的yaml文件:
apiVersion: v1 count: 1 eventTime: null firstTimestamp: '2020-03-02T13:08:22Z' involvedObject: apiVersion: v1 kind: Pod name: example-foo-d75d8587c-xsf64 namespace: default resourceVersion: '429837' uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4 kind: Event lastTimestamp: '2020-03-02T13:08:22Z' message: Pod sandbox changed, it will be killed and re-created. metadata: creationTimestamp: '2020-03-02T13:08:30Z' name: example-foo-d75d8587c-xsf64.15f87ea1df862b64 namespace: default resourceVersion: '479466' selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64 uid: 9fe6f72a-341d-4c49-960b-e185982d331a reason: SandboxChanged reportingComponent: '' reportingInstance: '' source: component: kubelet host: minikube type: Normal
主要字段說明:
involvedObject:觸發(fā)event的資源類型
lastTimestamp:最后一次觸發(fā)的時間
message:事件說明
metadata :event的元信息,name,namespace等
reason:event的原因
source:上報事件的來源,比如kubelet中的某個節(jié)點
type:事件類型,Normal或Warning
event字段定義可以看這里:types.go#L5078
接下來我們來看看,整個event是如何寫入的。
寫入事件
1、這里以kubelet為例,看看是如何進(jìn)行事件寫入的
2、代碼是在Kubernetes 1.17.3基礎(chǔ)上進(jìn)行分析
先以一幅圖來看下整個的處理流程
創(chuàng)建操作事件的客戶端: kubelet/app/server.go#L461
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise. funcmakeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { if kubeDeps.Recorder != nil { return } //事件廣播 eventBroadcaster := record.NewBroadcaster() //創(chuàng)建EventRecorder kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)}) //發(fā)送event至log輸出 eventBroadcaster.StartLogging(klog.V(3).Infof) if kubeDeps.EventClient != nil { klog.V(4).Infof('Sending events to api server.') //發(fā)送event至apiserver eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events('')}) } else { klog.Warning('No api server defined - no events will be sent to API server.') } }
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. type EventBroadcaster interface { // StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface StartRecordingToSink(sink EventSink)watch.Interface StartLogging(logf func(format string, args ...interface{})) watch.Interface NewRecorder(scheme *runtime.Scheme, source v1.EventSource)EventRecorder
// EventRecorder knows how to record events on behalf of an EventSource. type EventRecorder interface { // Event constructs an event from the given information and puts it in the queue for sending. // 'object' is the object this event is about. Event will make a reference-- or you may also // pass a reference to the object directly. // 'type' of this event, and can be one of Normal, Warning. New types could be added in future // 'reason' is the reason this event is generated. 'reason' should be short and unique; it // should be in UpperCamelCase format (starting with a capital letter). 'reason' will be used // to automate handling of events, so imagine people writing switch statements to handle them. // You want to make that easy. // 'message' is intended to be human readable. // // The resulting event will be created in the same namespace as the reference object. Event(object runtime.Object, eventtype, reason, message string)
// Eventf is just like Event, but with Sprintf for the message field. Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field. PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
// AnnotatedEventf is just like eventf, but with annotations attached AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) }
// Action distributes the given event among all watchers. func(m *Broadcaster)Action(action EventType, obj runtime.Object) { m.incoming <- Event{action, obj} }
func(e *eventBroadcasterImpl)StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { watcher := e.Watch() gofunc() { defer utilruntime.HandleCrash() for watchEvent := range watcher.ResultChan() { event, ok := watchEvent.Object.(*v1.Event) if !ok { // This is all local, so there's no reason this should // ever happen. continue } eventHandler(event) } }() return watcher }
funcrecordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy result, err := eventCorrelator.EventCorrelate(event) if err != nil { utilruntime.HandleError(err) } if result.Skip { return } tries := 0 for { if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { break } tries++ if tries >= maxTriesPerEvent { klog.Errorf('Unable to write event '%#v' (retry limit exceeded!)', event) break } // Randomize the first sleep so that various clients won't all be // synced up if the master goes down. // 第一次重試增加隨機(jī)性,防止 apiserver 重啟的時候所有的事件都在同一時間發(fā)送事件 if tries == 1 { time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) } else { time.Sleep(sleepDuration) } } }
其中event經(jīng)過了一個 eventCorrelator.EventCorrelate(event) 方法做預(yù)處理,主要是聚合相同的事件(避免產(chǎn)生的事件過多,增加 etcd 和 apiserver 的壓力,也會導(dǎo)致查看 pod 事件很不清晰)
func(e *EventAggregator)EventAggregate(newEvent *v1.Event)(*v1.Event, string) { now := metav1.NewTime(e.clock.Now()) var record aggregateRecord // eventKey is the full cache key for this event //eventKey 是將除了時間戳外所有字段結(jié)合在一起 eventKey := getEventKey(newEvent) // aggregateKey is for the aggregate event, if one is needed. //aggregateKey 是除了message和時間戳外的字段結(jié)合在一起,localKey 是message aggregateKey, localKey := e.keyFunc(newEvent)
// Do we have a record of similar events in our cache? e.Lock() defer e.Unlock() //從cache中根據(jù)aggregateKey查詢是否存在,如果是相同或者相類似的事件會被放入cache中 value, found := e.cache.Get(aggregateKey) if found { record = value.(aggregateRecord) }
//判斷上次事件產(chǎn)生的時間是否超過10分鐘,如何操作則重新生成一個localKeys集合(集合中存放message) maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second interval := now.Time.Sub(record.lastTimestamp.Time) if interval > maxInterval { record = aggregateRecord{localKeys: sets.NewString()} }
// Write the new event into the aggregation record and put it on the cache //將locakKey也就是message放入集合中,如果message相同就是覆蓋了 record.localKeys.Insert(localKey) record.lastTimestamp = now e.cache.Add(aggregateKey, record)
// If we are not yet over the threshold for unique events, don't correlate them //判斷l(xiāng)ocalKeys集合中存放的類似事件是否超過10個, ifuint(record.localKeys.Len()) < e.maxEvents { return newEvent, eventKey }
// do not grow our local key set any larger than max record.localKeys.PopAny()
// create a new aggregate event, and return the aggregateKey as the cache key // (so that it can be overwritten.) eventCopy := &v1.Event{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf('%v.%x', newEvent.InvolvedObject.Name, now.UnixNano()), Namespace: newEvent.Namespace, }, Count: 1, FirstTimestamp: now, InvolvedObject: newEvent.InvolvedObject, LastTimestamp: now, //這里會對message加個前綴:(combined from similar events): Message: e.messageFunc(newEvent), Type: newEvent.Type, Reason: newEvent.Reason, Source: newEvent.Source, } return eventCopy, aggregateKey }