move scheduler to use v1beta1.events

Signed-off-by: Yassine TIJANI <ytijani@vmware.com>
This commit is contained in:
Yassine TIJANI
2019-06-03 02:16:10 +00:00
parent 2659b3755a
commit 08522f8e5a
17 changed files with 99 additions and 91 deletions

View File

@@ -27,6 +27,7 @@ import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -40,7 +41,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
corelister "k8s.io/client-go/listers/core/v1"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@@ -179,8 +180,7 @@ func TestSchedulerCreation(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, 0)
testSource := "testProvider"
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
defaultBindTimeout := int64(30)
factory.RegisterFitPredicate("PredicateOne", PredicateOne)
@@ -201,7 +201,7 @@ func TestSchedulerCreation(t *testing.T) {
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
informerFactory.Storage().V1beta1().CSINodes(),
eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
EmptyPluginRegistry,
@@ -215,11 +215,11 @@ func TestSchedulerCreation(t *testing.T) {
}
func TestScheduler(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
client := clientsetfake.NewSimpleClientset(&testNode)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
errS := errors.New("scheduler")
errB := errors.New("binder")
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
table := []struct {
name string
@@ -269,7 +269,6 @@ func TestScheduler(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
client := clientsetfake.NewSimpleClientset(&testNode)
informerFactory := informers.NewSharedInformerFactory(client, 0)
nl := informerFactory.Core().V1().Nodes().Lister()
@@ -310,11 +309,12 @@ func TestScheduler(t *testing.T) {
return item.sendPod
},
Framework: EmptyFramework,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, "scheduler"),
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
})
called := make(chan struct{})
events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*v1beta1.Event)
if e, a := item.eventReason, e.Reason; e != a {
t.Errorf("expected %v, got %v", e, a)
}
@@ -337,7 +337,7 @@ func TestScheduler(t *testing.T) {
if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) {
t.Errorf("error: %s", diff.ObjectDiff(e, a))
}
events.Stop()
stopFunc()
})
}
}
@@ -646,7 +646,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, recorder events.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) {
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil, nil),
@@ -683,7 +683,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Error: func(p *v1.Pod, err error) {
errChan <- err
},
Recorder: &record.FakeRecorder{},
Recorder: &events.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
Framework: EmptyFramework,
@@ -740,7 +740,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
Error: func(p *v1.Pod, err error) {
queuedPodStore.AddIfNotPresent(p)
},
Recorder: &record.FakeRecorder{},
Recorder: &events.FakeRecorder{},
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
@@ -751,7 +751,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
return sched, bindingChan
}
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
pod := podWithID("foo", "")
@@ -769,7 +769,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
}
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"})
recorder := broadcaster.NewRecorder(scheme.Scheme, "scheduler")
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@@ -789,9 +789,9 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
findErr := fmt.Errorf("find err")
assumeErr := fmt.Errorf("assume err")
bindErr := fmt.Errorf("bind err")
client := clientsetfake.NewSimpleClientset()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(t.Logf).Stop()
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1beta1().Events("")})
// This can be small because we wait for pod to finish scheduling first
chanTimeout := 2 * time.Second
@@ -897,26 +897,22 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
t.Fatalf("Failed to get fake volume binder")
}
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
eventChan := make(chan struct{})
events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
e, _ := obj.(*v1beta1.Event)
if e, a := item.eventReason, e.Reason; e != a {
t.Errorf("expected %v, got %v", e, a)
}
close(eventChan)
})
s.scheduleOne()
// Wait for pod to succeed or fail scheduling
select {
case <-eventChan:
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
}
events.Stop()
stopFunc()
// Wait for scheduling to return an error
select {
case err := <-errChan: