Merge pull request #111545 from jlsong01/rewrite_signature_of_StartEventWatcher

rewrite signature of function StartEventWatcher

Kubernetes-commit: f33209a6ede53ce6ead8aed6cfc1823df1afedfc
This commit is contained in:
Kubernetes Publisher 2022-11-07 08:06:18 -08:00
commit 7ed3193a72
5 changed files with 30 additions and 17 deletions

4
go.mod
View File

@ -24,7 +24,7 @@ require (
golang.org/x/term v0.1.0 golang.org/x/term v0.1.0
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/protobuf v1.28.1 google.golang.org/protobuf v1.28.1
k8s.io/api v0.0.0-20221104075248-4bad65698270 k8s.io/api v0.0.0-20221104195249-20250459e6c2
k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2
k8s.io/klog/v2 v2.80.1 k8s.io/klog/v2 v2.80.1
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
@ -59,6 +59,6 @@ require (
) )
replace ( replace (
k8s.io/api => k8s.io/api v0.0.0-20221104075248-4bad65698270 k8s.io/api => k8s.io/api v0.0.0-20221104195249-20250459e6c2
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2
) )

4
go.sum
View File

@ -476,8 +476,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20221104075248-4bad65698270 h1:KC6Bw3yZDigxwsAHhJ0gUZftgbCPq0vWpHWS5JxWA/E= k8s.io/api v0.0.0-20221104195249-20250459e6c2 h1:LLS5URpPMtPRoB9zYHFCYDsxyPMTtDorf8eg10LbZKk=
k8s.io/api v0.0.0-20221104075248-4bad65698270/go.mod h1:O7KYltqKbaDavU20bYboF3FKNWyQen8XmDYgb8+Kat0= k8s.io/api v0.0.0-20221104195249-20250459e6c2/go.mod h1:O7KYltqKbaDavU20bYboF3FKNWyQen8XmDYgb8+Kat0=
k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 h1:0nhI6fiyouN4H8MXOcMcCOybGhw4FgxwQbadTKPIRlA= k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2 h1:0nhI6fiyouN4H8MXOcMcCOybGhw4FgxwQbadTKPIRlA=
k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2/go.mod h1:zSkBXgO5G/dSQOe256tx5Yo2OJytojpY3bsXu/4/ZJE= k8s.io/apimachinery v0.0.0-20221103075033-9e85d3af4ae2/go.mod h1:zSkBXgO5G/dSQOe256tx5Yo2OJytojpY3bsXu/4/ZJE=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=

View File

@ -292,8 +292,9 @@ func getKey(event *eventsv1.Event) eventKey {
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function. // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
// The return value can be ignored or used to stop recording, if desired. // The return value can be ignored or used to stop recording, if desired.
// TODO: this function should also return an error.
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() { func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
return e.StartEventWatcher( stopWatcher, err := e.StartEventWatcher(
func(obj runtime.Object) { func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event) event, ok := obj.(*eventsv1.Event)
if !ok { if !ok {
@ -302,19 +303,20 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func
} }
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note) klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
}) })
if err != nil {
klog.Errorf("failed to start event watcher: '%v'", err)
return func() {}
}
return stopWatcher
} }
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording // The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() { func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
watcher, err := e.Watch() watcher, err := e.Watch()
if err != nil { if err != nil {
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
// TODO: Rewrite the function signature to return an error, for return nil, err
// now just return a no-op function
return func() {
klog.Error("The event watcher failed to start")
}
} }
go func() { go func() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
@ -326,10 +328,10 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
eventHandler(watchEvent.Object) eventHandler(watchEvent.Object)
} }
}() }()
return watcher.Stop return watcher.Stop, nil
} }
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) { func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
eventHandler := func(obj runtime.Object) { eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event) event, ok := obj.(*eventsv1.Event)
if !ok { if !ok {
@ -338,18 +340,26 @@ func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
} }
e.recordToSink(event, clock.RealClock{}) e.recordToSink(event, clock.RealClock{})
} }
stopWatcher := e.StartEventWatcher(eventHandler) stopWatcher, err := e.StartEventWatcher(eventHandler)
if err != nil {
return err
}
go func() { go func() {
<-stopCh <-stopCh
stopWatcher() stopWatcher()
}() }()
return nil
} }
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh) go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh) go wait.Until(e.finishSeries, finishTime, stopCh)
e.startRecordingEvents(stopCh) err := e.startRecordingEvents(stopCh)
if err != nil {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
}
} }
type eventBroadcasterAdapterImpl struct { type eventBroadcasterAdapterImpl struct {

View File

@ -161,7 +161,10 @@ func TestEventSeriesf(t *testing.T) {
// Don't call StartRecordingToSink, as we don't need neither refreshing event // Don't call StartRecordingToSink, as we don't need neither refreshing event
// series nor finishing them in this tests and additional events updated would // series nor finishing them in this tests and additional events updated would
// race with our expected ones. // race with our expected ones.
broadcaster.startRecordingEvents(stopCh) err = broadcaster.startRecordingEvents(stopCh)
if err != nil {
t.Fatal(err)
}
recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1})
// read from the chan as this was needed only to populate the cache // read from the chan as this was needed only to populate the cache
<-createEvent <-createEvent

View File

@ -55,7 +55,7 @@ type EventBroadcaster interface {
// of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests). // of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests).
// NOTE: events received on your eventHandler should be copied before being used. // NOTE: events received on your eventHandler should be copied before being used.
// TODO: figure out if this can be removed. // TODO: figure out if this can be removed.
StartEventWatcher(eventHandler func(event runtime.Object)) func() StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error)
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
// logging function. The return value can be ignored or used to stop recording, if desired. // logging function. The return value can be ignored or used to stop recording, if desired.