mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-03 23:40:03 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			361 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			361 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
Copyright 2015 The Kubernetes Authors.
 | 
						|
 | 
						|
Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
you may not use this file except in compliance with the License.
 | 
						|
You may obtain a copy of the License at
 | 
						|
 | 
						|
    http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 | 
						|
Unless required by applicable law or agreed to in writing, software
 | 
						|
distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
See the License for the specific language governing permissions and
 | 
						|
limitations under the License.
 | 
						|
*/
 | 
						|
 | 
						|
package record
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/golang/groupcache/lru"
 | 
						|
 | 
						|
	"k8s.io/kubernetes/pkg/api"
 | 
						|
	"k8s.io/kubernetes/pkg/api/unversioned"
 | 
						|
	"k8s.io/kubernetes/pkg/util/clock"
 | 
						|
	"k8s.io/kubernetes/pkg/util/sets"
 | 
						|
	"k8s.io/kubernetes/pkg/util/strategicpatch"
 | 
						|
)
 | 
						|
 | 
						|
const (
	// maxLruCacheEntries is the size passed to the LRU caches created by
	// NewEventCorrelator (one for aggregation state, one for observations).
	maxLruCacheEntries = 4096

	// Default aggregation policy: once events that differ only by message
	// reach 10 distinct messages, with no gap between consecutive events
	// longer than 10 minutes, they are reported as one aggregate event.
	defaultAggregateMaxEvents         = 10
	defaultAggregateIntervalInSeconds = 10 * 60
)
 | 
						|
 | 
						|
// getEventKey builds unique event key based on source, involvedObject, reason, message
 | 
						|
func getEventKey(event *api.Event) string {
 | 
						|
	return strings.Join([]string{
 | 
						|
		event.Source.Component,
 | 
						|
		event.Source.Host,
 | 
						|
		event.InvolvedObject.Kind,
 | 
						|
		event.InvolvedObject.Namespace,
 | 
						|
		event.InvolvedObject.Name,
 | 
						|
		string(event.InvolvedObject.UID),
 | 
						|
		event.InvolvedObject.APIVersion,
 | 
						|
		event.Type,
 | 
						|
		event.Reason,
 | 
						|
		event.Message,
 | 
						|
	},
 | 
						|
		"")
 | 
						|
}
 | 
						|
 | 
						|
// EventFilterFunc is a function that returns true if the event should be skipped
 | 
						|
type EventFilterFunc func(event *api.Event) bool
 | 
						|
 | 
						|
// DefaultEventFilterFunc returns false for all incoming events
 | 
						|
func DefaultEventFilterFunc(event *api.Event) bool {
 | 
						|
	return false
 | 
						|
}
 | 
						|
 | 
						|
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
 | 
						|
// It returns a tuple of the following:
 | 
						|
// aggregateKey - key the identifies the aggregate group to bucket this event
 | 
						|
// localKey - key that makes this event in the local group
 | 
						|
type EventAggregatorKeyFunc func(event *api.Event) (aggregateKey string, localKey string)
 | 
						|
 | 
						|
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type and event.Reason
 | 
						|
func EventAggregatorByReasonFunc(event *api.Event) (string, string) {
 | 
						|
	return strings.Join([]string{
 | 
						|
		event.Source.Component,
 | 
						|
		event.Source.Host,
 | 
						|
		event.InvolvedObject.Kind,
 | 
						|
		event.InvolvedObject.Namespace,
 | 
						|
		event.InvolvedObject.Name,
 | 
						|
		string(event.InvolvedObject.UID),
 | 
						|
		event.InvolvedObject.APIVersion,
 | 
						|
		event.Type,
 | 
						|
		event.Reason,
 | 
						|
	},
 | 
						|
		""), event.Message
 | 
						|
}
 | 
						|
 | 
						|
// EventAggregatorMessageFunc is responsible for producing an aggregation message
 | 
						|
type EventAggregatorMessageFunc func(event *api.Event) string
 | 
						|
 | 
						|
// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
 | 
						|
