refactor scheduler test and include phantom test

Hongchao Deng 2016-06-21 09:55:23 -07:00
parent fadde38df6
commit 8cd55e8e52

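The "phantom pod" the new tests guard against is a stale entry that lingers in the scheduler cache after the pod it describes has expired or been deleted, so its host port keeps blocking otherwise-schedulable pods. The toy sketch below is not part of this commit and deliberately avoids the real schedulercache API (all names are made up); it only illustrates the assume/expire/remove lifecycle the tests exercise:

package main

import (
    "fmt"
    "sync"
    "time"
)

// assumedCache is a toy stand-in for the scheduler cache: a pod is "assumed"
// onto a node with a TTL, and the assumption either expires, is confirmed
// (Add), or is withdrawn (Remove).
type assumedCache struct {
    mu   sync.Mutex
    ttl  time.Duration
    pods map[string]time.Time // pod -> expiry deadline; zero time = confirmed
}

func newAssumedCache(ttl time.Duration) *assumedCache {
    return &assumedCache{ttl: ttl, pods: map[string]time.Time{}}
}

func (c *assumedCache) Assume(pod string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.pods[pod] = time.Now().Add(c.ttl)
}

func (c *assumedCache) Add(pod string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.pods[pod] = time.Time{} // binding confirmed, no longer subject to TTL
}

func (c *assumedCache) Remove(pod string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.pods, pod)
}

// List drops expired assumptions so that no "phantom" pod keeps occupying a
// node (and its host ports) after it is gone.
func (c *assumedCache) List() []string {
    c.mu.Lock()
    defer c.mu.Unlock()
    var out []string
    now := time.Now()
    for pod, deadline := range c.pods {
        if !deadline.IsZero() && now.After(deadline) {
            delete(c.pods, pod) // expired assumption
            continue
        }
        out = append(out, pod)
    }
    return out
}

func main() {
    cache := newAssumedCache(100 * time.Millisecond)
    cache.Assume("foo:8080")
    fmt.Println(cache.List()) // [foo:8080] -- port 8080 looks occupied

    time.Sleep(150 * time.Millisecond)
    fmt.Println(cache.List()) // [] -- the assumption expired; no phantom left

    cache.Assume("foo:8080")
    cache.Add("foo:8080")     // binding confirmed
    cache.Remove("foo:8080")  // user deletes the pod
    fmt.Println(cache.List()) // [] -- removal frees the port immediately
}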

@@ -18,9 +18,7 @@ package scheduler
import (
    "errors"
-     "fmt"
    "reflect"
-     "sync"
    "testing"
    "time"
@@ -30,6 +28,7 @@ import (
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/pkg/util/diff"
+     "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -169,163 +168,167 @@ func TestScheduler(t *testing.T) {
    }
}
- func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
-     // Set up a channel through which we'll funnel log messages from the watcher.
-     // This way, we can guarantee that when the test ends no thread will still be
-     // trying to write to t.Logf (which it would if we handed t.Logf directly to
-     // StartLogging).
-     ch := make(chan string)
-     done := make(chan struct{})
-     var wg sync.WaitGroup
-     wg.Add(1)
-     go func() {
-         defer wg.Done()
-         for {
-             select {
-             case msg := <-ch:
-                 t.Log(msg)
-             case <-done:
-                 return
-             }
-         }
-     }()
-     eventBroadcaster := record.NewBroadcaster()
-     watcher := eventBroadcaster.StartLogging(func(format string, args ...interface{}) {
-         ch <- fmt.Sprintf(format, args...)
-     })
-     defer func() {
-         watcher.Stop()
-         close(done)
-         wg.Wait()
-     }()
-     // Setup stores to test pod's workflow:
-     // - queuedPodStore: pods queued before processing
-     // - scheduledPodStore: pods that has a scheduling decision
-     scheduledPodStore := clientcache.NewStore(clientcache.MetaNamespaceKeyFunc)
-     queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
-     // Port is the easiest way to cause a fit predicate failure
-     podPort := 8080
-     firstPod := podWithPort("foo", "", podPort)
+ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
    stop := make(chan struct{})
    defer close(stop)
-     cache := schedulercache.New(1*time.Second, stop)
+     queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
+     scache := schedulercache.New(100*time.Millisecond, stop)
+     pod := podWithPort("pod.Name", "", 8080)
+     scheduler, bindingChan, _ := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, pod)
+     waitPodExpireChan := make(chan struct{})
+     timeout := make(chan struct{})
+     go func() {
+         for {
+             select {
+             case <-timeout:
+                 return
+             default:
+             }
+             pods, err := scache.List(labels.Everything())
+             if err != nil {
+                 t.Fatalf("cache.List failed: %v", err)
+             }
+             if len(pods) == 0 {
+                 close(waitPodExpireChan)
+                 return
+             }
+             time.Sleep(100 * time.Millisecond)
+         }
+     }()
+     // waiting for the assumed pod to expire
+     select {
+     case <-waitPodExpireChan:
+     case <-time.After(wait.ForeverTestTimeout):
+         close(timeout)
+         t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+     }
+     // We use conflicted pod ports to incur fit predicate failure if first pod not removed.
+     secondPod := podWithPort("bar", "", 8080)
+     queuedPodStore.Add(secondPod)
+     scheduler.scheduleOne()
+     select {
+     case b := <-bindingChan:
+         expectBinding := &api.Binding{
+             ObjectMeta: api.ObjectMeta{Name: "bar"},
+             Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
+         }
+         if !reflect.DeepEqual(expectBinding, b) {
+             t.Errorf("binding want=%v, get=%v", expectBinding, b)
+         }
+     case <-time.After(wait.ForeverTestTimeout):
+         t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+     }
+ }
+ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
+     stop := make(chan struct{})
+     defer close(stop)
+     queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
+     scache := schedulercache.New(10*time.Minute, stop)
+     firstPod := podWithPort("pod.Name", "", 8080)
+     scheduler, bindingChan, errChan := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, firstPod)
+     // We use conflicted pod ports to incur fit predicate failure.
+     secondPod := podWithPort("bar", "", 8080)
+     queuedPodStore.Add(secondPod)
+     // queuedPodStore: [bar:8080]
+     // cache: [(assumed)foo:8080]
+     scheduler.scheduleOne()
+     select {
+     case err := <-errChan:
+         expectErr := &FitError{
+             Pod: secondPod,
+             FailedPredicates: FailedPredicateMap{"machine1": "PodFitsHostPorts"},
+         }
+         if !reflect.DeepEqual(expectErr, err) {
+             t.Errorf("err want=%v, get=%v", expectErr, err)
+         }
+     case <-time.After(wait.ForeverTestTimeout):
+         t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+     }
+     // We mimic the workflow of cache behavior when a pod is removed by user.
+     // Note: if the schedulercache timeout would be super short, the first pod would expire
+     // and would be removed itself (without any explicit actions on schedulercache). Even in that case,
+     // explicitly AddPod will as well correct the behavior.
+     firstPod.Spec.NodeName = "machine1"
+     if err := scache.AddPod(firstPod); err != nil {
+         t.Fatalf("err: %v", err)
+     }
+     if err := scache.RemovePod(firstPod); err != nil {
+         t.Fatalf("err: %v", err)
+     }
+     queuedPodStore.Add(secondPod)
+     scheduler.scheduleOne()
+     select {
+     case b := <-bindingChan:
+         expectBinding := &api.Binding{
+             ObjectMeta: api.ObjectMeta{Name: "bar"},
+             Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
+         }
+         if !reflect.DeepEqual(expectBinding, b) {
+             t.Errorf("binding want=%v, get=%v", expectBinding, b)
+         }
+     case <-time.After(wait.ForeverTestTimeout):
+         t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+     }
+ }
+ // queuedPodStore: pods queued before processing.
+ // cache: scheduler cache that might contain assumed pods.
+ func setupTestSchedulerWithOnePod(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, pod *api.Pod) (*Scheduler, chan *api.Binding, chan error) {
    // Create the scheduler config
    algo := NewGenericScheduler(
-         cache,
+         scache,
        map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
        []algorithm.PriorityConfig{},
        []algorithm.SchedulerExtender{})
-     var gotBinding *api.Binding
-     c := &Config{
-         SchedulerCache: cache,
+     bindingChan := make(chan *api.Binding, 1)
+     errChan := make(chan error, 1)
+     cfg := &Config{
+         SchedulerCache: scache,
        NodeLister: algorithm.FakeNodeLister(
            api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
        ),
        Algorithm: algo,
        Binder: fakeBinder{func(b *api.Binding) error {
-             scheduledPodStore.Add(podWithPort(b.Name, b.Target.Name, podPort))
-             gotBinding = b
+             bindingChan <- b
            return nil
        }},
        NextPod: func() *api.Pod {
            return clientcache.Pop(queuedPodStore).(*api.Pod)
        },
        Error: func(p *api.Pod, err error) {
-             t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
+             errChan <- err
        },
-         Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
+         Recorder: &record.FakeRecorder{},
        PodConditionUpdater: fakePodConditionUpdater{},
    }
+     scheduler := New(cfg)
-     // First scheduling pass should schedule the pod
-     s := New(c)
-     called := make(chan struct{})
-     events := eventBroadcaster.StartEventWatcher(func(e *api.Event) {
-         if e, a := "Scheduled", e.Reason; e != a {
-             t.Errorf("expected %v, got %v", e, a)
-         }
-         close(called)
-     })
-     queuedPodStore.Add(firstPod)
+     queuedPodStore.Add(pod)
    // queuedPodStore: [foo:8080]
-     // scheduledPodStore: []
-     // assumedPods: []
+     // cache: []
-     s.scheduleOne()
-     <-called
+     scheduler.scheduleOne()
    // queuedPodStore: []
-     // scheduledPodStore: [foo:8080]
-     // assumedPods: [foo:8080]
+     // cache: [(assumed)foo:8080]
-     pod, exists, _ := scheduledPodStore.GetByKey("foo")
-     if !exists {
-         t.Errorf("Expected scheduled pod store to contain pod")
-     }
-     pod, exists, _ = queuedPodStore.GetByKey("foo")
-     if exists {
-         t.Errorf("Did not expect a queued pod, found %+v", pod)
-     }
-     expectBind := &api.Binding{
-         ObjectMeta: api.ObjectMeta{Name: "foo"},
-         Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
-     }
-     if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
-         t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac))
-     }
-     events.Stop()
-     scheduledPodStore.Delete(pod)
-     secondPod := podWithPort("bar", "", podPort)
-     queuedPodStore.Add(secondPod)
-     // queuedPodStore: [bar:8080]
-     // scheduledPodStore: []
-     // assumedPods: [foo:8080]
-     var waitUntilExpired sync.WaitGroup
-     waitUntilExpired.Add(1)
-     // waiting for the assumed pod to expire
-     go func() {
-         for {
-             pods, err := cache.List(labels.Everything())
-             if err != nil {
-                 t.Fatalf("cache.List failed: %v", err)
-             }
-             if len(pods) == 0 {
-                 waitUntilExpired.Done()
-                 return
-             }
-             time.Sleep(1 * time.Second)
+     select {
+     case b := <-bindingChan:
+         expectBinding := &api.Binding{
+             ObjectMeta: api.ObjectMeta{Name: "pod.Name"},
+             Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
+         }
-     }()
-     waitUntilExpired.Wait()
-     // Second scheduling pass will fail to schedule if the store hasn't expired
-     // the deleted pod. This would normally happen with a timeout.
-     called = make(chan struct{})
-     events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
-         if e, a := "Scheduled", e.Reason; e != a {
-             t.Errorf("expected %v, got %v", e, a)
+         if !reflect.DeepEqual(expectBinding, b) {
+             t.Errorf("binding want=%v, get=%v", expectBinding, b)
        }
-         close(called)
-     })
-     s.scheduleOne()
-     <-called
-     expectBind = &api.Binding{
-         ObjectMeta: api.ObjectMeta{Name: "bar"},
-         Target: api.ObjectReference{Kind: "Node", Name: "machine1"},
+     case <-time.After(wait.ForeverTestTimeout):
+         t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
    }
-     if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
-         t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac))
-     }
-     events.Stop()
+     return scheduler, bindingChan, errChan
}
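The helper above wires the Config with two test doubles, fakeBinder and fakePodConditionUpdater, which are defined elsewhere in scheduler_test.go and do not appear in this hunk. A plausible reconstruction is sketched below for readability; the exact definitions, and in particular the PodConditionUpdater method signature, are assumptions and not part of this commit:

// Hypothetical reconstruction -- the real types live in scheduler_test.go.
// fakeBinder forwards every binding to the injected callback (here, the
// callback pushes the binding onto bindingChan); fakePodConditionUpdater
// simply ignores condition updates.
type fakeBinder struct {
    b func(binding *api.Binding) error
}

func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) }

type fakePodConditionUpdater struct{}

func (fc fakePodConditionUpdater) Update(pod *api.Pod, condition *api.PodCondition) error {
    return nil
}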