diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 1948f1527ce..230865207e8 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -80,7 +80,7 @@ go_test( "//pkg/scheduler/profile:go_default_library", "//pkg/scheduler/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/api/events/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go index 4f512dd17c0..18644c422ed 100644 --- a/pkg/scheduler/apis/config/testing/compatibility_test.go +++ b/pkg/scheduler/apis/config/testing/compatibility_test.go @@ -1333,7 +1333,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { }, } informerFactory := informers.NewSharedInformerFactory(client, 0) - recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})) + recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})) sched, err := scheduler.New( client, @@ -1509,7 +1509,7 @@ func TestAlgorithmProviderCompatibility(t *testing.T) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})) + recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})) sched, err := scheduler.New( client, @@ -1938,7 +1938,7 @@ func TestPluginsConfigurationCompatibility(t *testing.T) { client := fake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})) + recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})) sched, err := scheduler.New( client, diff --git a/pkg/scheduler/factory_test.go b/pkg/scheduler/factory_test.go index 0c6412c02e6..3ab7608945b 100644 --- a/pkg/scheduler/factory_test.go +++ b/pkg/scheduler/factory_test.go @@ -449,7 +449,7 @@ func newConfigFactoryWithFrameworkRegistry( registry frameworkruntime.Registry) *Configurator { informerFactory := informers.NewSharedInformerFactory(client, 0) snapshot := internalcache.NewEmptySnapshot() - recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})) + recorderFactory := profile.NewRecorderFactory(events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})) return &Configurator{ client: client, informerFactory: informerFactory, diff --git a/pkg/scheduler/profile/BUILD b/pkg/scheduler/profile/BUILD index d8851950051..8dee306ce40 100644 --- a/pkg/scheduler/profile/BUILD +++ b/pkg/scheduler/profile/BUILD @@ -39,7 +39,7 @@ go_test( "//pkg/scheduler/framework/runtime:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/api/events/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", diff --git a/pkg/scheduler/profile/profile_test.go b/pkg/scheduler/profile/profile_test.go index e3a7fca2055..45611bedc01 100644 --- a/pkg/scheduler/profile/profile_test.go +++ b/pkg/scheduler/profile/profile_test.go @@ -23,7 +23,7 @@ import ( "testing" v1 "k8s.io/api/core/v1" - "k8s.io/api/events/v1beta1" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/events" @@ -101,7 +101,7 @@ func TestNewProfile(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { c := fake.NewSimpleClientset() - b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1beta1().Events("")}) + b := events.NewBroadcaster(&events.EventSinkImpl{Interface: c.EventsV1()}) p, err := NewProfile(tc.cfg, fakeFrameworkFactory, NewRecorderFactory(b)) if err := checkErr(err, tc.wantErr); err != nil { t.Fatal(err) @@ -113,7 +113,7 @@ func TestNewProfile(t *testing.T) { called := make(chan struct{}) var ctrl string stopFn := b.StartEventWatcher(func(obj runtime.Object) { - e, _ := obj.(*v1beta1.Event) + e, _ := obj.(*eventsv1.Event) ctrl = e.ReportingController close(called) }) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 903be658efa..914185bd879 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -33,7 +33,7 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" - "k8s.io/api/events/v1beta1" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -173,7 +173,7 @@ func TestSchedulerCreation(t *testing.T) { client := clientsetfake.NewSimpleClientset() informerFactory := informers.NewSharedInformerFactory(client, 0) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) stopCh := make(chan struct{}) defer close(stopCh) @@ -209,7 +209,7 @@ func TestSchedulerCreation(t *testing.T) { func TestSchedulerScheduleOne(t *testing.T) { testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} client := clientsetfake.NewSimpleClientset(&testNode) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) errS := errors.New("scheduler") errB := errors.New("binder") @@ -325,7 +325,7 @@ func TestSchedulerScheduleOne(t *testing.T) { } called := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { - e, _ := obj.(*v1beta1.Event) + e, _ := obj.(*eventsv1.Event) if e.Reason != item.eventReason { t.Errorf("got event %v, want %v", e.Reason, item.eventReason) } @@ -409,7 +409,7 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { // We use a fake filter that only allows one particular node. We create two // profiles, each with a different node in the filter configuration. client := clientsetfake.NewSimpleClientset(nodes...) - broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) + broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -469,7 +469,7 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) { }) controllers := make(map[string]string) stopFn := broadcaster.StartEventWatcher(func(obj runtime.Object) { - e, ok := obj.(*v1beta1.Event) + e, ok := obj.(*eventsv1.Event) if !ok || e.Reason != "Scheduled" { return } @@ -847,7 +847,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { bindErr := fmt.Errorf("bind err") client := clientsetfake.NewSimpleClientset() - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) // This can be small because we wait for pod to finish scheduling first chanTimeout := 2 * time.Second @@ -939,7 +939,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster) eventChan := make(chan struct{}) stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { - e, _ := obj.(*v1beta1.Event) + e, _ := obj.(*eventsv1.Event) if e, a := item.eventReason, e.Reason; e != a { t.Errorf("expected %v, got %v", e, a) } diff --git a/staging/src/k8s.io/client-go/tools/events/BUILD b/staging/src/k8s.io/client-go/tools/events/BUILD index 7e992522dac..fa0f66031f5 100644 --- a/staging/src/k8s.io/client-go/tools/events/BUILD +++ b/staging/src/k8s.io/client-go/tools/events/BUILD @@ -14,10 +14,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/api/events/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", @@ -27,7 +28,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/events/v1beta1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/events/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record/util:go_default_library", @@ -42,7 +43,7 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/events/v1beta1:go_default_library", + "//staging/src/k8s.io/api/events/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go index 6fdbaab6b3d..3c6870a20d5 100644 --- a/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go +++ b/staging/src/k8s.io/client-go/tools/events/event_broadcaster.go @@ -17,27 +17,29 @@ limitations under the License. package events import ( + "context" + "fmt" "os" "sync" "time" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" - restclient "k8s.io/client-go/rest" - - "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/util/json" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" - typedv1beta1 "k8s.io/client-go/kubernetes/typed/events/v1beta1" + typedeventsv1 "k8s.io/client-go/kubernetes/typed/events/v1" + restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record/util" "k8s.io/klog/v2" @@ -64,40 +66,49 @@ type eventKey struct { type eventBroadcasterImpl struct { *watch.Broadcaster mu sync.Mutex - eventCache map[eventKey]*v1beta1.Event + eventCache map[eventKey]*eventsv1.Event sleepDuration time.Duration sink EventSink } -// EventSinkImpl wraps EventInterface to implement EventSink. +// EventSinkImpl wraps EventsV1Interface to implement EventSink. // TODO: this makes it easier for testing purpose and masks the logic of performing API calls. // Note that rollbacking to raw clientset should also be transparent. type EventSinkImpl struct { - Interface typedv1beta1.EventInterface + Interface typedeventsv1.EventsV1Interface } -// Create is the same as CreateWithEventNamespace of the EventExpansion -func (e *EventSinkImpl) Create(event *v1beta1.Event) (*v1beta1.Event, error) { - return e.Interface.CreateWithEventNamespace(event) +// Create takes the representation of a event and creates it. Returns the server's representation of the event, and an error, if there is any. +func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) { + if event.Namespace == "" { + return nil, fmt.Errorf("can't create an event with empty namespace") + } + return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{}) } -// Update is the same as UpdateithEventNamespace of the EventExpansion -func (e *EventSinkImpl) Update(event *v1beta1.Event) (*v1beta1.Event, error) { - return e.Interface.UpdateWithEventNamespace(event) +// Update takes the representation of a event and updates it. Returns the server's representation of the event, and an error, if there is any. +func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) { + if event.Namespace == "" { + return nil, fmt.Errorf("can't update an event with empty namespace") + } + return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{}) } -// Patch is the same as PatchWithEventNamespace of the EventExpansion -func (e *EventSinkImpl) Patch(event *v1beta1.Event, data []byte) (*v1beta1.Event, error) { - return e.Interface.PatchWithEventNamespace(event, data) +// Patch applies the patch and returns the patched event, and an error, if there is any. +func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) { + if event.Namespace == "" { + return nil, fmt.Errorf("can't patch an event with empty namespace") + } + return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) } // NewBroadcaster Creates a new event broadcaster. func NewBroadcaster(sink EventSink) EventBroadcaster { - return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*v1beta1.Event{}) + return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{}) } // NewBroadcasterForTest Creates a new event broadcaster for test purposes. -func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*v1beta1.Event) EventBroadcaster { +func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster { return &eventBroadcasterImpl{ Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), eventCache: eventCache, @@ -154,11 +165,11 @@ func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingCont return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}} } -func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Clock) { +func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) { // Make a copy before modification, because there could be multiple listeners. eventCopy := event.DeepCopy() go func() { - evToRecord := func() *v1beta1.Event { + evToRecord := func() *eventsv1.Event { e.mu.Lock() defer e.mu.Unlock() eventKey := getKey(eventCopy) @@ -169,7 +180,7 @@ func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Cl isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()} return nil } - isomorphicEvent.Series = &v1beta1.EventSeries{ + isomorphicEvent.Series = &eventsv1.EventSeries{ Count: 1, LastObservedTime: metav1.MicroTime{Time: clock.Now()}, } @@ -190,7 +201,7 @@ func (e *eventBroadcasterImpl) recordToSink(event *v1beta1.Event, clock clock.Cl }() } -func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.Event { +func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event { tries := 0 for { if recordedEvent, retry := recordEvent(e.sink, event); !retry { @@ -207,8 +218,8 @@ func (e *eventBroadcasterImpl) attemptRecording(event *v1beta1.Event) *v1beta1.E } } -func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) { - var newEvent *v1beta1.Event +func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) { + var newEvent *eventsv1.Event var err error isEventSeries := event.Series != nil if isEventSeries { @@ -252,7 +263,7 @@ func recordEvent(sink EventSink, event *v1beta1.Event) (*v1beta1.Event, bool) { return nil, true } -func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) { +func createPatchBytesForSeries(event *eventsv1.Event) ([]byte, error) { oldEvent := event.DeepCopy() oldEvent.Series = nil oldData, err := json.Marshal(oldEvent) @@ -263,10 +274,10 @@ func createPatchBytesForSeries(event *v1beta1.Event) ([]byte, error) { if err != nil { return nil, err } - return strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1beta1.Event{}) + return strategicpatch.CreateTwoWayMergePatch(oldData, newData, eventsv1.Event{}) } -func getKey(event *v1beta1.Event) eventKey { +func getKey(event *eventsv1.Event) eventKey { key := eventKey{ action: event.Action, reason: event.Reason, @@ -305,9 +316,9 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { e.finishSeries() }, finishTime, stopCh) eventHandler := func(obj runtime.Object) { - event, ok := obj.(*v1beta1.Event) + event, ok := obj.(*eventsv1.Event) if !ok { - klog.Errorf("unexpected type, expected v1beta1.Event") + klog.Errorf("unexpected type, expected eventsv1.Event") return } e.recordToSink(event, clock.RealClock{}) @@ -320,19 +331,19 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { } type eventBroadcasterAdapterImpl struct { - coreClient typedv1core.EventsGetter - coreBroadcaster record.EventBroadcaster - v1beta1Client typedv1beta1.EventsGetter - v1beta1Broadcaster EventBroadcaster + coreClient typedv1core.EventsGetter + coreBroadcaster record.EventBroadcaster + eventsv1Client typedeventsv1.EventsV1Interface + eventsv1Broadcaster EventBroadcaster } // NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify // migration of individual components to the new Event API. func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter { eventClient := &eventBroadcasterAdapterImpl{} - if _, err := client.Discovery().ServerResourcesForGroupVersion(v1beta1.SchemeGroupVersion.String()); err == nil { - eventClient.v1beta1Client = client.EventsV1beta1() - eventClient.v1beta1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.v1beta1Client.Events("")}) + if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil { + eventClient.eventsv1Client = client.EventsV1() + eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client}) } // Even though there can soon exist cases when coreBroadcaster won't really be needed, // we create it unconditionally because its overhead is minor and will simplify using usage @@ -344,8 +355,8 @@ func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdap // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{}) { - if e.v1beta1Broadcaster != nil && e.v1beta1Client != nil { - e.v1beta1Broadcaster.StartRecordingToSink(stopCh) + if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil { + e.eventsv1Broadcaster.StartRecordingToSink(stopCh) } if e.coreBroadcaster != nil && e.coreClient != nil { e.coreBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: e.coreClient.Events("")}) @@ -353,8 +364,8 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{ } func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder { - if e.v1beta1Broadcaster != nil && e.v1beta1Client != nil { - return e.v1beta1Broadcaster.NewRecorder(scheme.Scheme, name) + if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil { + return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name) } return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name)) } @@ -367,7 +378,7 @@ func (e *eventBroadcasterAdapterImpl) Shutdown() { if e.coreBroadcaster != nil { e.coreBroadcaster.Shutdown() } - if e.v1beta1Broadcaster != nil { - e.v1beta1Broadcaster.Shutdown() + if e.eventsv1Broadcaster != nil { + e.eventsv1Broadcaster.Shutdown() } } diff --git a/staging/src/k8s.io/client-go/tools/events/event_recorder.go b/staging/src/k8s.io/client-go/tools/events/event_recorder.go index 0aa60356fea..60dd04fefee 100644 --- a/staging/src/k8s.io/client-go/tools/events/event_recorder.go +++ b/staging/src/k8s.io/client-go/tools/events/event_recorder.go @@ -21,15 +21,14 @@ import ( "time" v1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/reference" - - "k8s.io/api/events/v1beta1" "k8s.io/client-go/tools/record/util" + "k8s.io/client-go/tools/reference" "k8s.io/klog/v2" ) @@ -64,13 +63,13 @@ func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.O }() } -func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *v1beta1.Event { +func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event { t := metav1.Time{Time: recorder.clock.Now()} namespace := refRegarding.Namespace if namespace == "" { namespace = metav1.NamespaceDefault } - return &v1beta1.Event{ + return &eventsv1.Event{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()), Namespace: namespace, diff --git a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go index 42ea1cd72ec..39c2ae71c8b 100644 --- a/staging/src/k8s.io/client-go/tools/events/eventseries_test.go +++ b/staging/src/k8s.io/client-go/tools/events/eventseries_test.go @@ -25,7 +25,7 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/api/events/v1beta1" + eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -35,13 +35,13 @@ import ( ) type testEventSeriesSink struct { - OnCreate func(e *v1beta1.Event) (*v1beta1.Event, error) - OnUpdate func(e *v1beta1.Event) (*v1beta1.Event, error) - OnPatch func(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) + OnCreate func(e *eventsv1.Event) (*eventsv1.Event, error) + OnUpdate func(e *eventsv1.Event) (*eventsv1.Event, error) + OnPatch func(e *eventsv1.Event, p []byte) (*eventsv1.Event, error) } // Create records the event for testing. -func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) { +func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error) { if t.OnCreate != nil { return t.OnCreate(e) } @@ -49,7 +49,7 @@ func (t *testEventSeriesSink) Create(e *v1beta1.Event) (*v1beta1.Event, error) { } // Update records the event for testing. -func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) { +func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error) { if t.OnUpdate != nil { return t.OnUpdate(e) } @@ -57,7 +57,7 @@ func (t *testEventSeriesSink) Update(e *v1beta1.Event) (*v1beta1.Event, error) { } // Patch records the event for testing. -func (t *testEventSeriesSink) Patch(e *v1beta1.Event, p []byte) (*v1beta1.Event, error) { +func (t *testEventSeriesSink) Patch(e *eventsv1.Event, p []byte) (*eventsv1.Event, error) { if t.OnPatch != nil { return t.OnPatch(e, p) } @@ -86,7 +86,7 @@ func TestEventSeriesf(t *testing.T) { t.Fatal(err) } - expectedEvent := &v1beta1.Event{ + expectedEvent := &eventsv1.Event{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: "baz", @@ -107,13 +107,13 @@ func TestEventSeriesf(t *testing.T) { nonIsomorphicEvent := expectedEvent.DeepCopy() nonIsomorphicEvent.Action = "stopped" - expectedEvent.Series = &v1beta1.EventSeries{Count: 1} + expectedEvent.Series = &eventsv1.EventSeries{Count: 1} table := []struct { regarding k8sruntime.Object related k8sruntime.Object - actual *v1beta1.Event + actual *eventsv1.Event elements []interface{} - expect *v1beta1.Event + expect *eventsv1.Event expectUpdate bool }{ { @@ -136,27 +136,27 @@ func TestEventSeriesf(t *testing.T) { stopCh := make(chan struct{}) - createEvent := make(chan *v1beta1.Event) - updateEvent := make(chan *v1beta1.Event) - patchEvent := make(chan *v1beta1.Event) + createEvent := make(chan *eventsv1.Event) + updateEvent := make(chan *eventsv1.Event) + patchEvent := make(chan *eventsv1.Event) testEvents := testEventSeriesSink{ - OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) { createEvent <- event return event, nil }, - OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) { updateEvent <- event return event, nil }, - OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) { // event we receive is already patched, usually the sink uses it only to retrieve the name and namespace, here // we'll use it directly patchEvent <- event return event, nil }, } - eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*v1beta1.Event{}) + eventBroadcaster := newBroadcaster(&testEvents, 0, map[eventKey]*eventsv1.Event{}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "eventTest") eventBroadcaster.StartRecordingToSink(stopCh) recorder.Eventf(regarding, related, isomorphicEvent.Type, isomorphicEvent.Reason, isomorphicEvent.Action, isomorphicEvent.Note, []interface{}{1}) @@ -179,7 +179,7 @@ func TestEventSeriesf(t *testing.T) { close(stopCh) } -func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *v1beta1.Event, expectedEvent *v1beta1.Event, t *testing.T) { +func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *eventsv1.Event, expectedEvent *eventsv1.Event, t *testing.T) { recvEvent := *actualEvent // Just check that the timestamp was set. @@ -229,32 +229,32 @@ func TestFinishSeries(t *testing.T) { } LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} - createEvent := make(chan *v1beta1.Event, 10) - updateEvent := make(chan *v1beta1.Event, 10) - patchEvent := make(chan *v1beta1.Event, 10) + createEvent := make(chan *eventsv1.Event, 10) + updateEvent := make(chan *eventsv1.Event, 10) + patchEvent := make(chan *eventsv1.Event, 10) testEvents := testEventSeriesSink{ - OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) { createEvent <- event return event, nil }, - OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) { updateEvent <- event return event, nil }, - OnPatch: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + OnPatch: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) { // event we receive is already patched, usually the sink uses it // only to retrieve the name and namespace, here we'll use it directly patchEvent <- event return event, nil }, } - cache := map[eventKey]*v1beta1.Event{} + cache := map[eventKey]*eventsv1.Event{} eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") nonFinishedEvent := cachedEvent.DeepCopy() nonFinishedEvent.ReportingController = "nonFinished-controller" - cachedEvent.Series = &v1beta1.EventSeries{ + cachedEvent.Series = &eventsv1.EventSeries{ Count: 10, LastObservedTime: LastObservedTime, } @@ -300,15 +300,15 @@ func TestRefreshExistingEventSeries(t *testing.T) { t.Fatal(err) } LastObservedTime := metav1.MicroTime{Time: time.Now().Add(-9 * time.Minute)} - createEvent := make(chan *v1beta1.Event, 10) - updateEvent := make(chan *v1beta1.Event, 10) - patchEvent := make(chan *v1beta1.Event, 10) + createEvent := make(chan *eventsv1.Event, 10) + updateEvent := make(chan *eventsv1.Event, 10) + patchEvent := make(chan *eventsv1.Event, 10) table := []struct { - patchFunc func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) + patchFunc func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) }{ { - patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) { // event we receive is already patched, usually the sink uses it //only to retrieve the name and namespace, here we'll use it directly. patchEvent <- event @@ -316,7 +316,7 @@ func TestRefreshExistingEventSeries(t *testing.T) { }, }, { - patchFunc: func(event *v1beta1.Event, patch []byte) (*v1beta1.Event, error) { + patchFunc: func(event *eventsv1.Event, patch []byte) (*eventsv1.Event, error) { // we simulate an apiserver error here patchEvent <- nil return nil, &restclient.RequestConstructionError{} @@ -325,21 +325,21 @@ func TestRefreshExistingEventSeries(t *testing.T) { } for _, item := range table { testEvents := testEventSeriesSink{ - OnCreate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + OnCreate: func(event *eventsv1.Event) (*eventsv1.Event, error) { createEvent <- event return event, nil }, - OnUpdate: func(event *v1beta1.Event) (*v1beta1.Event, error) { + OnUpdate: func(event *eventsv1.Event) (*eventsv1.Event, error) { updateEvent <- event return event, nil }, OnPatch: item.patchFunc, } - cache := map[eventKey]*v1beta1.Event{} + cache := map[eventKey]*eventsv1.Event{} eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") - cachedEvent.Series = &v1beta1.EventSeries{ + cachedEvent.Series = &eventsv1.EventSeries{ Count: 10, LastObservedTime: LastObservedTime, } diff --git a/staging/src/k8s.io/client-go/tools/events/interfaces.go b/staging/src/k8s.io/client-go/tools/events/interfaces.go index b54f7c9c560..f1a523caa8e 100644 --- a/staging/src/k8s.io/client-go/tools/events/interfaces.go +++ b/staging/src/k8s.io/client-go/tools/events/interfaces.go @@ -17,7 +17,7 @@ limitations under the License. package events import ( - "k8s.io/api/events/v1beta1" + eventsv1 "k8s.io/api/events/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ) @@ -65,9 +65,9 @@ type EventBroadcaster interface { // It is assumed that EventSink will return the same sorts of errors as // client-go's REST client. type EventSink interface { - Create(event *v1beta1.Event) (*v1beta1.Event, error) - Update(event *v1beta1.Event) (*v1beta1.Event, error) - Patch(oldEvent *v1beta1.Event, data []byte) (*v1beta1.Event, error) + Create(event *eventsv1.Event) (*eventsv1.Event, error) + Update(event *eventsv1.Event) (*eventsv1.Event, error) + Patch(oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error) } // EventBroadcasterAdapter is a auxiliary interface to simplify migration to diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 3ea7c879648..0fc8d8e0838 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -85,7 +85,7 @@ func setupScheduler( informerFactory informers.SharedInformerFactory, ) { eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: cs.EventsV1beta1().Events(""), + Interface: cs.EventsV1(), }) sched, err := scheduler.New( diff --git a/test/integration/events/events_test.go b/test/integration/events/events_test.go index e6c64106d21..bb8830076ef 100644 --- a/test/integration/events/events_test.go +++ b/test/integration/events/events_test.go @@ -65,17 +65,17 @@ func TestEventCompatibility(t *testing.T) { oldBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{Interface: client.CoreV1().Events("")}) oldRecorder.Eventf(regarding, v1.EventTypeNormal, "started", "note") - newBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) + newBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) newRecorder := newBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-scheduler") newBroadcaster.StartRecordingToSink(stopCh) newRecorder.Eventf(regarding, related, v1.EventTypeNormal, "memoryPressure", "killed", "memory pressure") err = wait.PollImmediate(100*time.Millisecond, 20*time.Second, func() (done bool, err error) { - v1beta1Events, err := client.EventsV1beta1().Events("").List(context.TODO(), metav1.ListOptions{}) + v1Events, err := client.EventsV1().Events("").List(context.TODO(), metav1.ListOptions{}) if err != nil { return false, err } - if len(v1beta1Events.Items) != 2 { + if len(v1Events.Items) != 2 { return false, nil } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index d29027461bf..4b15ffcc9ff 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -269,7 +269,7 @@ priorities: [] policyConfigMap.APIVersion = "v1" clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &policyConfigMap, metav1.CreateOptions{}) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1beta1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1()}) stopCh := make(chan struct{}) eventBroadcaster.StartRecordingToSink(stopCh) @@ -323,7 +323,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1beta1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1()}) stopCh := make(chan struct{}) eventBroadcaster.StartRecordingToSink(stopCh) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index c56c7b68f85..08d195f6dce 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -80,7 +80,7 @@ func StartScheduler(clientSet clientset.Interface) (*scheduler.Scheduler, corein informerFactory := informers.NewSharedInformerFactory(clientSet, 0) podInformer := informerFactory.Core().V1().Pods() evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: clientSet.EventsV1beta1().Events("")}) + Interface: clientSet.EventsV1()}) evtBroadcaster.StartRecordingToSink(ctx.Done()) @@ -401,7 +401,7 @@ func InitTestSchedulerWithOptions( } var err error eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: testCtx.ClientSet.EventsV1beta1().Events(""), + Interface: testCtx.ClientSet.EventsV1(), }) if policy != nil { diff --git a/test/integration/volumescheduling/util.go b/test/integration/volumescheduling/util.go index a4efc0fde6f..283f83c6eca 100644 --- a/test/integration/volumescheduling/util.go +++ b/test/integration/volumescheduling/util.go @@ -105,7 +105,7 @@ func initTestSchedulerWithOptions( podInformer := testCtx.informerFactory.Core().V1().Pods() eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ - Interface: testCtx.clientSet.EventsV1beta1().Events(""), + Interface: testCtx.clientSet.EventsV1(), }) var err error