Mirror of https://github.com/k3s-io/kubernetes.git
Fix back off when scheduling cycle is delayed

Signed-off-by: Aldo Culquicondor <acondor@google.com>

Commit: 9d2786c383 (parent: 52d7614a8c)
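The gist of the change: PodBackoffMap now receives the scheduling queue's clock and stamps backoff entries with clock.Now() instead of time.Now(), so the backoff deadline and the queue's flush routine agree even when the scheduling cycle is delayed, and garbage collection only drops an entry after twice the maximum backoff. Below is a minimal sketch of the new surface as the tests in this commit exercise it; the function name is invented for illustration, and the snippet would have to live alongside the internal queue package:

package queue

import (
	"fmt"
	"time"

	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
)

// backoffSketch shows the intended flow with an injected fake clock:
// BackoffPod stamps the entry with clock.Now(), so the deadline returned
// by GetBackoffTime is deterministic relative to that clock.
func backoffSketch() {
	now := time.Now()
	fakeClock := clock.NewFakeClock(now)
	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, fakeClock)

	pod := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
	bpm.BackoffPod(pod) // first attempt

	if deadline, ok := bpm.GetBackoffTime(pod); ok {
		fmt.Println(deadline.Sub(now)) // 1s for the first attempt, per the test table below
	}
}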
@@ -21,12 +21,14 @@ import (
 	"time"
 
 	ktypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/scheduler/util"
 )
 
 // PodBackoffMap is a structure that stores backoff related information for pods
 type PodBackoffMap struct {
 	// lock for performing actions on this PodBackoffMap
-	lock sync.RWMutex
+	lock  sync.RWMutex
+	clock util.Clock
 	// initial backoff duration
 	initialDuration time.Duration
 	// maximal backoff duration
@@ -38,8 +40,9 @@ type PodBackoffMap struct {
 }
 
 // NewPodBackoffMap creates a PodBackoffMap with initial duration and max duration.
-func NewPodBackoffMap(initialDuration, maxDuration time.Duration) *PodBackoffMap {
+func NewPodBackoffMap(initialDuration, maxDuration time.Duration, clock util.Clock) *PodBackoffMap {
 	return &PodBackoffMap{
+		clock:           clock,
 		initialDuration: initialDuration,
 		maxDuration:     maxDuration,
 		podAttempts:     make(map[ktypes.NamespacedName]int),
@@ -91,12 +94,16 @@ func (pbm *PodBackoffMap) ClearPodBackoff(nsPod ktypes.NamespacedName) {
 
 // CleanupPodsCompletesBackingoff execute garbage collection on the pod backoff,
 // i.e, it will remove a pod from the PodBackoffMap if
-// lastUpdateTime + maxBackoffDuration is before the current timestamp
+// lastUpdateTime + maxDuration >> timestamp
+// We should wait longer than the maxDuration so that the pod gets a chance to
+// (1) move to the active queue and (2) get an schedule attempt.
 func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
 	pbm.lock.Lock()
 	defer pbm.lock.Unlock()
 	for pod, value := range pbm.podLastUpdateTime {
-		if value.Add(pbm.maxDuration).Before(time.Now()) {
+		// Here we assume that maxDuration should be enough for a pod to move up the
+		// active queue and get an schedule attempt.
+		if value.Add(2 * pbm.maxDuration).Before(pbm.clock.Now()) {
 			pbm.clearPodBackoff(pod)
 		}
 	}
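The behavioral change in this hunk is the garbage-collection threshold: an entry now survives until lastUpdateTime + 2*maxDuration has passed on the injected clock, giving a delayed pod a chance to reach the active queue and get another attempt before its backoff history is forgotten. A sketch of that grace period in isolation (the helper name and durations are illustrative, not taken from the diff):

package queue

import (
	"fmt"
	"time"

	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
)

// cleanupSketch illustrates the 2*maxDuration grace period used by
// CleanupPodsCompletesBackingoff, assuming maxDuration is 10s.
func cleanupSketch() {
	fakeClock := clock.NewFakeClock(time.Now())
	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, fakeClock)

	pod := ktypes.NamespacedName{Namespace: "ns", Name: "foo"}
	bpm.BackoffPod(pod)

	// Advancing past maxDuration alone no longer evicts the entry.
	fakeClock.Step(15 * time.Second)
	bpm.CleanupPodsCompletesBackingoff()
	_, ok := bpm.GetBackoffTime(pod)
	fmt.Println(ok) // true: 15s has passed, but 2*maxDuration (20s) has not

	// Only once lastUpdateTime + 2*maxDuration is in the past is the entry removed.
	fakeClock.Step(10 * time.Second)
	bpm.CleanupPodsCompletesBackingoff()
	_, ok = bpm.GetBackoffTime(pod)
	fmt.Println(ok) // false: 25s > 20s
}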
@@ -106,7 +113,7 @@ func (pbm *PodBackoffMap) CleanupPodsCompletesBackingoff() {
 // and increases its numberOfAttempts by 1
 func (pbm *PodBackoffMap) BackoffPod(nsPod ktypes.NamespacedName) {
 	pbm.lock.Lock()
-	pbm.podLastUpdateTime[nsPod] = time.Now()
+	pbm.podLastUpdateTime[nsPod] = pbm.clock.Now()
 	pbm.podAttempts[nsPod]++
 	pbm.lock.Unlock()
 }
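The old test read bpm.calculateBackoffDuration directly, while the new one derives the duration from GetBackoffTime minus the backoff timestamp. The expected values in the test tables (1s, 2s, 4s, 8s, then capped at 10s) imply a doubling backoff capped at maxDuration; the helper below is only a sketch of that rule to make the expectations concrete, not the scheduler's actual implementation:

package queue

import "time"

// backoffDurationSketch reproduces the doubling-with-cap rule that the
// expected durations in the tests below imply: 1s, 2s, 4s, 8s, then the
// 10s maximum for every later attempt.
func backoffDurationSketch(initial, max time.Duration, attempts int) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	return d
}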
@@ -17,15 +17,17 @@ limitations under the License.
 package queue
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
 	ktypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/clock"
 )
 
 func TestBackoffPod(t *testing.T) {
-	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second)
-
+	timestamp := time.Now()
+	bpm := NewPodBackoffMap(1*time.Second, 10*time.Second, clock.NewFakeClock(timestamp))
 	tests := []struct {
 		podID            ktypes.NamespacedName
 		expectedDuration time.Duration
@@ -61,20 +63,23 @@ func TestBackoffPod(t *testing.T) {
 		},
 	}
 
-	for _, test := range tests {
-		// Backoff the pod
-		bpm.BackoffPod(test.podID)
-		// Get backoff duration for the pod
-		duration := bpm.calculateBackoffDuration(test.podID)
-
-		if duration != test.expectedDuration {
-			t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
-		}
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
+			bpm.BackoffPod(test.podID)
+			backoff, ok := bpm.GetBackoffTime(test.podID)
+			if !ok {
+				t.Errorf("%v should be backed off", test.podID)
+			}
+			duration := backoff.Sub(timestamp)
+			if duration != test.expectedDuration {
+				t.Errorf("expected: %s, got %s for pod %s", test.expectedDuration.String(), duration.String(), test.podID)
+			}
+		})
 	}
 }
 
 func TestClearPodBackoff(t *testing.T) {
-	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
+	bpm := NewPodBackoffMap(1*time.Second, 60*time.Second, clock.NewFakeClock(time.Now()))
 	// Clear backoff on an not existed pod
 	bpm.clearPodBackoff(ktypes.NamespacedName{Namespace: "ns", Name: "not-existed"})
 	// Backoff twice for pod foo
@@ -214,7 +214,7 @@ func NewPriorityQueue(
 	pq := &PriorityQueue{
 		clock:            options.clock,
 		stop:             make(chan struct{}),
-		podBackoff:       NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration),
+		podBackoff:       NewPodBackoffMap(options.podInitialBackoffDuration, options.podMaxBackoffDuration, options.clock),
 		activeQ:          heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
 		unschedulableQ:   newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
 		nominatedPods:    newNominatedPodMap(),
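NewPriorityQueue already took a clock through its options (the test below injects one via WithClock); the fix forwards options.clock into NewPodBackoffMap so the queue and its backoff map share one time source. For orientation, the option plumbing presumably looks roughly like the sketch below; only WithClock, options.clock and the two backoff durations appear in the diff, the rest is assumed:

package queue

import (
	"time"

	"k8s.io/kubernetes/pkg/scheduler/util"
)

// Assumed shape of the option struct; field names beyond those used in the
// diff are illustrative.
type priorityQueueOptions struct {
	clock                     util.Clock
	podInitialBackoffDuration time.Duration
	podMaxBackoffDuration     time.Duration
}

// Option configures a PriorityQueue.
type Option func(*priorityQueueOptions)

// WithClock sets the clock used by the queue and, after this change,
// by its PodBackoffMap as well.
func WithClock(clock util.Clock) Option {
	return func(o *priorityQueueOptions) {
		o.clock = clock
	}
}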
@@ -1515,3 +1515,85 @@ func createAndRunPriorityQueue(fwk framework.Framework, opts ...Option) *PriorityQueue {
 	q.Run()
 	return q
 }
+
+func TestBackOffFlow(t *testing.T) {
+	cl := clock.NewFakeClock(time.Now())
+	q := NewPriorityQueue(newDefaultFramework(), WithClock(cl))
+	steps := []struct {
+		wantBackoff time.Duration
+	}{
+		{wantBackoff: time.Second},
+		{wantBackoff: 2 * time.Second},
+		{wantBackoff: 4 * time.Second},
+		{wantBackoff: 8 * time.Second},
+		{wantBackoff: 10 * time.Second},
+		{wantBackoff: 10 * time.Second},
+		{wantBackoff: 10 * time.Second},
+	}
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: "test-ns",
+			UID:       "test-uid",
+		},
+	}
+	podID := nsNameForPod(pod)
+	if err := q.Add(pod); err != nil {
+		t.Fatal(err)
+	}
+
+	for i, step := range steps {
+		t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) {
+			timestamp := cl.Now()
+			// Simulate schedule attempt.
+			podInfo, err := q.Pop()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if podInfo.Attempts != i+1 {
+				t.Errorf("got attempts %d, want %d", podInfo.Attempts, i+1)
+			}
+			if err := q.AddUnschedulableIfNotPresent(podInfo, int64(i)); err != nil {
+				t.Fatal(err)
+			}
+
+			// An event happens.
+			q.MoveAllToActiveOrBackoffQueue("deleted pod")
+
+			if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
+				t.Errorf("pod %v is not in the backoff queue", podID)
+			}
+
+			// Check backoff duration.
+			deadline, ok := q.podBackoff.GetBackoffTime(podID)
+			if !ok {
+				t.Errorf("didn't get backoff for pod %s", podID)
+			}
+			backoff := deadline.Sub(timestamp)
+			if backoff != step.wantBackoff {
+				t.Errorf("got backoff %s, want %s", backoff, step.wantBackoff)
+			}
+
+			// Simulate routine that continuously flushes the backoff queue.
+			cl.Step(time.Millisecond)
+			q.flushBackoffQCompleted()
+			// Still in backoff queue after an early flush.
+			if _, ok, _ := q.podBackoffQ.Get(podInfo); !ok {
+				t.Errorf("pod %v is not in the backoff queue", podID)
+			}
+			// Moved out of the backoff queue after timeout.
+			cl.Step(backoff)
+			q.flushBackoffQCompleted()
+			if _, ok, _ := q.podBackoffQ.Get(podInfo); ok {
+				t.Errorf("pod %v is still in the backoff queue", podID)
+			}
+		})
+	}
+	// After some time, backoff information is cleared.
+	cl.Step(time.Hour)
+	q.podBackoff.CleanupPodsCompletesBackingoff()
+	_, ok := q.podBackoff.GetBackoffTime(podID)
+	if ok {
+		t.Errorf("backoff information for pod %s was not cleared", podID)
+	}
+}
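TestBackOffFlow is driven entirely by the fake clock: cl.Step advances time without sleeping, which is what lets the test assert that the pod is still backed off one millisecond in and released only after the full backoff has elapsed. As a standalone reminder of those FakeClock semantics (the package is the same k8s.io/apimachinery/pkg/util/clock imported above):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

func main() {
	cl := clock.NewFakeClock(time.Now())
	start := cl.Now()

	cl.Step(500 * time.Millisecond) // advances the fake time; no real sleeping
	fmt.Println(cl.Now().Sub(start)) // 500ms

	cl.Step(2 * time.Second)
	fmt.Println(cl.Now().Sub(start)) // 2.5s
}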