From 530ee716e33c9b918d4efd999c799faef9dd9c62 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Thu, 8 Dec 2016 18:15:06 +0200 Subject: [PATCH] Count ttl for assumed pod when binding is finished In such cases when api server is overloaded and returns a lot of 429 (too many requests) errors - binding may take a lot of time to succeed due to retry policy implemented in rest client. In such events cache ttl for assumed pods wasn't big enough. In order to minimize probability of such errors ttl for assumed pods will be counted from the time when binding for particular pod is finished (either with error or success) Change-Id: Ib0122f8a76dc57c82f2c7c52497aad1bdd8be411 --- plugin/pkg/scheduler/scheduler.go | 3 + plugin/pkg/scheduler/scheduler_test.go | 90 +++++++++++++++++++ plugin/pkg/scheduler/schedulercache/cache.go | 40 +++++++-- .../scheduler/schedulercache/cache_test.go | 19 ++-- .../pkg/scheduler/schedulercache/interface.go | 3 + plugin/pkg/scheduler/testing/fake_cache.go | 2 + 6 files changed, 143 insertions(+), 14 deletions(-) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 6f52ba00287..aea715004c9 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -138,6 +138,9 @@ func (s *Scheduler) scheduleOne() { // If binding succeeded then PodScheduled condition will be updated in apiserver so that // it's atomic with setting host. err := s.config.Binder.Bind(b) + if err := s.config.SchedulerCache.FinishBinding(&assumed); err != nil { + glog.Errorf("scheduler cache FinishBinding failed: %v", err) + } if err != nil { glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name) if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil { diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 5f4adf01994..42b73035a48 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -297,6 +297,65 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { } } +// Scheduler should preserve predicate constraint even if binding was longer +// than cache ttl +func TestSchedulerErrorWithLongBinding(t *testing.T) { + stop := make(chan struct{}) + defer close(stop) + + firstPod := podWithPort("foo", "", 8080) + conflictPod := podWithPort("bar", "", 8080) + pods := map[string]*v1.Pod{firstPod.Name: firstPod, conflictPod.Name: conflictPod} + for _, test := range []struct { + Expected map[string]bool + CacheTTL time.Duration + BindingDuration time.Duration + }{ + { + Expected: map[string]bool{firstPod.Name: true}, + CacheTTL: 100 * time.Millisecond, + BindingDuration: 300 * time.Millisecond, + }, + { + Expected: map[string]bool{firstPod.Name: true}, + CacheTTL: 10 * time.Second, + BindingDuration: 300 * time.Millisecond, + }, + } { + queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) + scache := schedulercache.New(test.CacheTTL, stop) + + node := v1.Node{ObjectMeta: v1.ObjectMeta{Name: "machine1"}} + scache.AddNode(&node) + + nodeLister := algorithm.FakeNodeLister([]*v1.Node{&node}) + predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} + + scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry( + queuedPodStore, scache, nodeLister, predicateMap, stop, test.BindingDuration) + scheduler.Run() + queuedPodStore.Add(firstPod) + queuedPodStore.Add(conflictPod) + + resultBindings := map[string]bool{} + waitChan := time.After(5 * time.Second) + for finished := false; !finished; { + select { + case b := <-bindingChan: + resultBindings[b.Name] = true + p := pods[b.Name] + p.Spec.NodeName = b.Target.Name + scache.AddPod(p) + case <-waitChan: + finished = true + } + } + if !reflect.DeepEqual(resultBindings, test.Expected) { + t.Errorf("Result binding are not equal to expected. %v != %v", resultBindings, test.Expected) + } + } +} + // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, @@ -429,3 +488,34 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. } return New(cfg), bindingChan, errChan } + +func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { + algo := NewGenericScheduler( + scache, + predicateMap, + algorithm.EmptyMetadataProducer, + []algorithm.PriorityConfig{}, + algorithm.EmptyMetadataProducer, + []algorithm.SchedulerExtender{}) + bindingChan := make(chan *v1.Binding, 2) + cfg := &Config{ + SchedulerCache: scache, + NodeLister: nodeLister, + Algorithm: algo, + Binder: fakeBinder{func(b *v1.Binding) error { + time.Sleep(bindingTime) + bindingChan <- b + return nil + }}, + NextPod: func() *v1.Pod { + return clientcache.Pop(queuedPodStore).(*v1.Pod) + }, + Error: func(p *v1.Pod, err error) { + queuedPodStore.AddIfNotPresent(p) + }, + Recorder: &record.FakeRecorder{}, + PodConditionUpdater: fakePodConditionUpdater{}, + StopEverything: stop, + } + return New(cfg), bindingChan +} diff --git a/plugin/pkg/scheduler/schedulercache/cache.go b/plugin/pkg/scheduler/schedulercache/cache.go index 28bd1c77895..ae4f18b3b42 100644 --- a/plugin/pkg/scheduler/schedulercache/cache.go +++ b/plugin/pkg/scheduler/schedulercache/cache.go @@ -60,6 +60,8 @@ type podState struct { pod *v1.Pod // Used by assumedPod to determinate expiration. deadline *time.Time + // Used to block cache from expiring assumedPod if binding still runs + bindingFinished bool } func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache { @@ -105,11 +107,6 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) { } func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { - return cache.assumePod(pod, time.Now()) -} - -// assumePod exists for making test deterministic by taking time as input argument. -func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error { key, err := getPodKey(pod) if err != nil { return err @@ -122,16 +119,38 @@ func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error { } cache.addPod(pod) - dl := now.Add(cache.ttl) ps := &podState{ - pod: pod, - deadline: &dl, + pod: pod, } cache.podStates[key] = ps cache.assumedPods[key] = true return nil } +func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error { + return cache.finishBinding(pod, time.Now()) +} + +// finishBinding exists to make tests determinitistic by injecting now as an argument +func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error { + key, err := getPodKey(pod) + if err != nil { + return err + } + + cache.mu.Lock() + defer cache.mu.Unlock() + + glog.V(5).Infof("Finished binding for pod %v. Can be expired.", key) + currState, ok := cache.podStates[key] + if ok && cache.assumedPods[key] { + dl := now.Add(cache.ttl) + currState.bindingFinished = true + currState.deadline = &dl + } + return nil +} + func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { key, err := getPodKey(pod) if err != nil { @@ -343,6 +362,11 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { if !ok { panic("Key found in assumed set but not in podStates. Potentially a logical error.") } + if !ps.bindingFinished { + glog.Warningf("Couldn't expire cache for pod %v/%v. Binding is still in progress.", + ps.pod.Namespace, ps.pod.Name) + continue + } if now.After(*ps.deadline) { glog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name) if err := cache.expirePod(key, ps); err != nil { diff --git a/plugin/pkg/scheduler/schedulercache/cache_test.go b/plugin/pkg/scheduler/schedulercache/cache_test.go index b5a632cbbd7..67c3e5a4684 100644 --- a/plugin/pkg/scheduler/schedulercache/cache_test.go +++ b/plugin/pkg/scheduler/schedulercache/cache_test.go @@ -123,6 +123,13 @@ type testExpirePodStruct struct { assumedTime time.Time } +func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error { + if err := cache.AssumePod(pod); err != nil { + return err + } + return cache.finishBinding(pod, assumedTime) +} + // TestExpirePod tests that assumed pods will be removed if expired. // The removal will be reflected in node info. func TestExpirePod(t *testing.T) { @@ -168,7 +175,7 @@ func TestExpirePod(t *testing.T) { cache := newSchedulerCache(ttl, time.Second, nil) for _, pod := range tt.pods { - if err := cache.assumePod(pod.pod, pod.assumedTime); err != nil { + if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -215,7 +222,7 @@ func TestAddPodWillConfirm(t *testing.T) { for i, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { - if err := cache.assumePod(podToAssume, now); err != nil { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -259,7 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) { now := time.Now() for i, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) - if err := cache.assumePod(tt.pod, now); err != nil { + if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) } cache.cleanupAssumedPods(now.Add(2 * ttl)) @@ -388,7 +395,7 @@ func TestExpireAddUpdatePod(t *testing.T) { for _, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { - if err := cache.assumePod(podToAssume, now); err != nil { + if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -471,7 +478,7 @@ func TestForgetPod(t *testing.T) { for i, tt := range tests { cache := newSchedulerCache(ttl, time.Second, nil) for _, pod := range tt.pods { - if err := cache.assumePod(pod, now); err != nil { + if err := assumeAndFinishBinding(cache, pod, now); err != nil { t.Fatalf("assumePod failed: %v", err) } } @@ -565,7 +572,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10) pod := makeBasePod(nodeName, objName, "0", "0", nil) - err := cache.assumePod(pod, assumedTime) + err := assumeAndFinishBinding(cache, pod, assumedTime) if err != nil { b.Fatalf("assumePod failed: %v", err) } diff --git a/plugin/pkg/scheduler/schedulercache/interface.go b/plugin/pkg/scheduler/schedulercache/interface.go index 1ca64fa800b..1d216f679bd 100644 --- a/plugin/pkg/scheduler/schedulercache/interface.go +++ b/plugin/pkg/scheduler/schedulercache/interface.go @@ -61,6 +61,9 @@ type Cache interface { // After expiration, its information would be subtracted. AssumePod(pod *v1.Pod) error + // FinishBinding signals that cache for assumed pod can be expired + FinishBinding(pod *v1.Pod) error + // ForgetPod removes an assumed pod from cache. ForgetPod(pod *v1.Pod) error diff --git a/plugin/pkg/scheduler/testing/fake_cache.go b/plugin/pkg/scheduler/testing/fake_cache.go index fb0d0c6d53b..aab879dec7d 100644 --- a/plugin/pkg/scheduler/testing/fake_cache.go +++ b/plugin/pkg/scheduler/testing/fake_cache.go @@ -32,6 +32,8 @@ func (f *FakeCache) AssumePod(pod *v1.Pod) error { return nil } +func (f *FakeCache) FinishBinding(pod *v1.Pod) error { return nil } + func (f *FakeCache) ForgetPod(pod *v1.Pod) error { return nil } func (f *FakeCache) AddPod(pod *v1.Pod) error { return nil }