mirror of
https://github.com/kubernetes/client-go.git
synced 2025-09-03 08:04:45 +00:00
remove the top-level folders for versions
remove scripts
This commit is contained in:
316
tools/record/event.go
Normal file
316
tools/record/event.go
Normal file
@@ -0,0 +1,316 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package record
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"k8s.io/client-go/1.5/pkg/api/errors"
|
||||
"k8s.io/client-go/1.5/pkg/api/unversioned"
|
||||
"k8s.io/client-go/1.5/pkg/api/v1"
|
||||
"k8s.io/client-go/1.5/pkg/runtime"
|
||||
"k8s.io/client-go/1.5/pkg/util/clock"
|
||||
utilruntime "k8s.io/client-go/1.5/pkg/util/runtime"
|
||||
"k8s.io/client-go/1.5/pkg/watch"
|
||||
"k8s.io/client-go/1.5/rest"
|
||||
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const maxTriesPerEvent = 12
|
||||
|
||||
var defaultSleepDuration = 10 * time.Second
|
||||
|
||||
const maxQueuedEvents = 1000
|
||||
|
||||
// EventSink knows how to store events (client.Client implements it.)
|
||||
// EventSink must respect the namespace that will be embedded in 'event'.
|
||||
// It is assumed that EventSink will return the same sorts of errors as
|
||||
// pkg/client's REST client.
|
||||
type EventSink interface {
|
||||
Create(event *v1.Event) (*v1.Event, error)
|
||||
Update(event *v1.Event) (*v1.Event, error)
|
||||
Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
|
||||
}
|
||||
|
||||
// EventRecorder knows how to record events on behalf of an EventSource.
|
||||
type EventRecorder interface {
|
||||
// Event constructs an event from the given information and puts it in the queue for sending.
|
||||
// 'object' is the object this event is about. Event will make a reference-- or you may also
|
||||
// pass a reference to the object directly.
|
||||
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
|
||||
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
|
||||
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
|
||||
// to automate handling of events, so imagine people writing switch statements to handle them.
|
||||
// You want to make that easy.
|
||||
// 'message' is intended to be human readable.
|
||||
//
|
||||
// The resulting event will be created in the same namespace as the reference object.
|
||||
Event(object runtime.Object, eventtype, reason, message string)
|
||||
|
||||
// Eventf is just like Event, but with Sprintf for the message field.
|
||||
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
|
||||
|
||||
// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
|
||||
PastEventf(object runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string, args ...interface{})
|
||||
}
|
||||
|
||||
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
|
||||
type EventBroadcaster interface {
|
||||
// StartEventWatcher starts sending events received from this EventBroadcaster to the given
|
||||
// event handler function. The return value can be ignored or used to stop recording, if
|
||||
// desired.
|
||||
StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
|
||||
|
||||
// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
|
||||
// sink. The return value can be ignored or used to stop recording, if desired.
|
||||
StartRecordingToSink(sink EventSink) watch.Interface
|
||||
|
||||
// StartLogging starts sending events received from this EventBroadcaster to the given logging
|
||||
// function. The return value can be ignored or used to stop recording, if desired.
|
||||
StartLogging(logf func(format string, args ...interface{})) watch.Interface
|
||||
|
||||
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
|
||||
// with the event source set to the given event source.
|
||||
NewRecorder(source v1.EventSource) EventRecorder
|
||||
}
|
||||
|
||||
// Creates a new event broadcaster.
|
||||
func NewBroadcaster() EventBroadcaster {
|
||||
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
|
||||
}
|
||||
|
||||
func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
|
||||
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration}
|
||||
}
|
||||
|
||||
type eventBroadcasterImpl struct {
|
||||
*watch.Broadcaster
|
||||
sleepDuration time.Duration
|
||||
}
|
||||
|
||||
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
|
||||
// The return value can be ignored or used to stop recording, if desired.
|
||||
// TODO: make me an object with parameterizable queue length and retry interval
|
||||
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
|
||||
// The default math/rand package functions aren't thread safe, so create a
|
||||
// new Rand object for each StartRecording call.
|
||||
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
eventCorrelator := NewEventCorrelator(clock.RealClock{})
|
||||
return eventBroadcaster.StartEventWatcher(
|
||||
func(event *v1.Event) {
|
||||
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
|
||||
})
|
||||
}
|
||||
|
||||
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
|
||||
// Make a copy before modification, because there could be multiple listeners.
|
||||
// Events are safe to copy like this.
|
||||
eventCopy := *event
|
||||
event = &eventCopy
|
||||
result, err := eventCorrelator.EventCorrelate(event)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
if result.Skip {
|
||||
return
|
||||
}
|
||||
tries := 0
|
||||
for {
|
||||
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
|
||||
break
|
||||
}
|
||||
tries++
|
||||
if tries >= maxTriesPerEvent {
|
||||
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
|
||||
break
|
||||
}
|
||||
// Randomize the first sleep so that various clients won't all be
|
||||
// synced up if the master goes down.
|
||||
if tries == 1 {
|
||||
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
|
||||
} else {
|
||||
time.Sleep(sleepDuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isKeyNotFoundError(err error) bool {
|
||||
statusErr, _ := err.(*errors.StatusError)
|
||||
|
||||
if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// recordEvent attempts to write event to a sink. It returns true if the event
|
||||
// was successfully recorded or discarded, false if it should be retried.
|
||||
// If updateExistingEvent is false, it creates a new event, otherwise it updates
|
||||
// existing event.
|
||||
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
|
||||
var newEvent *v1.Event
|
||||
var err error
|
||||
if updateExistingEvent {
|
||||
newEvent, err = sink.Patch(event, patch)
|
||||
}
|
||||
// Update can fail because the event may have been removed and it no longer exists.
|
||||
if !updateExistingEvent || (updateExistingEvent && isKeyNotFoundError(err)) {
|
||||
// Making sure that ResourceVersion is empty on creation
|
||||
event.ResourceVersion = ""
|
||||
newEvent, err = sink.Create(event)
|
||||
}
|
||||
if err == nil {
|
||||
// we need to update our event correlator with the server returned state to handle name/resourceversion
|
||||
eventCorrelator.UpdateState(newEvent)
|
||||
return true
|
||||
}
|
||||
|
||||
// If we can't contact the server, then hold everything while we keep trying.
|
||||
// Otherwise, something about the event is malformed and we should abandon it.
|
||||
switch err.(type) {
|
||||
case *rest.RequestConstructionError:
|
||||
// We will construct the request the same next time, so don't keep trying.
|
||||
glog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
|
||||
return true
|
||||
case *errors.StatusError:
|
||||
if errors.IsAlreadyExists(err) {
|
||||
glog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
} else {
|
||||
glog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
|
||||
}
|
||||
return true
|
||||
case *errors.UnexpectedObjectError:
|
||||
// We don't expect this; it implies the server's response didn't match a
|
||||
// known pattern. Go ahead and retry.
|
||||
default:
|
||||
// This case includes actual http transport errors. Go ahead and retry.
|
||||
}
|
||||
glog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
|
||||
return false
|
||||
}
|
||||
|
||||
// StartLogging starts sending events received from this EventBroadcaster to the given logging function.
|
||||
// The return value can be ignored or used to stop recording, if desired.
|
||||
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
|
||||
return eventBroadcaster.StartEventWatcher(
|
||||
func(e *v1.Event) {
|
||||
logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
|
||||
})
|
||||
}
|
||||
|
||||
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
|
||||
// The return value can be ignored or used to stop recording, if desired.
|
||||
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
|
||||
watcher := eventBroadcaster.Watch()
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
for {
|
||||
watchEvent, open := <-watcher.ResultChan()
|
||||
if !open {
|
||||
return
|
||||
}
|
||||
event, ok := watchEvent.Object.(*v1.Event)
|
||||
if !ok {
|
||||
// This is all local, so there's no reason this should
|
||||
// ever happen.
|
||||
continue
|
||||
}
|
||||
eventHandler(event)
|
||||
}
|
||||
}()
|
||||
return watcher
|
||||
}
|
||||
|
||||
// NewRecorder returns an EventRecorder that records events with the given event source.
|
||||
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source v1.EventSource) EventRecorder {
|
||||
return &recorderImpl{source, eventBroadcaster.Broadcaster, clock.RealClock{}}
|
||||
}
|
||||
|
||||
type recorderImpl struct {
|
||||
source v1.EventSource
|
||||
*watch.Broadcaster
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) generateEvent(object runtime.Object, timestamp unversioned.Time, eventtype, reason, message string) {
|
||||
ref, err := v1.GetReference(object)
|
||||
if err != nil {
|
||||
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
|
||||
return
|
||||
}
|
||||
|
||||
if !validateEventType(eventtype) {
|
||||
glog.Errorf("Unsupported event type: '%v'", eventtype)
|
||||
return
|
||||
}
|
||||
|
||||
event := recorder.makeEvent(ref, eventtype, reason, message)
|
||||
event.Source = recorder.source
|
||||
|
||||
go func() {
|
||||
// NOTE: events should be a non-blocking operation
|
||||
defer utilruntime.HandleCrash()
|
||||
recorder.Action(watch.Added, event)
|
||||
}()
|
||||
}
|
||||
|
||||
func validateEventType(eventtype string) bool {
|
||||
switch eventtype {
|
||||
case v1.EventTypeNormal, v1.EventTypeWarning:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
|
||||
recorder.generateEvent(object, unversioned.Now(), eventtype, reason, message)
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
|
||||
recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) PastEventf(object runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string, args ...interface{}) {
|
||||
recorder.generateEvent(object, timestamp, eventtype, reason, fmt.Sprintf(messageFmt, args...))
|
||||
}
|
||||
|
||||
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *v1.Event {
|
||||
t := unversioned.Time{Time: recorder.clock.Now()}
|
||||
namespace := ref.Namespace
|
||||
if namespace == "" {
|
||||
namespace = v1.NamespaceDefault
|
||||
}
|
||||
return &v1.Event{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
|
||||
Namespace: namespace,
|
||||
},
|
||||
InvolvedObject: *ref,
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
FirstTimestamp: t,
|
||||
LastTimestamp: t,
|
||||
Count: 1,
|
||||
Type: eventtype,
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user