diff --git a/cmd/kube-scheduler/app/config/BUILD b/cmd/kube-scheduler/app/config/BUILD index 877ceef1e05..686eb493639 100644 --- a/cmd/kube-scheduler/app/config/BUILD +++ b/cmd/kube-scheduler/app/config/BUILD @@ -12,7 +12,9 @@ go_library( "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/events/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], diff --git a/cmd/kube-scheduler/app/config/config.go b/cmd/kube-scheduler/app/config/config.go index 2ad5ae9bf34..acc090f3f00 100644 --- a/cmd/kube-scheduler/app/config/config.go +++ b/cmd/kube-scheduler/app/config/config.go @@ -22,7 +22,9 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/kubernetes/typed/events/v1beta1" restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/record" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -45,9 +47,14 @@ type Config struct { Client clientset.Interface InformerFactory informers.SharedInformerFactory PodInformer coreinformers.PodInformer - EventClient v1core.EventsGetter - Recorder record.EventRecorder - Broadcaster record.EventBroadcaster + EventClient v1beta1.EventsGetter + + // TODO: Remove the following after fully migrating to the new events api. + CoreEventClient v1core.EventsGetter + LeaderElectionBroadcaster record.EventBroadcaster + + Recorder events.EventRecorder + Broadcaster events.EventBroadcaster // LeaderElection is optional. LeaderElection *leaderelection.LeaderElectionConfig diff --git a/cmd/kube-scheduler/app/options/BUILD b/cmd/kube-scheduler/app/options/BUILD index e73db131d60..cb094d3ff64 100644 --- a/cmd/kube-scheduler/app/options/BUILD +++ b/cmd/kube-scheduler/app/options/BUILD @@ -29,10 +29,10 @@ go_library( "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd/api:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection:go_default_library", "//staging/src/k8s.io/client-go/tools/leaderelection/resourcelock:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/cmd/kube-scheduler/app/options/options.go b/cmd/kube-scheduler/app/options/options.go index f645b3c4d13..c43ae2431e2 100644 --- a/cmd/kube-scheduler/app/options/options.go +++ b/cmd/kube-scheduler/app/options/options.go @@ -31,10 +31,10 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -237,13 +237,15 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) { } // Prepare event clients. - eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient.EventsV1beta1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, c.ComponentConfig.SchedulerName) + leaderElectionBroadcaster := record.NewBroadcaster() + leaderElectionRecorder := leaderElectionBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName}) // Set up leader election if enabled. var leaderElectionConfig *leaderelection.LeaderElectionConfig if c.ComponentConfig.LeaderElection.LeaderElect { - leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder) + leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, leaderElectionRecorder) if err != nil { return nil, err } @@ -252,9 +254,11 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) { c.Client = client c.InformerFactory = informers.NewSharedInformerFactory(client, 0) c.PodInformer = factory.NewPodInformer(client, 0) - c.EventClient = eventClient + c.EventClient = eventClient.EventsV1beta1() + c.CoreEventClient = eventClient.CoreV1() c.Recorder = recorder c.Broadcaster = eventBroadcaster + c.LeaderElectionBroadcaster = leaderElectionBroadcaster c.LeaderElection = leaderElectionConfig return c, nil @@ -295,7 +299,7 @@ func makeLeaderElectionConfig(config kubeschedulerconfig.KubeSchedulerLeaderElec // createClients creates a kube client and an event client from the given config and masterOverride. // TODO remove masterOverride when CLI flags are removed. -func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string, timeout time.Duration) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) { +func createClients(config componentbaseconfig.ClientConnectionConfiguration, masterOverride string, timeout time.Duration) (clientset.Interface, clientset.Interface, clientset.Interface, error) { if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 { klog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.") } @@ -333,5 +337,5 @@ func createClients(config componentbaseconfig.ClientConnectionConfiguration, mas return nil, nil, nil, err } - return client, leaderElectionClient, eventClient.CoreV1(), nil + return client, leaderElectionClient, eventClient, nil } diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 833693f7b29..d905d3f087d 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -36,7 +36,7 @@ import ( "k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/routes" "k8s.io/apiserver/pkg/util/term" - v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/leaderelection" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/cli/globalflag" @@ -190,10 +190,11 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error // Prepare the event broadcaster. if cc.Broadcaster != nil && cc.EventClient != nil { - cc.Broadcaster.StartLogging(klog.V(6).Infof) - cc.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.Events("")}) + cc.Broadcaster.StartRecordingToSink(stopCh) + } + if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil { + cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")}) } - // Setup healthz checks. var checks []healthz.HealthzChecker if cc.ComponentConfig.LeaderElection.LeaderElect { diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 9538ad0fcbd..af8e283f336 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -39,7 +39,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -67,6 +67,7 @@ go_test( "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/events/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", @@ -80,7 +81,7 @@ go_test( "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", ], ) diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 0ddc6a03c57..f763dfcd5cd 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -46,7 +46,7 @@ go_library( "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/storage/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 572b99ec50e..67df1bf7441 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -45,7 +45,7 @@ import ( storagelistersv1 "k8s.io/client-go/listers/storage/v1" storagelistersv1beta1 "k8s.io/client-go/listers/storage/v1beta1" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/klog" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" @@ -114,7 +114,7 @@ type Config struct { Error func(*v1.Pod, error) // Recorder is the EventRecorder to use - Recorder record.EventRecorder + Recorder events.EventRecorder // Close this to shut down the scheduler. StopEverything <-chan struct{} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 826ab3db76f..6d262fcf889 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -34,7 +34,7 @@ import ( storageinformersv1 "k8s.io/client-go/informers/storage/v1" storageinformersv1beta1 "k8s.io/client-go/informers/storage/v1beta1" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -130,7 +130,7 @@ func New(client clientset.Interface, pdbInformer policyinformers.PodDisruptionBudgetInformer, storageClassInformer storageinformersv1.StorageClassInformer, csiNodeInformer storageinformersv1beta1.CSINodeInformer, - recorder record.EventRecorder, + recorder events.EventRecorder, schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource, stopCh <-chan struct{}, registry framework.Registry, @@ -271,7 +271,7 @@ func (sched *Scheduler) Config() *factory.Config { // NOTE: This function modifies "pod". "pod" should be copied before being passed. func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) { sched.config.Error(pod, err) - sched.config.Recorder.Event(pod, v1.EventTypeWarning, "FailedScheduling", message) + sched.config.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, @@ -328,7 +328,8 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } - sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) + sched.config.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) + } metrics.PreemptionVictims.Set(float64(len(victims))) } @@ -436,7 +437,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error { metrics.DeprecatedBindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart)) metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) metrics.DeprecatedSchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart)) - sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name) + sched.config.Recorder.Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name) return nil } @@ -450,7 +451,7 @@ func (sched *Scheduler) scheduleOne() { return } if pod.DeletionTimestamp != nil { - sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) + sched.config.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name) return } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 93c2bf5b769..8c8f76dcf5e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -27,6 +27,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -40,7 +41,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" corelister "k8s.io/client-go/listers/core/v1" clientcache "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -179,8 +180,7 @@ func TestSchedulerCreation(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, 0) testSource := "testProvider" - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(t.Logf).Stop() + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) defaultBindTimeout := int64(30) factory.RegisterFitPredicate("PredicateOne", PredicateOne) @@ -201,7 +201,7 @@ func TestSchedulerCreation(t *testing.T) { informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), informerFactory.Storage().V1beta1().CSINodes(), - eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}), + eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource}, stopCh, EmptyPluginRegistry, @@ -215,11 +215,11 @@ func TestSchedulerCreation(t *testing.T) { } func TestScheduler(t *testing.T) { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(t.Logf).Stop() + testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} + client := clientsetfake.NewSimpleClientset(&testNode) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) errS := errors.New("scheduler") errB := errors.New("binder") - testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} table := []struct { name string @@ -269,7 +269,6 @@ func TestScheduler(t *testing.T) { stop := make(chan struct{}) defer close(stop) - client := clientsetfake.NewSimpleClientset(&testNode) informerFactory := informers.NewSharedInformerFactory(client, 0) nl := informerFactory.Core().V1().Nodes().Lister() @@ -310,11 +309,12 @@ func TestScheduler(t *testing.T) { return item.sendPod }, Framework: EmptyFramework, - Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}), + Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"), VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), }) called := make(chan struct{}) - events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) { + stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { + e, _ := obj.(*v1beta1.Event) if e, a := item.eventReason, e.Reason; e != a { t.Errorf("expected %v, got %v", e, a) } @@ -337,7 +337,7 @@ func TestScheduler(t *testing.T) { if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) { t.Errorf("error: %s", diff.ObjectDiff(e, a)) } - events.Stop() + stopFunc() }) } } @@ -646,7 +646,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, internalqueue.NewSchedulingQueue(nil, nil), @@ -683,7 +683,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C Error: func(p *v1.Pod, err error) { errChan <- err }, - Recorder: &record.FakeRecorder{}, + Recorder: &events.FakeRecorder{}, PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, Framework: EmptyFramework, @@ -740,7 +740,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc Error: func(p *v1.Pod, err error) { queuedPodStore.AddIfNotPresent(p) }, - Recorder: &record.FakeRecorder{}, + Recorder: &events.FakeRecorder{}, PodConditionUpdater: fakePodConditionUpdater{}, PodPreemptor: fakePodPreemptor{}, StopEverything: stop, @@ -751,7 +751,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc return sched, bindingChan } -func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) pod := podWithID("foo", "") @@ -769,7 +769,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder), } - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}) + recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler") s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder) informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -789,9 +789,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { findErr := fmt.Errorf("find err") assumeErr := fmt.Errorf("assume err") bindErr := fmt.Errorf("bind err") + client := clientsetfake.NewSimpleClientset() - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(t.Logf).Stop() + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")}) // This can be small because we wait for pod to finish scheduling first chanTimeout := 2 * time.Second @@ -897,26 +897,22 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { t.Fatalf("Failed to get fake volume binder") } s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster) - eventChan := make(chan struct{}) - events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) { + stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { + e, _ := obj.(*v1beta1.Event) if e, a := item.eventReason, e.Reason; e != a { t.Errorf("expected %v, got %v", e, a) } close(eventChan) }) - s.scheduleOne() - // Wait for pod to succeed or fail scheduling select { case <-eventChan: case <-time.After(wait.ForeverTestTimeout): t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout) } - - events.Stop() - + stopFunc() // Wait for scheduling to return an error select { case err := <-errChan: diff --git a/test/integration/daemonset/BUILD b/test/integration/daemonset/BUILD index 78c6941743a..30e6badebdb 100644 --- a/test/integration/daemonset/BUILD +++ b/test/integration/daemonset/BUILD @@ -39,7 +39,7 @@ go_test( "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//staging/src/k8s.io/component-base/featuregate:go_default_library", diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index 1d2273e2280..4e8956030ec 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -37,7 +37,7 @@ import ( corev1client "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/retry" "k8s.io/component-base/featuregate" @@ -135,14 +135,14 @@ func setupScheduler( informerFactory.Storage().V1beta1().CSINodes(), ) - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ + Interface: cs.EventsV1beta1().Events(""), + }) schedulerConfig.Recorder = eventBroadcaster.NewRecorder( legacyscheme.Scheme, - v1.EventSource{Component: v1.DefaultSchedulerName}, + v1.DefaultSchedulerName, ) - eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{ - Interface: cs.CoreV1().Events(""), - }) + eventBroadcaster.StartRecordingToSink(stopCh) algorithmprovider.ApplyFeatureGates() diff --git a/test/integration/scheduler/BUILD b/test/integration/scheduler/BUILD index bdcef5c5779..d6db7645b93 100644 --- a/test/integration/scheduler/BUILD +++ b/test/integration/scheduler/BUILD @@ -57,11 +57,10 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//test/integration/framework:go_default_library", "//test/utils:go_default_library", @@ -114,12 +113,11 @@ go_library( "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/restmapper:go_default_library", "//staging/src/k8s.io/client-go/scale:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//test/integration/framework:go_default_library", "//test/utils/image:go_default_library", ], diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 9dfdc2e65a0..b6f5d67b966 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -31,11 +31,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" @@ -238,8 +237,9 @@ priorities: [] policyConfigMap.APIVersion = "v1" clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(&policyConfigMap) - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1beta1().Events("")}) + stopCh := make(chan struct{}) + eventBroadcaster.StartRecordingToSink(stopCh) defaultBindTimeout := int64(30) @@ -255,7 +255,7 @@ priorities: [] informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), informerFactory.Storage().V1beta1().CSINodes(), - eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), + eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName), kubeschedulerconfig.SchedulerAlgorithmSource{ Policy: &kubeschedulerconfig.SchedulerPolicySource{ ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{ @@ -310,8 +310,9 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet.EventsV1beta1().Events("")}) + stopCh := make(chan struct{}) + eventBroadcaster.StartRecordingToSink(stopCh) defaultBindTimeout := int64(30) @@ -327,7 +328,7 @@ func TestSchedulerCreationFromNonExistentConfigMap(t *testing.T) { informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), informerFactory.Storage().V1beta1().CSINodes(), - eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName}), + eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.DefaultSchedulerName), kubeschedulerconfig.SchedulerAlgorithmSource{ Policy: &kubeschedulerconfig.SchedulerPolicySource{ ConfigMap: &kubeschedulerconfig.SchedulerPolicyConfigMapSource{ @@ -610,9 +611,9 @@ func TestMultiScheduler(t *testing.T) { if err != nil { t.Errorf("Couldn't create scheduler config: %v", err) } - eventBroadcaster2 := record.NewBroadcaster() - schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: fooScheduler}) - eventBroadcaster2.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientSet2.CoreV1().Events("")}) + eventBroadcaster2 := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet2.EventsV1beta1().Events("")}) + schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, "k8s.io/"+fooScheduler) + eventBroadcaster2.StartRecordingToSink(stopCh) sched2 := scheduler.NewFromConfig(schedulerConfig2) scheduler.AddAllEventHandlers(sched2, diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 10bd6b989de..63aec5349be 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -38,12 +38,11 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" - clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/client-go/scale" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" @@ -225,14 +224,15 @@ func initTestSchedulerWithOptions( controller.WaitForCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced) } - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ + Interface: context.clientSet.EventsV1beta1().Events(""), + }) context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder( legacyscheme.Scheme, - v1.EventSource{Component: v1.DefaultSchedulerName}, + v1.DefaultSchedulerName, ) - eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{ - Interface: context.clientSet.CoreV1().Events(""), - }) + stopCh := make(chan struct{}) + eventBroadcaster.StartRecordingToSink(stopCh) context.informerFactory.Start(context.schedulerConfig.StopEverything) context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything) diff --git a/test/integration/util/BUILD b/test/integration/util/BUILD index a92fd0f5e5a..32a9e52bed0 100644 --- a/test/integration/util/BUILD +++ b/test/integration/util/BUILD @@ -21,8 +21,7 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", - "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/tools/events:go_default_library", "//staging/src/k8s.io/legacy-cloud-providers/gce:go_default_library", "//test/integration/framework:go_default_library", "//vendor/github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud:go_default_library", diff --git a/test/integration/util/util.go b/test/integration/util/util.go index de30fba7abe..9e05bf2da94 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -23,8 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/klog" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/scheduler" @@ -62,19 +61,19 @@ func StartApiserver() (string, ShutdownFunc) { // and the shutdown function to stop it. func StartScheduler(clientSet clientset.Interface) (factory.Configurator, ShutdownFunc) { informerFactory := informers.NewSharedInformerFactory(clientSet, 0) - - evtBroadcaster := record.NewBroadcaster() - evtWatch := evtBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{ - Interface: clientSet.CoreV1().Events("")}) - stopCh := make(chan struct{}) + evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{ + Interface: clientSet.EventsV1beta1().Events("")}) + + evtBroadcaster.StartRecordingToSink(stopCh) + schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory, stopCh) config, err := schedulerConfigurator.CreateFromConfig(schedulerapi.Policy{}) if err != nil { klog.Fatalf("Error creating scheduler: %v", err) } - config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}) + config.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, "scheduler") sched := scheduler.NewFromConfig(config) scheduler.AddAllEventHandlers(sched, @@ -93,7 +92,6 @@ func StartScheduler(clientSet clientset.Interface) (factory.Configurator, Shutdo shutdownFunc := func() { klog.Infof("destroying scheduler") - evtWatch.Stop() close(stopCh) klog.Infof("destroyed scheduler") }