diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 85277c41d6d..ff016a0aa97 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -295,66 +295,6 @@ func podRunning(c *client.Client, podNamespace string, podName string) wait.Cond
 	}
 }
 
-func runSchedulerNoPhantomPodsTest(client *client.Client) {
-	pod := &api.Pod{
-		Spec: api.PodSpec{
-			Containers: []api.Container{
-				{
-					Name:  "c1",
-					Image: e2e.GetPauseImageName(client),
-					Ports: []api.ContainerPort{
-						{ContainerPort: 1234, HostPort: 9999},
-					},
-					ImagePullPolicy: api.PullIfNotPresent,
-				},
-			},
-		},
-	}
-
-	// Assuming we only have two kubelets, the third pod here won't schedule
-	// if the scheduler doesn't correctly handle the delete for the second
-	// pod.
-	pod.ObjectMeta.Name = "phantom.foo"
-	foo, err := client.Pods(api.NamespaceDefault).Create(pod)
-	if err != nil {
-		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
-	}
-	if err := wait.Poll(time.Second, longTestTimeout, podRunning(client, foo.Namespace, foo.Name)); err != nil {
-		glog.Fatalf("FAILED: pod never started running %v", err)
-	}
-
-	pod.ObjectMeta.Name = "phantom.bar"
-	bar, err := client.Pods(api.NamespaceDefault).Create(pod)
-	if err != nil {
-		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
-	}
-	if err := wait.Poll(time.Second, longTestTimeout, podRunning(client, bar.Namespace, bar.Name)); err != nil {
-		glog.Fatalf("FAILED: pod never started running %v", err)
-	}
-
-	// Delete a pod to free up room.
-	glog.Infof("Deleting pod %v", bar.Name)
-	err = client.Pods(api.NamespaceDefault).Delete(bar.Name, api.NewDeleteOptions(0))
-	if err != nil {
-		glog.Fatalf("FAILED: couldn't delete pod %q: %v", bar.Name, err)
-	}
-
-	pod.ObjectMeta.Name = "phantom.baz"
-	baz, err := client.Pods(api.NamespaceDefault).Create(pod)
-	if err != nil {
-		glog.Fatalf("Failed to create pod: %v, %v", pod, err)
-	}
-	if err := wait.Poll(time.Second, longTestTimeout, podRunning(client, baz.Namespace, baz.Name)); err != nil {
-		if pod, perr := client.Pods(api.NamespaceDefault).Get("phantom.bar"); perr == nil {
-			glog.Fatalf("FAILED: 'phantom.bar' was never deleted: %#v, err: %v", pod, err)
-		} else {
-			glog.Fatalf("FAILED: (Scheduler probably didn't process deletion of 'phantom.bar') Pod never started running: err: %v, perr: %v", err, perr)
-		}
-	}
-
-	glog.Info("Scheduler doesn't make phantom pods: test passed.")
-}
-
 type testFunc func(*client.Client)
 
 func addFlags(fs *pflag.FlagSet) {
@@ -457,11 +397,6 @@ func main() {
 	}
 	glog.Infof("OK - found created containers: %#v", createdConts.List())
 
-	// This test doesn't run with the others because it can't run in
-	// parallel and also it schedules extra pods which would change the
-	// above pod counting logic.
-	runSchedulerNoPhantomPodsTest(kubeClient)
-
 	glog.Infof("\n\nLogging high latency metrics from the 10250 kubelet")
 	e2e.HighLatencyKubeletOperations(nil, 1*time.Second, "localhost:10250")
 	glog.Infof("\n\nLogging high latency metrics from the 10251 kubelet")
diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go
index cb326ec0e26..a2a08c6d104 100644
--- a/plugin/pkg/scheduler/scheduler_test.go
+++ b/plugin/pkg/scheduler/scheduler_test.go
@@ -18,9 +18,7 @@ package scheduler
 
 import (
 	"errors"
-	"fmt"
 	"reflect"
-	"sync"
 	"testing"
 	"time"
 
@@ -30,6 +28,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util/diff"
+	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -169,163 +168,167 @@ func TestScheduler(t *testing.T) {
 	}
 }
 
-func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
-	// Set up a channel through which we'll funnel log messages from the watcher.
-	// This way, we can guarantee that when the test ends no thread will still be
-	// trying to write to t.Logf (which it would if we handed t.Logf directly to
-	// StartLogging).
-	ch := make(chan string)
-	done := make(chan struct{})
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for {
-			select {
-			case msg := <-ch:
-				t.Log(msg)
-			case <-done:
-				return
-			}
-		}
-	}()
-	eventBroadcaster := record.NewBroadcaster()
-	watcher := eventBroadcaster.StartLogging(func(format string, args ...interface{}) {
-		ch <- fmt.Sprintf(format, args...)
-	})
-	defer func() {
-		watcher.Stop()
-		close(done)
-		wg.Wait()
-	}()
-
-	// Setup stores to test pod's workflow:
-	// - queuedPodStore: pods queued before processing
-	// - scheduledPodStore: pods that has a scheduling decision
-	scheduledPodStore := clientcache.NewStore(clientcache.MetaNamespaceKeyFunc)
-	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
-
-	// Port is the easiest way to cause a fit predicate failure
-	podPort := 8080
-	firstPod := podWithPort("foo", "", podPort)
-
+func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
 	stop := make(chan struct{})
 	defer close(stop)
-	cache := schedulercache.New(1*time.Second, stop)
+	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
+	scache := schedulercache.New(100*time.Millisecond, stop)
+	pod := podWithPort("pod.Name", "", 8080)
+	scheduler, bindingChan, _ := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, pod)
+
+	waitPodExpireChan := make(chan struct{})
+	timeout := make(chan struct{})
+	go func() {
+		for {
+			select {
+			case <-timeout:
+				return
+			default:
+			}
+			pods, err := scache.List(labels.Everything())
+			if err != nil {
+				t.Fatalf("cache.List failed: %v", err)
+			}
+			if len(pods) == 0 {
+				close(waitPodExpireChan)
+				return
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}()
+	// Wait for the assumed pod to expire.
+	select {
+	case <-waitPodExpireChan:
+	case <-time.After(wait.ForeverTestTimeout):
+		close(timeout)
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+
+	// We use conflicting pod ports to trigger a fit predicate failure if the first pod is not removed.
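+	// With the assumed pod expired from the cache, the second pod can bind to host port 8080 on machine1.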
+	secondPod := podWithPort("bar", "", 8080)
+	queuedPodStore.Add(secondPod)
+	scheduler.scheduleOne()
+	select {
+	case b := <-bindingChan:
+		expectBinding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{Name: "bar"},
+			Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
+		}
+		if !reflect.DeepEqual(expectBinding, b) {
+			t.Errorf("binding want=%v, get=%v", expectBinding, b)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+}
+
+func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
+	stop := make(chan struct{})
+	defer close(stop)
+	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
+	scache := schedulercache.New(10*time.Minute, stop)
+	firstPod := podWithPort("pod.Name", "", 8080)
+	scheduler, bindingChan, errChan := setupTestSchedulerWithOnePod(t, queuedPodStore, scache, firstPod)
+
+	// We use conflicting pod ports to trigger a fit predicate failure.
+	secondPod := podWithPort("bar", "", 8080)
+	queuedPodStore.Add(secondPod)
+	// queuedPodStore: [bar:8080]
+	// cache: [(assumed)foo:8080]
+
+	scheduler.scheduleOne()
+	select {
+	case err := <-errChan:
+		expectErr := &FitError{
+			Pod:              secondPod,
+			FailedPredicates: FailedPredicateMap{"machine1": "PodFitsHostPorts"},
+		}
+		if !reflect.DeepEqual(expectErr, err) {
+			t.Errorf("err want=%v, get=%v", expectErr, err)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+
+	// We mimic the cache's workflow when a pod is removed by the user.
+	// Note: if the schedulercache timeout were very short, the first pod would expire
+	// and be removed on its own (without any explicit action on schedulercache). Even in that case,
+	// explicitly calling AddPod still corrects the behavior.
+	firstPod.Spec.NodeName = "machine1"
+	if err := scache.AddPod(firstPod); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if err := scache.RemovePod(firstPod); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	queuedPodStore.Add(secondPod)
+	scheduler.scheduleOne()
+	select {
+	case b := <-bindingChan:
+		expectBinding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{Name: "bar"},
+			Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
+		}
+		if !reflect.DeepEqual(expectBinding, b) {
+			t.Errorf("binding want=%v, get=%v", expectBinding, b)
+		}
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
+	}
+}
+
+// queuedPodStore: pods queued before processing.
+// cache: scheduler cache that might contain assumed pods.
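+// It returns the scheduler, the channel on which the fake binder publishes bindings, and the channel that receives scheduling errors.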
+func setupTestSchedulerWithOnePod(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, pod *api.Pod) (*Scheduler, chan *api.Binding, chan error) {
 	// Create the scheduler config
 	algo := NewGenericScheduler(
-		cache,
+		scache,
 		map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
 		[]algorithm.PriorityConfig{},
 		[]algorithm.SchedulerExtender{})
-
-	var gotBinding *api.Binding
-	c := &Config{
-		SchedulerCache: cache,
+	bindingChan := make(chan *api.Binding, 1)
+	errChan := make(chan error, 1)
+	cfg := &Config{
+		SchedulerCache: scache,
 		NodeLister: algorithm.FakeNodeLister(
 			api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
 		),
 		Algorithm: algo,
 		Binder: fakeBinder{func(b *api.Binding) error {
-			scheduledPodStore.Add(podWithPort(b.Name, b.Target.Name, podPort))
-			gotBinding = b
+			bindingChan <- b
 			return nil
 		}},
 		NextPod: func() *api.Pod {
			return clientcache.Pop(queuedPodStore).(*api.Pod)
 		},
 		Error: func(p *api.Pod, err error) {
-			t.Errorf("Unexpected error when scheduling pod %+v: %v", p, err)
+			errChan <- err
 		},
-		Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
+		Recorder:            &record.FakeRecorder{},
+		PodConditionUpdater: fakePodConditionUpdater{},
 	}
+	scheduler := New(cfg)
 
-	// First scheduling pass should schedule the pod
-	s := New(c)
-	called := make(chan struct{})
-	events := eventBroadcaster.StartEventWatcher(func(e *api.Event) {
-		if e, a := "Scheduled", e.Reason; e != a {
-			t.Errorf("expected %v, got %v", e, a)
-		}
-		close(called)
-	})
-
-	queuedPodStore.Add(firstPod)
+	queuedPodStore.Add(pod)
 	// queuedPodStore: [foo:8080]
-	// scheduledPodStore: []
-	// assumedPods: []
+	// cache: []
 
-	s.scheduleOne()
-	<-called
+	scheduler.scheduleOne()
 	// queuedPodStore: []
-	// scheduledPodStore: [foo:8080]
-	// assumedPods: [foo:8080]
+	// cache: [(assumed)foo:8080]
 
-	pod, exists, _ := scheduledPodStore.GetByKey("foo")
-	if !exists {
-		t.Errorf("Expected scheduled pod store to contain pod")
-	}
-	pod, exists, _ = queuedPodStore.GetByKey("foo")
-	if exists {
-		t.Errorf("Did not expect a queued pod, found %+v", pod)
-	}
-
-	expectBind := &api.Binding{
-		ObjectMeta: api.ObjectMeta{Name: "foo"},
-		Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
-	}
-	if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
-		t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac))
-	}
-
-	events.Stop()
-
-	scheduledPodStore.Delete(pod)
-
-	secondPod := podWithPort("bar", "", podPort)
-	queuedPodStore.Add(secondPod)
-	// queuedPodStore: [bar:8080]
-	// scheduledPodStore: []
-	// assumedPods: [foo:8080]
-
-	var waitUntilExpired sync.WaitGroup
-	waitUntilExpired.Add(1)
-	// waiting for the assumed pod to expire
-	go func() {
-		for {
-			pods, err := cache.List(labels.Everything())
-			if err != nil {
-				t.Fatalf("cache.List failed: %v", err)
-			}
-			if len(pods) == 0 {
-				waitUntilExpired.Done()
-				return
-			}
-			time.Sleep(1 * time.Second)
+	select {
+	case b := <-bindingChan:
+		expectBinding := &api.Binding{
+			ObjectMeta: api.ObjectMeta{Name: "pod.Name"},
+			Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
 		}
-	}()
-	waitUntilExpired.Wait()
-
-	// Second scheduling pass will fail to schedule if the store hasn't expired
-	// the deleted pod. This would normally happen with a timeout.
-
-	called = make(chan struct{})
-	events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
-		if e, a := "Scheduled", e.Reason; e != a {
-			t.Errorf("expected %v, got %v", e, a)
+		if !reflect.DeepEqual(expectBinding, b) {
+			t.Errorf("binding want=%v, get=%v", expectBinding, b)
 		}
-		close(called)
-	})
-
-	s.scheduleOne()
-	<-called
-
-	expectBind = &api.Binding{
-		ObjectMeta: api.ObjectMeta{Name: "bar"},
-		Target:     api.ObjectReference{Kind: "Node", Name: "machine1"},
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
 	}
-	if ex, ac := expectBind, gotBinding; !reflect.DeepEqual(ex, ac) {
-		t.Errorf("Expected exact match on binding: %s", diff.ObjectDiff(ex, ac))
-	}
-	events.Stop()
+	return scheduler, bindingChan, errChan
 }