From 151649df4ce2b63108a7f021d8e200b1c9dd299d Mon Sep 17 00:00:00 2001 From: goodluckbot Date: Sat, 6 Apr 2019 00:15:00 +0800 Subject: [PATCH] Integrate backoff mechanism into the scheduling queue and remove the Backoff util --- pkg/scheduler/factory/BUILD | 1 - pkg/scheduler/factory/factory.go | 6 +- pkg/scheduler/factory/factory_test.go | 3 +- pkg/scheduler/internal/queue/BUILD | 10 +- pkg/scheduler/internal/queue/pod_backoff.go | 126 ++++++++++ .../internal/queue/pod_backoff_test.go | 109 +++++++++ .../internal/queue/scheduling_queue.go | 6 +- pkg/scheduler/testutil.go | 3 +- pkg/scheduler/util/BUILD | 4 - pkg/scheduler/util/backoff_utils.go | 220 ------------------ pkg/scheduler/util/backoff_utils_test.go | 138 ----------- 11 files changed, 251 insertions(+), 375 deletions(-) create mode 100644 pkg/scheduler/internal/queue/pod_backoff.go create mode 100644 pkg/scheduler/internal/queue/pod_backoff_test.go delete mode 100644 pkg/scheduler/util/backoff_utils.go delete mode 100644 pkg/scheduler/util/backoff_utils_test.go diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 486ff1fdf4d..7f136b4c82c 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -65,7 +65,6 @@ go_test( "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", - "//pkg/scheduler/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index f1280650b14..cc41286cbff 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -455,7 +455,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, c.percentageOfNodesToScore, ) - podBackoff := util.CreateDefaultPodBackoff() + podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second) return &Config{ SchedulerCache: c.schedulerCache, // The scheduler only needs to consider schedulable nodes. @@ -638,7 +638,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core } // MakeDefaultErrorFunc construct a function to handle pod scheduler error -func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) { +func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) @@ -661,7 +661,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff, } } - backoff.Gc() + backoff.CleanupPodsCompletesBackingoff() podSchedulingCycle := podQueue.SchedulingCycle() // Retry asynchronously. // Note that this is extremely rudimentary and we need a more real error handling path. 
diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go
index 427895d3446..7a263d3e9e5 100644
--- a/pkg/scheduler/factory/factory_test.go
+++ b/pkg/scheduler/factory/factory_test.go
@@ -41,7 +41,6 @@ import (
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
 	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 const (
@@ -255,7 +254,7 @@ func TestDefaultErrorFunc(t *testing.T) {
 	defer close(stopCh)
 	queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
 	schedulerCache := internalcache.New(30*time.Second, stopCh)
-	podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
+	podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
 	errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, stopCh)
 
 	errFunc(testPod, nil)
diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD
index 6a6adbffe13..41a6da838f2 100644
--- a/pkg/scheduler/internal/queue/BUILD
+++ b/pkg/scheduler/internal/queue/BUILD
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "go_default_library",
-    srcs = ["scheduling_queue.go"],
+    srcs = [
+        "pod_backoff.go",
+        "scheduling_queue.go",
+    ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue",
     visibility = ["//pkg/scheduler:__subpackages__"],
     deps = [
@@ -21,7 +24,10 @@ go_library(
 
 go_test(
     name = "go_default_test",
-    srcs = ["scheduling_queue_test.go"],
+    srcs = [
+        "pod_backoff_test.go",
+        "scheduling_queue_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
diff --git a/pkg/scheduler/internal/queue/pod_backoff.go b/pkg/scheduler/internal/queue/pod_backoff.go
new file mode 100644
index 00000000000..964650ac081
--- /dev/null
+++ b/pkg/scheduler/internal/queue/pod_backoff.go
@@ -0,0 +1,126 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"sync"
+	"time"
+
+	ktypes "k8s.io/apimachinery/pkg/types"
+)
+
+// PodBackoffMap is a structure that stores backoff-related information for pods
+type PodBackoffMap struct {
+	// lock for performing actions on this PodBackoffMap
+	lock sync.Mutex
+	// initial backoff duration
+	initialDuration time.Duration
+	// maximal backoff duration
+	maxDuration time.Duration
+	// map for pod -> number of attempts for this pod
+	podAttempts map[ktypes.NamespacedName]int
+	// map for pod -> last update time of this pod
+	podLastUpdateTime map[ktypes.NamespacedName]time.Time
+}
+
+// NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
+func NewPodBackoffMap(initialDuration, maxDuration time.Duration) *PodBackoffMap {
+	return &PodBackoffMap{
+		initialDuration:   initialDuration,
+		maxDuration:       maxDuration,
+		podAttempts:       make(map[ktypes.NamespacedName]int),
+		podLastUpdateTime: make(map[ktypes.NamespacedName]time.Time),
+	}
+}
+
+// GetBackoffTime returns the time at which nsPod completes backoff
+func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time, bool) {
+	pbm.lock.Lock()
+	defer pbm.lock.Unlock()
+	if _, found := pbm.podAttempts[nsPod]; !found {
+		return time.Time{}, false
+	}
+	lastUpdateTime := pbm.podLastUpdateTime[nsPod]
+	backoffDuration := pbm.calculateBackoffDuration(nsPod)
+	backoffTime := lastUpdateTime.Add(backoffDuration)
+	return backoffTime, true
+}
+
+// TryBackoffAndWait tries to perform backoff for a non-preempting pod.
+// It is invoked from factory.go if util.PodPriorityEnabled() returns false.
+func (pbm *PodBackoffMap) TryBackoffAndWait(nsPod ktypes.NamespacedName, stop <-chan struct{}) bool {
+	// Compute the duration under the lock, but release the lock before
+	// waiting so other pods are not blocked for the whole backoff period.
+	pbm.lock.Lock()
+	backoffDuration := pbm.calculateBackoffDuration(nsPod)
+	pbm.lock.Unlock()
+	select {
+	case <-time.After(backoffDuration):
+		return true
+	case <-stop:
+		return false
+	}
+}
+
+// calculateBackoffDuration is a helper function for calculating the backoffDuration
+// based on the number of attempts the pod has made.
+func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
+	backoffDuration := pbm.initialDuration
+	if _, found := pbm.podAttempts[nsPod]; found {
+		for i := 1; i < pbm.podAttempts[nsPod]; i++ {
+			backoffDuration = backoffDuration * 2
+			if backoffDuration > pbm.maxDuration {
+				return pbm.maxDuration
+			}
+		}
+	}
+	return backoffDuration
+}
+
+// clearPodBackoff removes all tracking information for nsPod.
+// The caller is expected to hold pbm.lock.
+func (pbm *PodBackoffMap) clearPodBackoff(nsPod ktypes.NamespacedName) {
+	delete(pbm.podAttempts, nsPod)
+	delete(pbm.podLastUpdateTime, nsPod)
+}
+
+// ClearPodBackoff is the thread-safe version of clearPodBackoff
+func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) {
+	pbm.lock.Lock()
+	pbm.clearPodBackoff(nsPod)
+	pbm.lock.Unlock()
+}
+
+// CleanupPodsCompletesBackingoff executes garbage collection on the pod backoff data,
+// i.e., it removes a pod from the PodBackoffMap if
+// its lastUpdateTime + maxDuration is before the current timestamp
+func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
+	pbm.lock.Lock()
+	defer pbm.lock.Unlock()
+	for pod, value := range pbm.podLastUpdateTime {
+		if value.Add(pbm.maxDuration).Before(time.Now()) {
+			pbm.clearPodBackoff(pod)
+		}
+	}
+}
+
+// BackoffPod updates the lastUpdateTime for nsPod
+// and increases its number of attempts by 1
+func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) {
+	pbm.lock.Lock()
+	pbm.podLastUpdateTime[nsPod] = time.Now()
+	pbm.podAttempts[nsPod]++
+	pbm.lock.Unlock()
+}
diff --git a/pkg/scheduler/internal/queue/pod_backoff_test.go b/pkg/scheduler/internal/queue/pod_backoff_test.go
new file mode 100644
index 00000000000..c8d98898165
--- /dev/null
+++ b/pkg/scheduler/internal/queue/pod_backoff_test.go
@@ -0,0 +1,109 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package queue
+
+import (
+	"testing"
+	"time"
+
+	ktypes "k8s.io/apimachinery/pkg/types"
+)
+
+func TestBackoffPod(t *testing.T) {
+	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second)
+
+	tests := []struct {
+		podID            ktypes.NamespacedName
+		expectedDuration time.Duration
+	}{
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
+			expectedDuration: 1 * time.Second,
+		},
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
+			expectedDuration: 2 * time.Second,
+		},
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
+			expectedDuration: 4 * time.Second,
+		},
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
+			expectedDuration: 8 * time.Second,
+		},
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
+			expectedDuration: 10 * time.Second,
+		},
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
+			expectedDuration: 10 * time.Second,
+		},
+		{
+			podID:            ktypes.NamespacedName{Namespace: "default", Name: "bar"},
+			expectedDuration: 1 * time.Second,
+		},
+	}
+
+	for _, test := range tests {
+		// Backoff the pod
+		bpm.BackoffPod(test.podID)
+		// Get backoff duration for the pod
+		duration := bpm.calculateBackoffDuration(test.podID)
+
+		if duration != test.expectedDuration {
+			t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
+		}
+	}
+}
+
+func TestClearPodBackoff(t *testing.T) {
+	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
+	// Clear backoff on a pod that has no backoff record; this should be a no-op
+	bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"})
+	// Backoff twice for pod foo
+	podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
+	bpm.BackoffPod(podID)
+	bpm.BackoffPod(podID)
+	if duration := bpm.calculateBackoffDuration(podID); duration != 2*time.Second {
+		t.Errorf("Expected backoff of 2s for pod %s, got %s", podID, duration.String())
+	}
+	// Clear backoff for pod foo
+	bpm.clearPodBackoff(podID)
+	// Backoff once for pod foo
+	bpm.BackoffPod(podID)
+	if duration := bpm.calculateBackoffDuration(podID); duration != 1*time.Second {
+		t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
+	}
+}
+
+func TestTryBackoffAndWait(t *testing.T) {
+	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
+
+	stopCh := make(chan struct{})
+	podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
+	if !bpm.TryBackoffAndWait(podID, stopCh) {
+		t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
+	}
+
+	close(stopCh)
+	if bpm.TryBackoffAndWait(podID, stopCh) {
+		t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
+	}
+}
diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go
index 3267bed242b..a661fe980b6 100644
--- a/pkg/scheduler/internal/queue/scheduling_queue.go
+++ b/pkg/scheduler/internal/queue/scheduling_queue.go
@@ -214,7 +214,7 @@ type PriorityQueue struct {
 	stop  <-chan struct{}
 	clock util.Clock
 	// podBackoff tracks backoff for pods attempting to be
rescheduled - podBackoff *util.PodBackoff + podBackoff *PodBackoffMap lock sync.RWMutex cond sync.Cond @@ -295,7 +295,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority pq := &PriorityQueue{ clock: clock, stop: stop, - podBackoff: util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock), + podBackoff: NewPodBackoffMap(1*time.Second, 10*time.Second), activeQ: util.NewHeap(podInfoKeyFunc, activeQComp), unschedulableQ: newUnschedulablePodsMap(), nominatedPods: newNominatedPodMap(), @@ -396,7 +396,7 @@ func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool { // backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff // timeout otherwise it does nothing. func (p *PriorityQueue) backoffPod(pod *v1.Pod) { - p.podBackoff.Gc() + p.podBackoff.CleanupPodsCompletesBackingoff() podID := nsNameForPod(pod) boTime, found := p.podBackoff.GetBackoffTime(podID) diff --git a/pkg/scheduler/testutil.go b/pkg/scheduler/testutil.go index 86a89f6fd13..fb7ffeb826f 100644 --- a/pkg/scheduler/testutil.go +++ b/pkg/scheduler/testutil.go @@ -29,7 +29,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/factory" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" - "k8s.io/kubernetes/pkg/scheduler/util" ) // FakeConfigurator is an implementation for test. @@ -53,7 +52,7 @@ func (fc *FakeConfigurator) GetHardPodAffinitySymmetricWeight() int32 { } // MakeDefaultErrorFunc is not implemented yet. -func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) { +func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) { return nil } diff --git a/pkg/scheduler/util/BUILD b/pkg/scheduler/util/BUILD index e66070b58f4..709b581cd97 100644 --- a/pkg/scheduler/util/BUILD +++ b/pkg/scheduler/util/BUILD @@ -9,7 +9,6 @@ load( go_test( name = "go_default_test", srcs = [ - "backoff_utils_test.go", "heap_test.go", "utils_test.go", ], @@ -17,7 +16,6 @@ go_test( deps = [ "//pkg/apis/scheduling:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", ], ) @@ -25,7 +23,6 @@ go_test( go_library( name = "go_default_library", srcs = [ - "backoff_utils.go", "clock.go", "heap.go", "utils.go", @@ -37,7 +34,6 @@ go_library( "//pkg/scheduler/api:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/scheduler/util/backoff_utils.go b/pkg/scheduler/util/backoff_utils.go deleted file mode 100644 index 618f93772f9..00000000000 --- a/pkg/scheduler/util/backoff_utils.go +++ /dev/null @@ -1,220 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "fmt" - "sync" - "sync/atomic" - "time" - - ktypes "k8s.io/apimachinery/pkg/types" - - "k8s.io/klog" -) - -type clock interface { - Now() time.Time -} - -type realClock struct{} - -func (realClock) Now() time.Time { - return time.Now() -} - -// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. -// It is also not safe to copy this object. -type backoffEntry struct { - initialized bool - podName ktypes.NamespacedName - backoff time.Duration - lastUpdate time.Time - reqInFlight int32 -} - -// tryLock attempts to acquire a lock via atomic compare and swap. -// returns true if the lock was acquired, false otherwise -func (b *backoffEntry) tryLock() bool { - return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) -} - -// unlock returns the lock. panics if the lock isn't held -func (b *backoffEntry) unlock() { - if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { - panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) - } -} - -// backoffTime returns the Time when a backoffEntry completes backoff -func (b *backoffEntry) backoffTime() time.Time { - return b.lastUpdate.Add(b.backoff) -} - -// getBackoff returns the duration until this entry completes backoff -func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { - if !b.initialized { - b.initialized = true - return b.backoff - } - newDuration := b.backoff * 2 - if newDuration > maxDuration { - newDuration = maxDuration - } - b.backoff = newDuration - klog.V(4).Infof("Backing off %s", newDuration.String()) - return newDuration -} - -// PodBackoff is used to restart a pod with back-off delay. -type PodBackoff struct { - // expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd - expiryQ *Heap - lock sync.Mutex - clock clock - defaultDuration time.Duration - maxDuration time.Duration -} - -// MaxDuration returns the max time duration of the back-off. -func (p *PodBackoff) MaxDuration() time.Duration { - return p.maxDuration -} - -// CreateDefaultPodBackoff creates a default pod back-off object. -func CreateDefaultPodBackoff() *PodBackoff { - return CreatePodBackoff(1*time.Second, 60*time.Second) -} - -// CreatePodBackoff creates a pod back-off object by default duration and max duration. -func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff { - return CreatePodBackoffWithClock(defaultDuration, maxDuration, realClock{}) -} - -// CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock. 
-func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff { - return &PodBackoff{ - expiryQ: NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate), - clock: clock, - defaultDuration: defaultDuration, - maxDuration: maxDuration, - } -} - -// getEntry returns the backoffEntry for a given podID -func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry { - entry, exists, _ := p.expiryQ.GetByKey(podID.String()) - var be *backoffEntry - if !exists { - be = &backoffEntry{ - initialized: false, - podName: podID, - backoff: p.defaultDuration, - } - p.expiryQ.Update(be) - } else { - be = entry.(*backoffEntry) - } - return be -} - -// BackoffPod updates the backoff for a podId and returns the duration until backoff completion -func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration { - p.lock.Lock() - defer p.lock.Unlock() - entry := p.getEntry(podID) - entry.lastUpdate = p.clock.Now() - p.expiryQ.Update(entry) - return entry.getBackoff(p.maxDuration) -} - -// TryBackoffAndWait tries to acquire the backoff lock -func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName, stop <-chan struct{}) bool { - p.lock.Lock() - entry := p.getEntry(podID) - - if !entry.tryLock() { - p.lock.Unlock() - return false - } - defer entry.unlock() - duration := entry.getBackoff(p.maxDuration) - p.lock.Unlock() - select { - case <-time.After(duration): - return true - case <-stop: - return false - } -} - -// Gc execute garbage collection on the pod back-off. -func (p *PodBackoff) Gc() { - p.lock.Lock() - defer p.lock.Unlock() - now := p.clock.Now() - var be *backoffEntry - for { - entry := p.expiryQ.Peek() - if entry == nil { - break - } - be = entry.(*backoffEntry) - if now.Sub(be.lastUpdate) > p.maxDuration { - p.expiryQ.Pop() - } else { - break - } - } -} - -// GetBackoffTime returns the time that podID completes backoff -func (p *PodBackoff) GetBackoffTime(podID ktypes.NamespacedName) (time.Time, bool) { - p.lock.Lock() - defer p.lock.Unlock() - rawBe, exists, _ := p.expiryQ.GetByKey(podID.String()) - if !exists { - return time.Time{}, false - } - be := rawBe.(*backoffEntry) - return be.lastUpdate.Add(be.backoff), true -} - -// ClearPodBackoff removes all tracking information for podID (clears expiry) -func (p *PodBackoff) ClearPodBackoff(podID ktypes.NamespacedName) bool { - p.lock.Lock() - defer p.lock.Unlock() - entry, exists, _ := p.expiryQ.GetByKey(podID.String()) - if exists { - err := p.expiryQ.Delete(entry) - return err == nil - } - return false -} - -// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap -func backoffEntryKeyFunc(b interface{}) (string, error) { - be := b.(*backoffEntry) - return be.podName.String(), nil -} - -// backoffEntryCompareUpdate returns true when b1's backoff time is before b2's -func backoffEntryCompareUpdate(b1, b2 interface{}) bool { - be1 := b1.(*backoffEntry) - be2 := b2.(*backoffEntry) - return be1.lastUpdate.Before(be2.lastUpdate) -} diff --git a/pkg/scheduler/util/backoff_utils_test.go b/pkg/scheduler/util/backoff_utils_test.go deleted file mode 100644 index b99c9498f58..00000000000 --- a/pkg/scheduler/util/backoff_utils_test.go +++ /dev/null @@ -1,138 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "testing" - "time" - - ktypes "k8s.io/apimachinery/pkg/types" -) - -type fakeClock struct { - t time.Time -} - -func (f *fakeClock) Now() time.Time { - return f.t -} - -func TestBackoffPod(t *testing.T) { - clock := fakeClock{} - backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) - tests := []struct { - podID ktypes.NamespacedName - expectedDuration time.Duration - advanceClock time.Duration - }{ - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 1 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 2 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 4 * time.Second, - }, - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "bar"}, - expectedDuration: 1 * time.Second, - advanceClock: 120 * time.Second, - }, - // 'foo' should have been gc'd here. - { - podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 1 * time.Second, - }, - } - - for _, test := range tests { - duration := backoff.BackoffPod(test.podID) - if duration != test.expectedDuration { - t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID) - } - if boTime, _ := backoff.GetBackoffTime(test.podID); boTime != clock.Now().Add(test.expectedDuration) { - t.Errorf("expected GetBackoffTime %s, got %s for pod %s", test.expectedDuration.String(), boTime.String(), test.podID) - } - clock.t = clock.t.Add(test.advanceClock) - backoff.Gc() - } - fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} - be := backoff.getEntry(fooID) - be.backoff = 60 * time.Second - duration := backoff.BackoffPod(fooID) - if duration != 60*time.Second { - t.Errorf("expected: 60, got %s", duration.String()) - } - // Verify that we split on namespaces correctly, same name, different namespace - fooID.Namespace = "other" - duration = backoff.BackoffPod(fooID) - if duration != 1*time.Second { - t.Errorf("expected: 1, got %s", duration.String()) - } -} - -func TestClearPodBackoff(t *testing.T) { - clock := fakeClock{} - backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) - - if backoff.ClearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "nonexist"}) { - t.Error("Expected ClearPodBackoff failure for unknown pod, got success.") - } - - podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"} - if dur := backoff.BackoffPod(podID); dur != 1*time.Second { - t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, dur.String()) - } - - if !backoff.ClearPodBackoff(podID) { - t.Errorf("Failed to clear backoff for pod %v", podID) - } - - expectBoTime := clock.Now() - if boTime, _ := backoff.GetBackoffTime(podID); boTime != expectBoTime { - t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime) - } -} - -func TestTryBackoffAndWait(t *testing.T) { - clock := fakeClock{} - backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) - - stopCh := make(chan struct{}) - podID := 
ktypes.NamespacedName{Namespace: "ns", Name: "pod"} - if !backoff.TryBackoffAndWait(podID, stopCh) { - t.Error("Expected TryBackoffAndWait success for new pod, got failure.") - } - - be := backoff.getEntry(podID) - if !be.tryLock() { - t.Error("Failed to acquire lock for backoffentry") - } - - if backoff.TryBackoffAndWait(podID, stopCh) { - t.Error("Expected TryBackoffAndWait failure with lock acquired, got success.") - } - - close(stopCh) - if backoff.TryBackoffAndWait(podID, stopCh) { - t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.") - } -}
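
Usage note for reviewers (not part of the patch): below is a minimal sketch of how the scheduler drives the new PodBackoffMap across failed scheduling attempts. Since PodBackoffMap lives in the internal package pkg/scheduler/internal/queue added above, this only compiles inside the scheduler tree; demoPodBackoff, the pod name, and the printed output are illustrative and not from the patch. The durations match the ones NewPriorityQueueWithClock passes in.

package queue

import (
	"fmt"
	"time"

	ktypes "k8s.io/apimachinery/pkg/types"
)

// demoPodBackoff is a hypothetical helper showing the PodBackoffMap call pattern.
func demoPodBackoff() {
	// Same bounds the PriorityQueue uses: 1s initial backoff, 10s cap.
	backoff := NewPodBackoffMap(1*time.Second, 10*time.Second)
	pod := ktypes.NamespacedName{Namespace: "default", Name: "nginx"}

	// Each scheduling failure records an attempt; the computed backoff
	// doubles per attempt (1s, 2s, 4s, 8s, 10s, 10s, ...) until maxDuration.
	for attempt := 1; attempt <= 5; attempt++ {
		backoff.BackoffPod(pod)
		if until, ok := backoff.GetBackoffTime(pod); ok {
			fmt.Printf("attempt %d: backed off until %v\n", attempt, until)
		}
	}

	// Periodic GC drops pods whose lastUpdateTime + maxDuration has passed.
	backoff.CleanupPodsCompletesBackingoff()

	// Once the pod schedules (or is deleted), its tracking state is cleared.
	backoff.ClearPodBackoff(pod)
}

GetBackoffTime is what PriorityQueue.isPodBackingOff consults to decide whether an unschedulable pod must stay queued, and CleanupPodsCompletesBackingoff takes over the role of the removed heap-based PodBackoff.Gc().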