From 36effb4700faa19da803e4d4adacb64ae7621458 Mon Sep 17 00:00:00 2001 From: xiuqiao Date: Mon, 8 Apr 2019 23:41:10 +0800 Subject: [PATCH] Remove FIFO scheduling queue and old pod backoff logic --- pkg/scheduler/factory/BUILD | 2 +- pkg/scheduler/factory/factory.go | 20 +-- pkg/scheduler/factory/factory_test.go | 142 ++++++++++++++---- pkg/scheduler/internal/queue/pod_backoff.go | 14 -- .../internal/queue/pod_backoff_test.go | 15 -- .../internal/queue/scheduling_queue.go | 115 +------------- .../internal/queue/scheduling_queue_test.go | 5 - 7 files changed, 120 insertions(+), 193 deletions(-) diff --git a/pkg/scheduler/factory/BUILD b/pkg/scheduler/factory/BUILD index 7f136b4c82c..905dba73bc8 100644 --- a/pkg/scheduler/factory/BUILD +++ b/pkg/scheduler/factory/BUILD @@ -21,7 +21,6 @@ go_library( "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/plugins:go_default_library", "//pkg/scheduler/plugins/v1alpha1:go_default_library", - "//pkg/scheduler/util:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -68,6 +67,7 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 6ba88d44e4f..0c49942230a 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -56,7 +56,6 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" 
"k8s.io/kubernetes/pkg/scheduler/plugins" pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1" - "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -456,7 +455,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, c.percentageOfNodesToScore, ) - podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second) return &Config{ SchedulerCache: c.schedulerCache, // The scheduler only needs to consider schedulable nodes. @@ -470,7 +468,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced) }, NextPod: internalqueue.MakeNextPodFunc(c.podQueue), - Error: MakeDefaultErrorFunc(c.client, podBackoff, c.podQueue, c.schedulerCache, c.StopEverything), + Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything), StopEverything: c.StopEverything, VolumeBinder: c.volumeBinder, SchedulingQueue: c.podQueue, @@ -639,7 +637,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core } // MakeDefaultErrorFunc construct a function to handle pod scheduler error -func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) { +func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) @@ -662,7 +660,6 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod } } - backoff.CleanupPodsCompletesBackingoff() podSchedulingCycle := 
podQueue.SchedulingCycle() // Retry asynchronously. // Note that this is extremely rudimentary and we need a more real error handling path. @@ -673,16 +670,9 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod Name: pod.Name, } - // When pod priority is enabled, we would like to place an unschedulable - // pod in the unschedulable queue. This ensures that if the pod is nominated - // to run on a node, scheduler takes the pod into account when running - // predicates for the node. - if !util.PodPriorityEnabled() { - if !backoff.TryBackoffAndWait(podID, stopEverything) { - klog.Warningf("Request for pod %v already in flight, abandoning", podID) - return - } - } + // An unschedulable pod will be placed in the unschedulable queue. + // This ensures that if the pod is nominated to run on a node, + // scheduler takes the pod into account when running predicates for the node. // Get the pod again; it may have changed/been scheduled already. getBackoff := initialGetBackoff for { diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 05feed1c7aa..7e039536994 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" @@ -252,48 +253,125 @@ func TestDefaultErrorFunc(t *testing.T) { client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}) stopCh := make(chan struct{}) defer close(stopCh) - queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} - schedulerCache := internalcache.New(30*time.Second, stopCh) - podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second) - errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, 
stopCh) + timestamp := time.Now() + queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp)) + schedulerCache := internalcache.New(30*time.Second, stopCh) + errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh) + + // Trigger error handling to put the pod in the unschedulable queue errFunc(testPod, nil) - for { - // This is a terrible way to do this but I plan on replacing this - // whole error handling system in the future. The test will time - // out if something doesn't work. - time.Sleep(10 * time.Millisecond) - got, exists, _ := queue.Get(testPod) - if !exists { + // Try up to a minute to retrieve the error pod from priority queue + foundPodFlag := false + maxIterations := 10 * 60 + for i := 0; i < maxIterations; i++ { + time.Sleep(100 * time.Millisecond) + got := getPodfromPriorityQueue(queue, testPod) + if got == nil { continue } - requestReceived := false - actions := client.Actions() - for _, a := range actions { - if a.GetVerb() == "get" { - getAction, ok := a.(clienttesting.GetAction) - if !ok { - t.Errorf("Can't cast action object to GetAction interface") - break - } - name := getAction.GetName() - ns := a.GetNamespace() - if name != "foo" || ns != "bar" { - t.Errorf("Expected name %s namespace %s, got %s %s", - "foo", "bar", name, ns) - } - requestReceived = true - } - } - if !requestReceived { - t.Errorf("Get pod request not received") - } + + testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name) + if e, a := testPod, got; !reflect.DeepEqual(e, a) { t.Errorf("Expected %v, got %v", e, a) } + + foundPodFlag = true break } + + if !foundPodFlag { + t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod) + } + + // Remove the pod from priority queue to test putting error + // pod in backoff queue. 
+ queue.Delete(testPod) + + // Trigger a move request + queue.MoveAllToActiveQueue() + + // Trigger error handling again to put the pod in backoff queue + errFunc(testPod, nil) + + foundPodFlag = false + for i := 0; i < maxIterations; i++ { + time.Sleep(100 * time.Millisecond) + // The pod should be found from backoff queue at this time + got := getPodfromPriorityQueue(queue, testPod) + if got == nil { + continue + } + + testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name) + + if e, a := testPod, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + + foundPodFlag = true + break + } + + if !foundPodFlag { + t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod) + } +} + +// getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get +// the specific pod from the given priority queue. It returns the found pod in the priority queue. +func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod { + podList := queue.PendingPods() + if len(podList) == 0 { + return nil + } + + queryPodKey, err := cache.MetaNamespaceKeyFunc(pod) + if err != nil { + return nil + } + + for _, foundPod := range podList { + foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod) + if err != nil { + return nil + } + + if foundPodKey == queryPodKey { + return foundPod + } + } + + return nil +} + +// testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test. +// It tests whether the fake client can receive request and correctly "get" the namespace +// and name of the error pod. 
+func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) { + requestReceived := false + actions := client.Actions() + for _, a := range actions { + if a.GetVerb() == "get" { + getAction, ok := a.(clienttesting.GetAction) + if !ok { + t.Errorf("Can't cast action object to GetAction interface") + break + } + name := getAction.GetName() + ns := a.GetNamespace() + if name != podName || ns != podNs { + t.Errorf("Expected name %s namespace %s, got %s %s", + podName, podNs, name, ns) + } + requestReceived = true + } + } + if !requestReceived { + t.Errorf("Get pod request not received") + } } func TestBind(t *testing.T) { diff --git a/pkg/scheduler/internal/queue/pod_backoff.go b/pkg/scheduler/internal/queue/pod_backoff.go index de332802f04..da89815a377 100644 --- a/pkg/scheduler/internal/queue/pod_backoff.go +++ b/pkg/scheduler/internal/queue/pod_backoff.go @@ -60,20 +60,6 @@ func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time return backoffTime, true } -// TryBackoffAndWait tries to perform backoff for a non-preempting pod. -// it is invoked from factory.go if util.PodPriorityEnabled() returns false. -func (pbm *PodBackoffMap) TryBackoffAndWait(nsPod ktypes.NamespacedName, stop <-chan struct{}) bool { - pbm.lock.RLock() - defer pbm.lock.RUnlock() - backoffDuration := pbm.calculateBackoffDuration(nsPod) - select { - case <-time.After(backoffDuration): - return true - case <-stop: - return false - } -} - // calculateBackoffDuration is a helper function for calculating the backoffDuration // based on the number of attempts the pod has made. 
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration { diff --git a/pkg/scheduler/internal/queue/pod_backoff_test.go b/pkg/scheduler/internal/queue/pod_backoff_test.go index c8d98898165..0f20d6cd921 100644 --- a/pkg/scheduler/internal/queue/pod_backoff_test.go +++ b/pkg/scheduler/internal/queue/pod_backoff_test.go @@ -92,18 +92,3 @@ func TestClearPodBackoff(t *testing.T) { t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String()) } } - -func TestTryBackoffAndWait(t *testing.T) { - bpm := NewPodBackoffMap(1*time.Second, 60*time.Second) - - stopCh := make(chan struct{}) - podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"} - if !bpm.TryBackoffAndWait(podID, stopCh) { - t.Error("Expected TryBackoffAndWait success for new pod, got failure.") - } - - close(stopCh) - if bpm.TryBackoffAndWait(podID, stopCh) { - t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.") - } -} diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index 6bc9bf74bd5..41a41c15a4c 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -15,14 +15,11 @@ limitations under the License. */ // This file contains structures that implement scheduling queue types. -// Scheduling queues hold pods waiting to be scheduled. This file has two types -// of scheduling queue: 1) a FIFO, which is mostly the same as cache.FIFO, 2) a +// Scheduling queues hold pods waiting to be scheduled. This file implements a // priority queue which has two sub queues. One sub-queue holds pods that are // being considered for scheduling. This is called activeQ. Another queue holds // pods that are already tried and are determined to be unschedulable. The latter // is called unschedulableQ. 
-// FIFO is here for flag-gating purposes and allows us to use the traditional -// scheduling queue when util.PodPriorityEnabled() returns false. package queue @@ -89,113 +86,9 @@ type SchedulingQueue interface { NumUnschedulablePods() int } -// NewSchedulingQueue initializes a new scheduling queue. If pod priority is -// enabled a priority queue is returned. If it is disabled, a FIFO is returned. +// NewSchedulingQueue initializes a priority queue as a new scheduling queue. func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue { - if util.PodPriorityEnabled() { - return NewPriorityQueue(stop) - } - return NewFIFO() -} - -// FIFO is basically a simple wrapper around cache.FIFO to make it compatible -// with the SchedulingQueue interface. -type FIFO struct { - *cache.FIFO -} - -var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue. - -// Add adds a pod to the FIFO. -func (f *FIFO) Add(pod *v1.Pod) error { - return f.FIFO.Add(pod) -} - -// AddIfNotPresent adds a pod to the FIFO if it is absent in the FIFO. -func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error { - return f.FIFO.AddIfNotPresent(pod) -} - -// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In -// FIFO it is added to the end of the queue. -func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { - return f.FIFO.AddIfNotPresent(pod) -} - -// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface. -func (f *FIFO) SchedulingCycle() int64 { - return 0 -} - -// Update updates a pod in the FIFO. -func (f *FIFO) Update(oldPod, newPod *v1.Pod) error { - return f.FIFO.Update(newPod) -} - -// Delete deletes a pod in the FIFO. -func (f *FIFO) Delete(pod *v1.Pod) error { - return f.FIFO.Delete(pod) -} - -// Pop removes the head of FIFO and returns it. -// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler -// has always been using. 
There is a comment in that file saying that this method -// shouldn't be used in production code, but scheduler has always been using it. -// This function does minimal error checking. -func (f *FIFO) Pop() (*v1.Pod, error) { - result, err := f.FIFO.Pop(func(obj interface{}) error { return nil }) - if err == cache.FIFOClosedError { - return nil, fmt.Errorf(queueClosed) - } - return result.(*v1.Pod), err -} - -// PendingPods returns all the pods in the queue. -func (f *FIFO) PendingPods() []*v1.Pod { - result := []*v1.Pod{} - for _, pod := range f.FIFO.List() { - result = append(result, pod.(*v1.Pod)) - } - return result -} - -// FIFO does not need to react to events, as all pods are always in the active -// scheduling queue anyway. - -// AssignedPodAdded does nothing here. -func (f *FIFO) AssignedPodAdded(pod *v1.Pod) {} - -// AssignedPodUpdated does nothing here. -func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {} - -// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue. -func (f *FIFO) MoveAllToActiveQueue() {} - -// NominatedPodsForNode returns pods that are nominated to run on the given node, -// but FIFO does not support it. -func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod { - return nil -} - -// Close closes the FIFO queue. -func (f *FIFO) Close() { - f.FIFO.Close() -} - -// DeleteNominatedPodIfExists does nothing in FIFO. -func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {} - -// UpdateNominatedPodForNode does nothing in FIFO. -func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {} - -// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue. -func (f *FIFO) NumUnschedulablePods() int { - return 0 -} - -// NewFIFO creates a FIFO object. -func NewFIFO() *FIFO { - return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} + return NewPriorityQueue(stop) } // NominatedNodeName returns nominated node name of a Pod. 
@@ -203,7 +96,7 @@ func NominatedNodeName(pod *v1.Pod) string { return pod.Status.NominatedNodeName } -// PriorityQueue implements a scheduling queue. It is an alternative to FIFO. +// PriorityQueue implements a scheduling queue. // The head of PriorityQueue is the highest priority pending pod. This structure // has three sub queues. One sub-queue holds pods that are being considered for // scheduling. This is called activeQ and is a Heap. Another queue holds diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index d9ba6790eaf..4407ca65e89 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -688,11 +688,6 @@ func TestSchedulingQueue_Close(t *testing.T) { q SchedulingQueue expectedErr error }{ - { - name: "FIFO close", - q: NewFIFO(), - expectedErr: fmt.Errorf(queueClosed), - }, { name: "PriorityQueue close", q: NewPriorityQueue(nil),