Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-27 05:27:21 +00:00)
Merge pull request #75497 from goodluckbot/remove-backoff-util

Integrate backoff mechanism into the scheduling queue and remove the …

Commit 52ec2a0009
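At a glance (a summary drawn from the hunks below, not wording taken from the PR): the heap-based util.PodBackoff in pkg/scheduler/util/backoff_utils.go is deleted, and the scheduling queue package gains its own map-based PodBackoffMap, which the factory, the priority queue, and the default error handler now use. The minimal sketch below shows the new call pattern; it is written as if it sat next to pod_backoff.go in package queue, and the pod name and printed output are illustrative assumptions, not part of the change.

package queue

import (
	"fmt"
	"time"

	ktypes "k8s.io/apimachinery/pkg/types"
)

// ExamplePodBackoffMap sketches the call pattern used elsewhere in this PR.
func ExamplePodBackoffMap() {
	// Same parameters the factory uses below: 1s initial backoff, capped at 60s.
	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
	pod := ktypes.NamespacedName{Namespace: "default", Name: "foo"}

	// Each failed scheduling attempt stamps lastUpdateTime and bumps the attempt count.
	bpm.BackoffPod(pod)
	bpm.BackoffPod(pod)

	// GetBackoffTime reports when the pod finishes backing off.
	if until, ok := bpm.GetBackoffTime(pod); ok {
		fmt.Println("backed off until:", until)
	}

	// Entries whose lastUpdateTime is older than the max duration are dropped,
	// replacing the old backoff.Gc() call.
	bpm.CleanupPodsCompletesBackingoff()
}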
@@ -65,7 +65,6 @@ go_test(
         "//pkg/scheduler/internal/cache:go_default_library",
         "//pkg/scheduler/internal/queue:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
-        "//pkg/scheduler/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@@ -456,7 +456,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
     c.percentageOfNodesToScore,
 )

-podBackoff := util.CreateDefaultPodBackoff()
+podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
 return &Config{
     SchedulerCache: c.schedulerCache,
     // The scheduler only needs to consider schedulable nodes.
@@ -639,7 +639,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
 }

 // MakeDefaultErrorFunc construct a function to handle pod scheduler error
-func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
+func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
     return func(pod *v1.Pod, err error) {
         if err == core.ErrNoNodesAvailable {
             klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
@@ -662,7 +662,7 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *util.PodBackoff,
         }
     }

-    backoff.Gc()
+    backoff.CleanupPodsCompletesBackingoff()
     podSchedulingCycle := podQueue.SchedulingCycle()
     // Retry asynchronously.
     // Note that this is extremely rudimentary and we need a more real error handling path.
@@ -41,7 +41,6 @@ import (
     internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-    "k8s.io/kubernetes/pkg/scheduler/util"
 )

 const (
@@ -255,7 +254,7 @@ func TestDefaultErrorFunc(t *testing.T) {
     defer close(stopCh)
     queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
     schedulerCache := internalcache.New(30*time.Second, stopCh)
-    podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
+    podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
     errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, stopCh)

     errFunc(testPod, nil)
@@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

 go_library(
     name = "go_default_library",
-    srcs = ["scheduling_queue.go"],
+    srcs = [
+        "pod_backoff.go",
+        "scheduling_queue.go",
+    ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/internal/queue",
     visibility = ["//pkg/scheduler:__subpackages__"],
     deps = [
@@ -21,7 +24,10 @@ go_library(

 go_test(
     name = "go_default_test",
-    srcs = ["scheduling_queue_test.go"],
+    srcs = [
+        "pod_backoff_test.go",
+        "scheduling_queue_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
pkg/scheduler/internal/queue/pod_backoff.go (new file, 126 lines)
@@ -0,0 +1,126 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
    "sync"
    "time"

    ktypes "k8s.io/apimachinery/pkg/types"
)

// PodBackoffMap is a structure that stores backoff related information for pods
type PodBackoffMap struct {
    // lock for performing actions on this PodBackoffMap
    lock sync.Mutex
    // initial backoff duration
    initialDuration time.Duration
    // maximal backoff duration
    maxDuration time.Duration
    // map for pod -> number of attempts for this pod
    podAttempts map[ktypes.NamespacedName]int
    // map for pod -> lastUpdateTime pod of this pod
    podLastUpdateTime map[ktypes.NamespacedName]time.Time
}

// NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
func NewPodBackoffMap(initialDuration, maxDuration time.Duration) *PodBackoffMap {
    return &PodBackoffMap{
        initialDuration:   initialDuration,
        maxDuration:       maxDuration,
        podAttempts:       make(map[ktypes.NamespacedName]int),
        podLastUpdateTime: make(map[ktypes.NamespacedName]time.Time),
    }
}

// GetBackoffTime returns the time that nsPod completes backoff
func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time, bool) {
    pbm.lock.Lock()
    defer pbm.lock.Unlock()
    if _, found := pbm.podAttempts[nsPod]; found == false {
        return time.Time{}, false
    }
    lastUpdateTime := pbm.podLastUpdateTime[nsPod]
    backoffDuration := pbm.calculateBackoffDuration(nsPod)
    backoffTime := lastUpdateTime.Add(backoffDuration)
    return backoffTime, true
}

// TryBackoffAndWait tries to perform backoff for a non-preempting pod.
// it is invoked from factory.go if util.PodPriorityEnabled() returns false.
func (pbm *PodBackoffMap) TryBackoffAndWait(nsPod ktypes.NamespacedName, stop <-chan struct{}) bool {
    pbm.lock.Lock()
    defer pbm.lock.Unlock()
    backoffDuration := pbm.calculateBackoffDuration(nsPod)
    select {
    case <-time.After(backoffDuration):
        return true
    case <-stop:
        return false
    }
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
    backoffDuration := pbm.initialDuration
    if _, found := pbm.podAttempts[nsPod]; found {
        for i := 1; i < pbm.podAttempts[nsPod]; i++ {
            backoffDuration = backoffDuration * 2
            if backoffDuration > pbm.maxDuration {
                return pbm.maxDuration
            }
        }
    }
    return backoffDuration
}

// clearPodBackoff removes all tracking information for nsPod.
// Lock is supposed to be acquired by caller.
func (pbm *PodBackoffMap) clearPodBackoff(nsPod ktypes.NamespacedName) {
    delete(pbm.podAttempts, nsPod)
    delete(pbm.podLastUpdateTime, nsPod)
}

// ClearPodBackoff is the thread safe version of clearPodBackoff
func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) {
    pbm.lock.Lock()
    pbm.clearPodBackoff(nsPod)
    pbm.lock.Unlock()
}

// CleanupPodsCompletesBackingoff execute garbage collection on the pod backoff,
// i.e, it will remove a pod from the PodBackoffMap if
// lastUpdateTime + maxBackoffDuration is before the current timestamp
func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
    pbm.lock.Lock()
    defer pbm.lock.Unlock()
    for pod, value := range pbm.podLastUpdateTime {
        if value.Add(pbm.maxDuration).Before(time.Now()) {
            pbm.clearPodBackoff(pod)
        }
    }
}

// BackoffPod updates the lastUpdateTime for an nsPod,
// and increases its numberOfAttempts by 1
func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) {
    pbm.lock.Lock()
    pbm.podLastUpdateTime[nsPod] = time.Now()
    pbm.podAttempts[nsPod]++
    pbm.lock.Unlock()
}
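For reference, the growth rule calculateBackoffDuration implements is: attempt n waits initialDuration * 2^(n-1), capped at maxDuration. The small standalone snippet below (not part of the PR) reproduces that sequence for the 1s/10s parameters used by the priority queue and by the TestBackoffPod cases further down:

package main

import (
    "fmt"
    "time"
)

func main() {
    initial, maxD := 1*time.Second, 10*time.Second
    d := initial
    for attempt := 1; attempt <= 6; attempt++ {
        fmt.Printf("attempt %d -> backoff %v\n", attempt, d)
        // Double the wait for the next attempt, but never exceed the cap.
        if d*2 > maxD {
            d = maxD
        } else {
            d *= 2
        }
    }
    // Prints 1s, 2s, 4s, 8s, 10s, 10s, matching the expectations in
    // pod_backoff_test.go below.
}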
pkg/scheduler/internal/queue/pod_backoff_test.go (new file, 109 lines)
@@ -0,0 +1,109 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
    "testing"
    "time"

    ktypes "k8s.io/apimachinery/pkg/types"
)

func TestBackoffPod(t *testing.T) {
    bpm := NewPodBackoffMap(1*time.Second, 10*time.Second)

    tests := []struct {
        podID            ktypes.NamespacedName
        expectedDuration time.Duration
        advanceClock     time.Duration
    }{
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 1 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 2 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 4 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 8 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 10 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 10 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "bar"},
            expectedDuration: 1 * time.Second,
        },
    }

    for _, test := range tests {
        // Backoff the pod
        bpm.BackoffPod(test.podID)
        // Get backoff duration for the pod
        duration := bpm.calculateBackoffDuration(test.podID)

        if duration != test.expectedDuration {
            t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
        }
    }
}

func TestClearPodBackoff(t *testing.T) {
    bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
    // Clear backoff on an not existed pod
    bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"})
    // Backoff twice for pod foo
    podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
    bpm.BackoffPod(podID)
    bpm.BackoffPod(podID)
    if duration := bpm.calculateBackoffDuration(podID); duration != 2*time.Second {
        t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
    }
    // Clear backoff for pod foo
    bpm.clearPodBackoff(podID)
    // Backoff once for pod foo
    bpm.BackoffPod(podID)
    if duration := bpm.calculateBackoffDuration(podID); duration != 1*time.Second {
        t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
    }
}

func TestTryBackoffAndWait(t *testing.T) {
    bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)

    stopCh := make(chan struct{})
    podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
    if !bpm.TryBackoffAndWait(podID, stopCh) {
        t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
    }

    close(stopCh)
    if bpm.TryBackoffAndWait(podID, stopCh) {
        t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
    }
}
@@ -214,7 +214,7 @@ type PriorityQueue struct {
     stop  <-chan struct{}
     clock util.Clock
     // podBackoff tracks backoff for pods attempting to be rescheduled
-    podBackoff *util.PodBackoff
+    podBackoff *PodBackoffMap

     lock sync.RWMutex
     cond sync.Cond
@@ -282,7 +282,7 @@ func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *Priority
     pq := &PriorityQueue{
         clock:          clock,
         stop:           stop,
-        podBackoff:     util.CreatePodBackoffWithClock(1*time.Second, 10*time.Second, clock),
+        podBackoff:     NewPodBackoffMap(1*time.Second, 10*time.Second),
         activeQ:        util.NewHeap(podInfoKeyFunc, activeQComp),
         unschedulableQ: newUnschedulablePodsMap(),
         nominatedPods:  newNominatedPodMap(),
@@ -383,7 +383,7 @@ func (p *PriorityQueue) isPodBackingOff(pod *v1.Pod) bool {
 // backoffPod checks if pod is currently undergoing backoff. If it is not it updates the backoff
 // timeout otherwise it does nothing.
 func (p *PriorityQueue) backoffPod(pod *v1.Pod) {
-    p.podBackoff.Gc()
+    p.podBackoff.CleanupPodsCompletesBackingoff()

     podID := nsNameForPod(pod)
     boTime, found := p.podBackoff.GetBackoffTime(podID)
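The surrounding isPodBackingOff helper is not shown in this diff; the fragment below is a hypothetical sketch (an assumption, written as if it lived in package queue) of the check such a caller can make with the new map: a pod is still backing off while the current time is before the time returned by GetBackoffTime.

package queue

import (
    "time"

    ktypes "k8s.io/apimachinery/pkg/types"
)

// stillBackingOff is a hypothetical helper, not part of this PR.
func stillBackingOff(bpm *PodBackoffMap, pod ktypes.NamespacedName, now time.Time) bool {
    boTime, found := bpm.GetBackoffTime(pod)
    // No entry means the pod has never been backed off (or its entry was cleaned up).
    return found && boTime.After(now)
}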
@@ -29,7 +29,6 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/factory"
     internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
     plugins "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
-    "k8s.io/kubernetes/pkg/scheduler/util"
 )

 // FakeConfigurator is an implementation for test.
@@ -53,7 +52,7 @@ func (fc *FakeConfigurator) GetHardPodAffinitySymmetricWeight() int32 {
 }

 // MakeDefaultErrorFunc is not implemented yet.
-func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) {
+func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue) func(pod *v1.Pod, err error) {
     return nil
 }

@@ -9,7 +9,6 @@ load(
 go_test(
     name = "go_default_test",
     srcs = [
-        "backoff_utils_test.go",
         "heap_test.go",
         "utils_test.go",
     ],
@@ -17,7 +16,6 @@ go_test(
     deps = [
         "//pkg/apis/scheduling:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
     ],
 )
@@ -25,7 +23,6 @@ go_test(
 go_library(
     name = "go_default_library",
     srcs = [
-        "backoff_utils.go",
         "clock.go",
         "heap.go",
         "utils.go",
@@ -37,7 +34,6 @@ go_library(
         "//pkg/scheduler/api:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
pkg/scheduler/util/backoff_utils.go (deleted file, 220 lines; file name inferred from the srcs removal above)
@@ -1,220 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    ktypes "k8s.io/apimachinery/pkg/types"

    "k8s.io/klog"
)

type clock interface {
    Now() time.Time
}

type realClock struct{}

func (realClock) Now() time.Time {
    return time.Now()
}

// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time.
// It is also not safe to copy this object.
type backoffEntry struct {
    initialized bool
    podName     ktypes.NamespacedName
    backoff     time.Duration
    lastUpdate  time.Time
    reqInFlight int32
}

// tryLock attempts to acquire a lock via atomic compare and swap.
// returns true if the lock was acquired, false otherwise
func (b *backoffEntry) tryLock() bool {
    return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1)
}

// unlock returns the lock. panics if the lock isn't held
func (b *backoffEntry) unlock() {
    if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) {
        panic(fmt.Sprintf("unexpected state on unlocking: %+v", b))
    }
}

// backoffTime returns the Time when a backoffEntry completes backoff
func (b *backoffEntry) backoffTime() time.Time {
    return b.lastUpdate.Add(b.backoff)
}

// getBackoff returns the duration until this entry completes backoff
func (b *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
    if !b.initialized {
        b.initialized = true
        return b.backoff
    }
    newDuration := b.backoff * 2
    if newDuration > maxDuration {
        newDuration = maxDuration
    }
    b.backoff = newDuration
    klog.V(4).Infof("Backing off %s", newDuration.String())
    return newDuration
}

// PodBackoff is used to restart a pod with back-off delay.
type PodBackoff struct {
    // expiryQ stores backoffEntry orderedy by lastUpdate until they reach maxDuration and are GC'd
    expiryQ         *Heap
    lock            sync.Mutex
    clock           clock
    defaultDuration time.Duration
    maxDuration     time.Duration
}

// MaxDuration returns the max time duration of the back-off.
func (p *PodBackoff) MaxDuration() time.Duration {
    return p.maxDuration
}

// CreateDefaultPodBackoff creates a default pod back-off object.
func CreateDefaultPodBackoff() *PodBackoff {
    return CreatePodBackoff(1*time.Second, 60*time.Second)
}

// CreatePodBackoff creates a pod back-off object by default duration and max duration.
func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff {
    return CreatePodBackoffWithClock(defaultDuration, maxDuration, realClock{})
}

// CreatePodBackoffWithClock creates a pod back-off object by default duration, max duration and clock.
func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff {
    return &PodBackoff{
        expiryQ:         NewHeap(backoffEntryKeyFunc, backoffEntryCompareUpdate),
        clock:           clock,
        defaultDuration: defaultDuration,
        maxDuration:     maxDuration,
    }
}

// getEntry returns the backoffEntry for a given podID
func (p *PodBackoff) getEntry(podID ktypes.NamespacedName) *backoffEntry {
    entry, exists, _ := p.expiryQ.GetByKey(podID.String())
    var be *backoffEntry
    if !exists {
        be = &backoffEntry{
            initialized: false,
            podName:     podID,
            backoff:     p.defaultDuration,
        }
        p.expiryQ.Update(be)
    } else {
        be = entry.(*backoffEntry)
    }
    return be
}

// BackoffPod updates the backoff for a podId and returns the duration until backoff completion
func (p *PodBackoff) BackoffPod(podID ktypes.NamespacedName) time.Duration {
    p.lock.Lock()
    defer p.lock.Unlock()
    entry := p.getEntry(podID)
    entry.lastUpdate = p.clock.Now()
    p.expiryQ.Update(entry)
    return entry.getBackoff(p.maxDuration)
}

// TryBackoffAndWait tries to acquire the backoff lock
func (p *PodBackoff) TryBackoffAndWait(podID ktypes.NamespacedName, stop <-chan struct{}) bool {
    p.lock.Lock()
    entry := p.getEntry(podID)

    if !entry.tryLock() {
        p.lock.Unlock()
        return false
    }
    defer entry.unlock()
    duration := entry.getBackoff(p.maxDuration)
    p.lock.Unlock()
    select {
    case <-time.After(duration):
        return true
    case <-stop:
        return false
    }
}

// Gc execute garbage collection on the pod back-off.
func (p *PodBackoff) Gc() {
    p.lock.Lock()
    defer p.lock.Unlock()
    now := p.clock.Now()
    var be *backoffEntry
    for {
        entry := p.expiryQ.Peek()
        if entry == nil {
            break
        }
        be = entry.(*backoffEntry)
        if now.Sub(be.lastUpdate) > p.maxDuration {
            p.expiryQ.Pop()
        } else {
            break
        }
    }
}

// GetBackoffTime returns the time that podID completes backoff
func (p *PodBackoff) GetBackoffTime(podID ktypes.NamespacedName) (time.Time, bool) {
    p.lock.Lock()
    defer p.lock.Unlock()
    rawBe, exists, _ := p.expiryQ.GetByKey(podID.String())
    if !exists {
        return time.Time{}, false
    }
    be := rawBe.(*backoffEntry)
    return be.lastUpdate.Add(be.backoff), true
}

// ClearPodBackoff removes all tracking information for podID (clears expiry)
func (p *PodBackoff) ClearPodBackoff(podID ktypes.NamespacedName) bool {
    p.lock.Lock()
    defer p.lock.Unlock()
    entry, exists, _ := p.expiryQ.GetByKey(podID.String())
    if exists {
        err := p.expiryQ.Delete(entry)
        return err == nil
    }
    return false
}

// backoffEntryKeyFunc is the keying function used for mapping a backoffEntry to string for heap
func backoffEntryKeyFunc(b interface{}) (string, error) {
    be := b.(*backoffEntry)
    return be.podName.String(), nil
}

// backoffEntryCompareUpdate returns true when b1's backoff time is before b2's
func backoffEntryCompareUpdate(b1, b2 interface{}) bool {
    be1 := b1.(*backoffEntry)
    be2 := b2.(*backoffEntry)
    return be1.lastUpdate.Before(be2.lastUpdate)
}
pkg/scheduler/util/backoff_utils_test.go (deleted file, 138 lines; file name inferred from the srcs removal above)
@@ -1,138 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
    "testing"
    "time"

    ktypes "k8s.io/apimachinery/pkg/types"
)

type fakeClock struct {
    t time.Time
}

func (f *fakeClock) Now() time.Time {
    return f.t
}

func TestBackoffPod(t *testing.T) {
    clock := fakeClock{}
    backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
    tests := []struct {
        podID            ktypes.NamespacedName
        expectedDuration time.Duration
        advanceClock     time.Duration
    }{
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 1 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 2 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 4 * time.Second,
        },
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "bar"},
            expectedDuration: 1 * time.Second,
            advanceClock:     120 * time.Second,
        },
        // 'foo' should have been gc'd here.
        {
            podID:            ktypes.NamespacedName{Namespace: "default", Name: "foo"},
            expectedDuration: 1 * time.Second,
        },
    }

    for _, test := range tests {
        duration := backoff.BackoffPod(test.podID)
        if duration != test.expectedDuration {
            t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
        }
        if boTime, _ := backoff.GetBackoffTime(test.podID); boTime != clock.Now().Add(test.expectedDuration) {
            t.Errorf("expected GetBackoffTime %s, got %s for pod %s", test.expectedDuration.String(), boTime.String(), test.podID)
        }
        clock.t = clock.t.Add(test.advanceClock)
        backoff.Gc()
    }
    fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"}
    be := backoff.getEntry(fooID)
    be.backoff = 60 * time.Second
    duration := backoff.BackoffPod(fooID)
    if duration != 60*time.Second {
        t.Errorf("expected: 60, got %s", duration.String())
    }
    // Verify that we split on namespaces correctly, same name, different namespace
    fooID.Namespace = "other"
    duration = backoff.BackoffPod(fooID)
    if duration != 1*time.Second {
        t.Errorf("expected: 1, got %s", duration.String())
    }
}

func TestClearPodBackoff(t *testing.T) {
    clock := fakeClock{}
    backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)

    if backoff.ClearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "nonexist"}) {
        t.Error("Expected ClearPodBackoff failure for unknown pod, got success.")
    }

    podID := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
    if dur := backoff.BackoffPod(podID); dur != 1*time.Second {
        t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, dur.String())
    }

    if !backoff.ClearPodBackoff(podID) {
        t.Errorf("Failed to clear backoff for pod %v", podID)
    }

    expectBoTime := clock.Now()
    if boTime, _ := backoff.GetBackoffTime(podID); boTime != expectBoTime {
        t.Errorf("Expected backoff time for pod %s of %s, got %s", podID, expectBoTime, boTime)
    }
}

func TestTryBackoffAndWait(t *testing.T) {
    clock := fakeClock{}
    backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)

    stopCh := make(chan struct{})
    podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
    if !backoff.TryBackoffAndWait(podID, stopCh) {
        t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
    }

    be := backoff.getEntry(podID)
    if !be.tryLock() {
        t.Error("Failed to acquire lock for backoffentry")
    }

    if backoff.TryBackoffAndWait(podID, stopCh) {
        t.Error("Expected TryBackoffAndWait failure with lock acquired, got success.")
    }

    close(stopCh)
    if backoff.TryBackoffAndWait(podID, stopCh) {
        t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
    }
}