Make each new instance of kubelet generate a new event channel (instead of reusing existing).

This commit is contained in:
saadali
2015-03-16 21:03:07 -07:00
parent 10b4fe6f30
commit e0f71cb21f
13 changed files with 172 additions and 151 deletions

View File

@@ -35,6 +35,8 @@ const maxTriesPerEvent = 12
var sleepDuration = 10 * time.Second
const maxQueuedEvents = 1000
// EventSink knows how to store events (client.Client implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
// It is assumed that EventSink will return the same sorts of errors as
@@ -44,50 +46,94 @@ type EventSink interface {
Update(event *api.Event) (*api.Event, error)
}
var emptySource = api.EventSource{}
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will
// be used to automate handling of events, so imagine people writing switch statements to
// handle them. You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, reason, message string)
// StartRecording starts sending events to a sink. Call once while initializing
// your binary. Subsequent calls will be ignored. The return value can be ignored
// or used to stop recording, if desired.
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, reason, messageFmt string, args ...interface{})
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartEventWatcher starts sending events recieved from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired.
StartEventWatcher(eventHandler func(*api.Event)) watch.Interface
// StartRecordingToSink starts sending events recieved from this EventBroadcaster to the given
// sink. The return value can be ignored or used to stop recording, if desired.
StartRecordingToSink(sink EventSink) watch.Interface
// StartLogging starts sending events recieved from this EventBroadcaster to the given logging
// function. The return value can be ignored or used to stop recording, if desired.
StartLogging(logf func(format string, args ...interface{})) watch.Interface
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(source api.EventSource) EventRecorder
}
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)}
}
type eventBroadcasterImpl struct {
*watch.Broadcaster
}
// StartRecordingToSink starts sending events recieved from the specified eventBroadcaster to the given sink.
// The return value can be ignored or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval
func StartRecording(sink EventSink) watch.Interface {
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
return eventBroadcaster.StartEventWatcher(
func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
previousEvent := getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
event.FirstTimestamp = previousEvent.FirstTimestamp
event.Name = previousEvent.Name
event.ResourceVersion = previousEvent.ResourceVersion
}
previousEvent := getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
event.Count = previousEvent.Count + 1
event.FirstTimestamp = previousEvent.FirstTimestamp
event.Name = previousEvent.Name
event.ResourceVersion = previousEvent.ResourceVersion
}
tries := 0
for {
if recordEvent(sink, event, updateExistingEvent) {
break
tries := 0
for {
if recordEvent(sink, event, updateExistingEvent) {
break
}
tries++
if tries >= maxTriesPerEvent {
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
tries++
if tries >= maxTriesPerEvent {
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
})
})
}
// recordEvent attempts to write event to a sink. It returns true if the event
@@ -131,22 +177,23 @@ func recordEvent(sink EventSink, event *api.Event, updateExistingEvent bool) boo
return false
}
// StartLogging just logs local events, using the given logging function. The
// return value can be ignored or used to stop logging, if desired.
func StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return GetEvents(func(e *api.Event) {
logf("Event(%#v): reason: '%v' %v", e.InvolvedObject, e.Reason, e.Message)
})
// StartLogging starts sending events recieved from this EventBroadcaster to the given logging function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return eventBroadcaster.StartEventWatcher(
func(e *api.Event) {
logf("Event(%#v): reason: '%v' %v", e.InvolvedObject, e.Reason, e.Message)
})
}
// GetEvents lets you see *local* events. Convenience function for testing. The
// return value can be ignored or used to stop logging, if desired.
func GetEvents(f func(*api.Event)) watch.Interface {
w := events.Watch()
// StartEventWatcher starts sending events recieved from this EventBroadcaster to the given event handler function.
// The return value can be ignored or used to stop recording, if desired.
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*api.Event)) watch.Interface {
watcher := eventBroadcaster.Watch()
go func() {
defer util.HandleCrash()
for {
watchEvent, open := <-w.ResultChan()
watchEvent, open := <-watcher.ResultChan()
if !open {
return
}
@@ -156,58 +203,37 @@ func GetEvents(f func(*api.Event)) watch.Interface {
// ever happen.
continue
}
f(event)
eventHandler(event)
}
}()
return w
return watcher
}
const maxQueuedEvents = 1000
var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
// EventRecorder knows how to record events for an EventSource.
type EventRecorder interface {
// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it will
// be used to automate handling of events, so imagine people writing switch statements to
// handle them. You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(object runtime.Object, reason, message string)
// Eventf is just like Event, but with Sprintf for the message field.
Eventf(object runtime.Object, reason, messageFmt string, args ...interface{})
}
// FromSource returns an EventRecorder that records events with the
// given event source.
func FromSource(source api.EventSource) EventRecorder {
return &recorderImpl{source}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(source api.EventSource) EventRecorder {
return &recorderImpl{source, eventBroadcaster.Broadcaster}
}
type recorderImpl struct {
source api.EventSource
*watch.Broadcaster
}
func (i *recorderImpl) Event(object runtime.Object, reason, message string) {
func (recorder *recorderImpl) Event(object runtime.Object, reason, message string) {
ref, err := api.GetReference(object)
if err != nil {
glog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v'", object, err, reason, message)
return
}
e := makeEvent(ref, reason, message)
e.Source = i.source
event := makeEvent(ref, reason, message)
event.Source = recorder.source
events.Action(watch.Added, e)
recorder.Action(watch.Added, event)
}
func (i *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
i.Event(object, reason, fmt.Sprintf(messageFmt, args...))
func (recorder *recorderImpl) Eventf(object runtime.Object, reason, messageFmt string, args ...interface{}) {
recorder.Event(object, reason, fmt.Sprintf(messageFmt, args...))
}
func makeEvent(ref *api.ObjectReference, reason, message string) *api.Event {