Merge pull request #111545 from jlsong01/rewrite_signature_of_StartEventWatcher

rewrite signature of function StartEventWatcher
This commit is contained in:
Kubernetes Prow Robot 2022-11-07 08:06:18 -08:00 committed by GitHub
commit f33209a6ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 16 deletions

View File

@ -411,7 +411,7 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
return true, binding, nil return true, binding, nil
}) })
controllers := make(map[string]string) controllers := make(map[string]string)
stopFn := broadcaster.StartEventWatcher(func(obj runtime.Object) { stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
e, ok := obj.(*eventsv1.Event) e, ok := obj.(*eventsv1.Event)
if !ok || e.Reason != "Scheduled" { if !ok || e.Reason != "Scheduled" {
return return
@ -419,6 +419,9 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
controllers[e.Regarding.Name] = e.ReportingController controllers[e.Regarding.Name] = e.ReportingController
wg.Done() wg.Done()
}) })
if err != nil {
t.Fatal(err)
}
defer stopFn() defer stopFn()
// Run scheduler. // Run scheduler.
@ -605,13 +608,16 @@ func TestSchedulerScheduleOne(t *testing.T) {
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg) fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
} }
called := make(chan struct{}) called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event) e, _ := obj.(*eventsv1.Event)
if e.Reason != item.eventReason { if e.Reason != item.eventReason {
t.Errorf("got event %v, want %v", e.Reason, item.eventReason) t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
} }
close(called) close(called)
}) })
if err != nil {
t.Fatal(err)
}
sched.scheduleOne(ctx) sched.scheduleOne(ctx)
<-called <-called
if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
@ -948,13 +954,16 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig) fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig)
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, fakeVolumeBinder, eventBroadcaster) s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, fakeVolumeBinder, eventBroadcaster)
eventChan := make(chan struct{}) eventChan := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*eventsv1.Event) e, _ := obj.(*eventsv1.Event)
if e, a := item.eventReason, e.Reason; e != a { if e, a := item.eventReason, e.Reason; e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }
close(eventChan) close(eventChan)
}) })
if err != nil {
t.Fatal(err)
}
s.scheduleOne(ctx) s.scheduleOne(ctx)
// Wait for pod to succeed or fail scheduling // Wait for pod to succeed or fail scheduling
select { select {

View File

@ -292,8 +292,9 @@ func getKey(event *eventsv1.Event) eventKey {
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function. // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired. // The return value can be ignored or used to stop recording, if desired.
// TODO: this function should also return an error.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() { func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
return e.StartEventWatcher( stopWatcher, err := e.StartEventWatcher(
func(obj runtime.Object) { func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event) event, ok := obj.(*eventsv1.Event)
if !ok { if !ok {
@ -302,19 +303,20 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
} }
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note) klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
}) })
if err != nil {
klog.Errorf("failed to start event watcher: '%v'", err)
return func() {}
}
return stopWatcher
} }
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording // The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() { func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
watcher, err := e.Watch() watcher, err := e.Watch()
if err != nil { if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
// TODO: Rewrite the function signature to return an error, for return nil, err
// now just return a no-op function
return func() {
klog.Error("The event watcher failed to start")
}
} }
go func() { go func() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -326,10 +328,10 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
eventHandler(watchEvent.Object) eventHandler(watchEvent.Object)
} }
}() }()
return watcher.Stop return watcher.Stop, nil
} }
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) { func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
eventHandler := func(obj runtime.Object) { eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event) event, ok := obj.(*eventsv1.Event)
if !ok { if !ok {
@ -338,18 +340,26 @@ func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
} }
e.recordToSink(event, clock.RealClock{}) e.recordToSink(event, clock.RealClock{})
} }
stopWatcher := e.StartEventWatcher(eventHandler) stopWatcher, err := e.StartEventWatcher(eventHandler)
if err != nil {
return err
}
go func() { go func() {
<-stopCh <-stopCh
stopWatcher() stopWatcher()
}() }()
return nil
} }
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh) go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh) go wait.Until(e.finishSeries, finishTime, stopCh)
e.startRecordingEvents(stopCh) err := e.startRecordingEvents(stopCh)
if err != nil {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
}
} }
type eventBroadcasterAdapterImpl struct { type eventBroadcasterAdapterImpl struct {

View File

@ -161,7 +161,10 @@ func TestEventSeriesf(t *testing.T) {
// Don't call StartRecordingToSink, as we don't need neither refreshing event // Don't call StartRecordingToSink, as we don't need neither refreshing event
// series nor finishing them in this tests and additional events updated would // series nor finishing them in this tests and additional events updated would
// race with our expected ones. // race with our expected ones.
broadcaster.startRecordingEvents(stopCh) err = broadcaster.startRecordingEvents(stopCh)
if err != nil {
t.Fatal(err)
}
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
// read from the chan as this was needed only to populate the cache // read from the chan as this was needed only to populate the cache
<-createEvent <-createEvent

View File

@ -55,7 +55,7 @@ type EventBroadcaster interface {
// of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests). // of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests).
// NOTE: events received on your eventHandler should be copied before being used. // NOTE: events received on your eventHandler should be copied before being used.
// TODO: figure out if this can be removed. // TODO: figure out if this can be removed.
StartEventWatcher(eventHandler func(event runtime.Object)) func() StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error)
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
// logging function. The return value can be ignored or used to stop recording, if desired. // logging function. The return value can be ignored or used to stop recording, if desired.