func EventAggregatorByReasonMessageFunc(event *api.Event) string {
 | 
						|
	return "(events with common reason combined)"
 | 
						|
}
 | 
						|
 | 
						|
// EventAggregator identifies similar events and aggregates them into a single event
 | 
						|
type EventAggregator struct {
 | 
						|
	sync.RWMutex
 | 
						|
 | 
						|
	// The cache that manages aggregation state
 | 
						|
	cache *lru.Cache
 | 
						|
 | 
						|
	// The function that groups events for aggregation
 | 
						|
	keyFunc EventAggregatorKeyFunc
 | 
						|
 | 
						|
	// The function that generates a message for an aggregate event
 | 
						|
	messageFunc EventAggregatorMessageFunc
 | 
						|
 | 
						|
	// The maximum number of events in the specified interval before aggregation occurs
 | 
						|
	maxEvents int
 | 
						|
 | 
						|
	// The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
 | 
						|
	maxIntervalInSeconds int
 | 
						|
 | 
						|
	// clock is used to allow for testing over a time interval
 | 
						|
	clock clock.Clock
 | 
						|
}
 | 
						|
 | 
						|
// NewEventAggregator returns a new instance of an EventAggregator
 | 
						|
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
 | 
						|
	maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
 | 
						|
	return &EventAggregator{
 | 
						|
		cache:                lru.New(lruCacheSize),
 | 
						|
		keyFunc:              keyFunc,
 | 
						|
		messageFunc:          messageFunc,
 | 
						|
		maxEvents:            maxEvents,
 | 
						|
		maxIntervalInSeconds: maxIntervalInSeconds,
 | 
						|
		clock:                clock,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// aggregateRecord holds data used to perform aggregation decisions
 | 
						|
type aggregateRecord struct {
 | 
						|
	// we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
 | 
						|
	// if the size of this set exceeds the max, we know we need to aggregate
 | 
						|
	localKeys sets.String
 | 
						|
	// The last time at which the aggregate was recorded
 | 
						|
	lastTimestamp unversioned.Time
 | 
						|
}
 | 
						|
 | 
						|
// EventAggregate identifies similar events and groups into a common event if required
 | 
						|
func (e *EventAggregator) EventAggregate(newEvent *api.Event) (*api.Event, error) {
 | 
						|
	aggregateKey, localKey := e.keyFunc(newEvent)
 | 
						|
	now := unversioned.NewTime(e.clock.Now())
 | 
						|
	record := aggregateRecord{localKeys: sets.NewString(), lastTimestamp: now}
 | 
						|
	e.Lock()
 | 
						|
	defer e.Unlock()
 | 
						|
	value, found := e.cache.Get(aggregateKey)
 | 
						|
	if found {
 | 
						|
		record = value.(aggregateRecord)
 | 
						|
	}
 | 
						|
 | 
						|
	// if the last event was far enough in the past, it is not aggregated, and we must reset state
 | 
						|
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
 | 
						|
	interval := now.Time.Sub(record.lastTimestamp.Time)
 | 
						|
	if interval > maxInterval {
 | 
						|
		record = aggregateRecord{localKeys: sets.NewString()}
 | 
						|
	}
 | 
						|
	record.localKeys.Insert(localKey)
 | 
						|
	record.lastTimestamp = now
 | 
						|
	e.cache.Add(aggregateKey, record)
 | 
						|
 | 
						|
	if record.localKeys.Len() < e.maxEvents {
 | 
						|
		return newEvent, nil
 | 
						|
	}
 | 
						|
 | 
						|
	// do not grow our local key set any larger than max
 | 
						|
	record.localKeys.PopAny()
 | 
						|
 | 
						|
	// create a new aggregate event
 | 
						|
	eventCopy := &api.Event{
 | 
						|
		ObjectMeta: api.ObjectMeta{
 | 
						|
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
 | 
						|
			Namespace: newEvent.Namespace,
 | 
						|
		},
 | 
						|
		Count:          1,
 | 
						|
		FirstTimestamp: now,
 | 
						|
		InvolvedObject: newEvent.InvolvedObject,
 | 
						|
		LastTimestamp:  now,
 | 
						|
		Message:        e.messageFunc(newEvent),
 | 
						|
		Type:           newEvent.Type,
 | 
						|
		Reason:         newEvent.Reason,
 | 
						|
		Source:         newEvent.Source,
 | 
						|
	}
 | 
						|
	return eventCopy, nil
 | 
						|
}
 | 
						|
 | 
						|
// eventLog records data about when an event was observed
 | 
						|
type eventLog struct {
 | 
						|
	// The number of times the event has occurred since first occurrence.
 | 
						|
	count int
 | 
						|
 | 
						|
	// The time at which the event was first recorded.
 | 
						|
	firstTimestamp unversioned.Time
 | 
						|
 | 
						|
	// The unique name of the first occurrence of this event
 | 
						|
	name string
 | 
						|
 | 
						|
	// Resource version returned from previous interaction with server
 | 
						|
	resourceVersion string
 | 
						|
}
 | 
						|
 | 
						|
// eventLogger logs occurrences of an event
 | 
						|
type eventLogger struct {
 | 
						|
	sync.RWMutex
 | 
						|
	cache *lru.Cache
 | 
						|
	clock clock.Clock
 | 
						|
}
 | 
						|
 | 
						|
// newEventLogger observes events and counts their frequencies
 | 
						|
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
 | 
						|
	return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
 | 
						|
}
 | 
						|
 | 
						|
// eventObserve records the event, and determines if its frequency should update
 | 
						|
func (e *eventLogger) eventObserve(newEvent *api.Event) (*api.Event, []byte, error) {
 | 
						|
	var (
 | 
						|
		patch []byte
 | 
						|
		err   error
 | 
						|
	)
 | 
						|
	key := getEventKey(newEvent)
 | 
						|
	eventCopy := *newEvent
 | 
						|
	event := &eventCopy
 | 
						|
 | 
						|
	e.Lock()
 | 
						|
	defer e.Unlock()
 | 
						|
 | 
						|
	lastObservation := e.lastEventObservationFromCache(key)
 | 
						|
 | 
						|
	// we have seen this event before, so we must prepare a patch
 | 
						|
	if lastObservation.count > 0 {
 | 
						|
		// update the event based on the last observation so patch will work as desired
 | 
						|
		event.Name = lastObservation.name
 | 
						|
		event.ResourceVersion = lastObservation.resourceVersion
 | 
						|
		event.FirstTimestamp = lastObservation.firstTimestamp
 | 
						|
		event.Count = int32(lastObservation.count) + 1
 | 
						|
 | 
						|
		eventCopy2 := *event
 | 
						|
		eventCopy2.Count = 0
 | 
						|
		eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0))
 | 
						|
 | 
						|
		newData, _ := json.Marshal(event)
 | 
						|
		oldData, _ := json.Marshal(eventCopy2)
 | 
						|
		patch, err = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
 | 
						|
	}
 | 
						|
 | 
						|
	// record our new observation
 | 
						|
	e.cache.Add(
 | 
						|
		key,
 | 
						|
		eventLog{
 | 
						|
			count:           int(event.Count),
 | 
						|
			firstTimestamp:  event.FirstTimestamp,
 | 
						|
			name:            event.Name,
 | 
						|
			resourceVersion: event.ResourceVersion,
 | 
						|
		},
 | 
						|
	)
 | 
						|
	return event, patch, err
 | 
						|
}
 | 
						|
 | 
						|
// updateState updates its internal tracking information based on latest server state
 | 
						|
func (e *eventLogger) updateState(event *api.Event) {
 | 
						|
	key := getEventKey(event)
 | 
						|
	e.Lock()
 | 
						|
	defer e.Unlock()
 | 
						|
	// record our new observation
 | 
						|
	e.cache.Add(
 | 
						|
		key,
 | 
						|
		eventLog{
 | 
						|
			count:           int(event.Count),
 | 
						|
			firstTimestamp:  event.FirstTimestamp,
 | 
						|
			name:            event.Name,
 | 
						|
			resourceVersion: event.ResourceVersion,
 | 
						|
		},
 | 
						|
	)
 | 
						|
}
 | 
						|
 | 
						|
// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
 | 
						|
func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
 | 
						|
	value, ok := e.cache.Get(key)
 | 
						|
	if ok {
 | 
						|
		observationValue, ok := value.(eventLog)
 | 
						|
		if ok {
 | 
						|
			return observationValue
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return eventLog{}
 | 
						|
}
 | 
						|
 | 
						|
// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system.  It can filter all
 | 
						|
// incoming events to see if the event should be filtered from further processing.  It can aggregate similar events that occur
 | 
						|
// frequently to protect the system from spamming events that are difficult for users to distinguish.  It performs de-duplication
 | 
						|
// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
 | 
						|
type EventCorrelator struct {
 | 
						|
	// the function to filter the event
 | 
						|
	filterFunc EventFilterFunc
 | 
						|
	// the object that performs event aggregation
 | 
						|
	aggregator *EventAggregator
 | 
						|
	// the object that observes events as they come through
 | 
						|
	logger *eventLogger
 | 
						|
}
 | 
						|
 | 
						|
// EventCorrelateResult is the result of a Correlate
 | 
						|
type EventCorrelateResult struct {
 | 
						|
	// the event after correlation
 | 
						|
	Event *api.Event
 | 
						|
	// if provided, perform a strategic patch when updating the record on the server
 | 
						|
	Patch []byte
 | 
						|
	// if true, do no further processing of the event
 | 
						|
	Skip bool
 | 
						|
}
 | 
						|
 | 
						|
// NewEventCorrelator returns an EventCorrelator configured with default values.
 | 
						|
//
 | 
						|
// The EventCorrelator is responsible for event filtering, aggregating, and counting
 | 
						|
// prior to interacting with the API server to record the event.
 | 
						|
//
 | 
						|
// The default behavior is as follows:
 | 
						|
//   * No events are filtered from being recorded
 | 
						|
//   * Aggregation is performed if a similar event is recorded 10 times in a
 | 
						|
//     in a 10 minute rolling interval.  A similar event is an event that varies only by
 | 
						|
//     the Event.Message field.  Rather than recording the precise event, aggregation
 | 
						|
//     will create a new event whose message reports that it has combined events with
 | 
						|
//     the same reason.
 | 
						|
//   * Events are incrementally counted if the exact same event is encountered multiple
 | 
						|
//     times.
 | 
						|
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
 | 
						|
	cacheSize := maxLruCacheEntries
 | 
						|
	return &EventCorrelator{
 | 
						|
		filterFunc: DefaultEventFilterFunc,
 | 
						|
		aggregator: NewEventAggregator(
 | 
						|
			cacheSize,
 | 
						|
			EventAggregatorByReasonFunc,
 | 
						|
			EventAggregatorByReasonMessageFunc,
 | 
						|
			defaultAggregateMaxEvents,
 | 
						|
			defaultAggregateIntervalInSeconds,
 | 
						|
			clock),
 | 
						|
		logger: newEventLogger(cacheSize, clock),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
 | 
						|
func (c *EventCorrelator) EventCorrelate(newEvent *api.Event) (*EventCorrelateResult, error) {
 | 
						|
	if c.filterFunc(newEvent) {
 | 
						|
		return &EventCorrelateResult{Skip: true}, nil
 | 
						|
	}
 | 
						|
	aggregateEvent, err := c.aggregator.EventAggregate(newEvent)
 | 
						|
	if err != nil {
 | 
						|
		return &EventCorrelateResult{}, err
 | 
						|
	}
 | 
						|
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent)
 | 
						|
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
 | 
						|
}
 | 
						|
 | 
						|
// UpdateState based on the latest observed state from server
 | 
						|
func (c *EventCorrelator) UpdateState(event *api.Event) {
 | 
						|
	c.logger.updateState(event)
 | 
						|
}
 |