diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 6f52ba00287..aea715004c9 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -138,6 +138,9 @@ func (s *Scheduler) scheduleOne() { // If binding succeeded then PodScheduled condition will be updated in apiserver so that // it's atomic with setting host. err := s.config.Binder.Bind(b) + if err := s.config.SchedulerCache.FinishBinding(&assumed); err != nil { + glog.Errorf("scheduler cache FinishBinding failed: %v", err) + } if err != nil { glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name) if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil { diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 5f4adf01994..42b73035a48 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -297,6 +297,65 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { } } +// Scheduler should preserve predicate constraint even if binding was longer +// than cache ttl +func TestSchedulerErrorWithLongBinding(t *testing.T) { + stop := make(chan struct{}) + defer close(stop) + + firstPod := podWithPort("foo", "", 8080) + conflictPod := podWithPort("bar", "", 8080) + pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod} + for _, test := range []struct { + Expected map[string]bool + CacheTTL time.Duration + BindingDuration time.Duration + }{ + { + Expected: map[string]bool{firstPod.Name: true}, + CacheTTL: 100 * time.Millisecond, + BindingDuration: 300 * time.Millisecond, + }, + { + Expected: map[string]bool{firstPod.Name: true}, + CacheTTL: 10 * time.Second, + BindingDuration: 300 * time.Millisecond, + }, + } { + queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) + scache := schedulercache.New(test.CacheTTL, stop) + + node := v1.Node{ObjectMeta: v1.ObjectMeta{Name: "machine1"}} + 
scache.AddNode(&node) + + nodeLister := algorithm.FakeNodeLister([]*v1.Node{&node}) + predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} + + scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry( + queuedPodStore, scache, nodeLister, predicateMap, stop, test.BindingDuration) + scheduler.Run() + queuedPodStore.Add(firstPod) + queuedPodStore.Add(conflictPod) + + resultBindings := map[string]bool{} + waitChan := time.After(5 * time.Second) + for finished := false; !finished; { + select { + case b := <-bindingChan: + resultBindings[b.Name] = true + p := pods[b.Name] + p.Spec.NodeName = b.Target.Name + scache.AddPod(p) + case <-waitChan: + finished = true + } + } + if !reflect.DeepEqual(resultBindings, test.Expected) { + t.Errorf("Result binding are not equal to expected. %v != %v", resultBindings, test.Expected) + } + } +} + // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, @@ -429,3 +488,34 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. 
} return New(cfg), bindingChan, errChan } + +func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { + algo := NewGenericScheduler( + scache, + predicateMap, + algorithm.EmptyMetadataProducer, + []algorithm.PriorityConfig{}, + algorithm.EmptyMetadataProducer, + []algorithm.SchedulerExtender{}) + bindingChan := make(chan *v1.Binding, 2) + cfg := &Config{ + SchedulerCache: scache, + NodeLister: nodeLister, + Algorithm: algo, + Binder: fakeBinder{func(b *v1.Binding) error { + time.Sleep(bindingTime) + bindingChan <- b + return nil + }}, + NextPod: func() *v1.Pod { + return clientcache.Pop(queuedPodStore).(*v1.Pod) + }, + Error: func(p *v1.Pod, err error) { + queuedPodStore.AddIfNotPresent(p) + }, + Recorder: &record.FakeRecorder{}, + PodConditionUpdater: fakePodConditionUpdater{}, + StopEverything: stop, + } + return New(cfg), bindingChan +} diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 28bd1c77895..ae4f18b3b42 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -60,6 +60,8 @@ type podState struct { pod *v1.Pod // Used by assumedPod to determinate expiration. deadline *time.Time + // Used to block cache from expiring assumedPod if binding still runs + bindingFinished bool } func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache { @@ -105,11 +107,6 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) { } func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { - return cache.assumePod(pod, time.Now()) -} - -// assumePod exists for making test deterministic by taking time as input argument. 
-func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error { key, err := getPodKey(pod) if err != nil { return err @@ -122,16 +119,38 @@ func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error { } cache.addPod(pod) - dl := now.Add(cache.ttl) ps := &podState{ - pod: pod, - deadline: &dl, + pod: pod, } cache.podStates[key] = ps cache.assumedPods[key] = true return nil } +func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error { + return cache.finishBinding(pod, time.Now()) +} + +// finishBinding exists to make tests deterministic by injecting now as an argument +func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { + key, err := getPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + glog.V(5).Infof("Finished binding for pod %v. Can be expired.", key) + currState, ok := cache.podStates[key] + if ok && cache.assumedPods[key] { + dl := now.Add(cache.ttl) + currState.bindingFinished = true + currState.deadline = &dl + } + return nil +} + func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { key, err := getPodKey(pod) if err != nil { @@ -343,6 +362,11 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { if !ok { panic("Key found in assumed set but not in podStates. Potentially a logical error.") } + if !ps.bindingFinished { + glog.Warningf("Couldn't expire cache for pod %v/%v. 
Binding is still in progress.", + ps.pod.Namespace, ps.pod.Name) + continue + } if now.After(*ps.deadline) { glog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name) if err := cache.expirePod(key, ps); err != nil { diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index b5a632cbbd7..67c3e5a4684 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -123,6 +123,13 @@ type testExpirePodStruct struct { assumedTime time.Time } +func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error { + if err := cache.AssumePod(pod); err != nil { + return err + } + return cache.finishBinding(pod, assumedTime) +} + // TestExpirePod tests that assumed pods will be removed if expired. // The removal will be reflected in node info. func TestExpirePod(t *testing.T) { @@ -168,7 +175,7 @@ func TestExpirePod(t *testing.T) { cache := newSchedulerCache(ttl, time.Second, nil) for _, pod := range tt.pods { - if err := cache.assumePod(pod.pod, pod.assumedTime); err != nil { + if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -215,7 +222,7 @@ func TestAddPodWillConfirm(t *testing.T) { for i, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { - if err := cache.assumePod(podToAssume, now); err != nil { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -259,7 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) { now := time.Now() for i, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) - if err := cache.assumePod(tt.pod, now); err != nil { + if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) } cache.cleanupAssumedPods(now.Add(2 * ttl)) @@ -388,7 
+395,7 @@ func TestExpireAddUpdatePod(t *testing.T) { for _, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { - if err := cache.assumePod(podToAssume, now); err != nil { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -471,7 +478,7 @@ func TestForgetPod(t *testing.T) { for i, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) for _, pod := range tt.pods { - if err := cache.assumePod(pod, now); err != nil { + if err := assumeAndFinishBinding(cache, pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -565,7 +572,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10) pod := makeBasePod(nodeName, objName, "0", "0", nil) - err := cache.assumePod(pod, assumedTime) + err := assumeAndFinishBinding(cache, pod, assumedTime) if err != nil { b.Fatalf("assumePod failed: %v", err) } diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index 1ca64fa800b..1d216f679bd 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -61,6 +61,9 @@ type Cache interface { // After expiration, its information would be subtracted. AssumePod(pod *v1.Pod) error + // FinishBinding signals that cache for assumed pod can be expired + FinishBinding(pod *v1.Pod) error + // ForgetPod removes an assumed pod from cache. 
ForgetPod(pod *v1.Pod) error diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go index fb0d0c6d53b..aab879dec7d 100644 --- a/plugin/pkg/scheduler/testing/fake_cache.go +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -32,6 +32,8 @@ func (f *FakeCache) AssumePod(pod *v1.Pod) error { return nil } +func (f *FakeCache) FinishBinding(pod *v1.Pod) error { return nil } + func (f *FakeCache) ForgetPod(pod *v1.Pod) error { return nil } func (f *FakeCache) AddPod(pod *v1.Pod) error { return nil }