diff --git a/tools/events/event_broadcaster.go b/tools/events/event_broadcaster.go index bcf1d0cb..6fdbaab6 100644 --- a/tools/events/event_broadcaster.go +++ b/tools/events/event_broadcaster.go @@ -34,7 +34,11 @@ import ( "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" typedv1beta1 "k8s.io/client-go/kubernetes/typed/events/v1beta1" + "k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record/util" "k8s.io/klog/v2" ) @@ -314,3 +318,56 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { stopWatcher() }() } + +type eventBroadcasterAdapterImpl struct { + coreClient typedv1core.EventsGetter + coreBroadcaster record.EventBroadcaster + v1beta1Client typedv1beta1.EventsGetter + v1beta1Broadcaster EventBroadcaster +} + +// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify +// migration of individual components to the new Event API. +func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter { + eventClient := &eventBroadcasterAdapterImpl{} + if _, err := client.Discovery().ServerResourcesForGroupVersion(v1beta1.SchemeGroupVersion.String()); err == nil { + eventClient.v1beta1Client = client.EventsV1beta1() + eventClient.v1beta1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.v1beta1Client.Events("")}) + } + // Even though there can soon exist cases when coreBroadcaster won't really be needed, + // we create it unconditionally because its overhead is minor and will simplify using usage + // patterns of this library in all components. + eventClient.coreClient = client.CoreV1() + eventClient.coreBroadcaster = record.NewBroadcaster() + return eventClient +} + +// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. +func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{}) { + if e.v1beta1Broadcaster != nil && e.v1beta1Client != nil { + e.v1beta1Broadcaster.StartRecordingToSink(stopCh) + } + if e.coreBroadcaster != nil && e.coreClient != nil { + e.coreBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: e.coreClient.Events("")}) + } +} + +func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder { + if e.v1beta1Broadcaster != nil && e.v1beta1Client != nil { + return e.v1beta1Broadcaster.NewRecorder(scheme.Scheme, name) + } + return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name)) +} + +func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder { + return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name}) +} + +func (e *eventBroadcasterAdapterImpl) Shutdown() { + if e.coreBroadcaster != nil { + e.coreBroadcaster.Shutdown() + } + if e.v1beta1Broadcaster != nil { + e.v1beta1Broadcaster.Shutdown() + } +} diff --git a/tools/events/interfaces.go b/tools/events/interfaces.go index 4e39156b..b54f7c9c 100644 --- a/tools/events/interfaces.go +++ b/tools/events/interfaces.go @@ -19,6 +19,7 @@ package events import ( "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" ) // EventRecorder knows how to record events on behalf of an EventSource. @@ -68,3 +69,22 @@ type EventSink interface { Update(event *v1beta1.Event) (*v1beta1.Event, error) Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error) } + +// EventBroadcasterAdapter is a auxiliary interface to simplify migration to +// the new events API. It is a wrapper around new and legacy broadcasters +// that smartly chooses which one to use. +// +// Deprecated: This interface will be removed once migration is completed. +type EventBroadcasterAdapter interface { + // StartRecordingToSink starts sending events received from the specified eventBroadcaster. + StartRecordingToSink(stopCh <-chan struct{}) + + // NewRecorder creates a new Event Recorder with specified name. + NewRecorder(name string) EventRecorder + + // DeprecatedNewLegacyRecorder creates a legacy Event Recorder with specific name. + DeprecatedNewLegacyRecorder(name string) record.EventRecorder + + // Shutdown shuts down the broadcaster. + Shutdown() +}