mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 13:02:14 +00:00
Remove FIFO scheduling queue and old pod backoff logic
This commit is contained in:
parent
36e06bbb1f
commit
36effb4700
@ -21,7 +21,6 @@ go_library(
|
|||||||
"//pkg/scheduler/internal/queue:go_default_library",
|
"//pkg/scheduler/internal/queue:go_default_library",
|
||||||
"//pkg/scheduler/plugins:go_default_library",
|
"//pkg/scheduler/plugins:go_default_library",
|
||||||
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
|
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
|
||||||
"//pkg/scheduler/util:go_default_library",
|
|
||||||
"//pkg/scheduler/volumebinder:go_default_library",
|
"//pkg/scheduler/volumebinder:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
@ -68,6 +67,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
|
@ -56,7 +56,6 @@ import (
|
|||||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/plugins"
|
"k8s.io/kubernetes/pkg/scheduler/plugins"
|
||||||
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
|
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -456,7 +455,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||||||
c.percentageOfNodesToScore,
|
c.percentageOfNodesToScore,
|
||||||
)
|
)
|
||||||
|
|
||||||
podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
|
|
||||||
return &Config{
|
return &Config{
|
||||||
SchedulerCache: c.schedulerCache,
|
SchedulerCache: c.schedulerCache,
|
||||||
// The scheduler only needs to consider schedulable nodes.
|
// The scheduler only needs to consider schedulable nodes.
|
||||||
@ -470,7 +468,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||||||
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
|
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
|
||||||
},
|
},
|
||||||
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
|
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
|
||||||
Error: MakeDefaultErrorFunc(c.client, podBackoff, c.podQueue, c.schedulerCache, c.StopEverything),
|
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
|
||||||
StopEverything: c.StopEverything,
|
StopEverything: c.StopEverything,
|
||||||
VolumeBinder: c.volumeBinder,
|
VolumeBinder: c.volumeBinder,
|
||||||
SchedulingQueue: c.podQueue,
|
SchedulingQueue: c.podQueue,
|
||||||
@ -639,7 +637,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
|
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
|
||||||
func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
|
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
|
||||||
return func(pod *v1.Pod, err error) {
|
return func(pod *v1.Pod, err error) {
|
||||||
if err == core.ErrNoNodesAvailable {
|
if err == core.ErrNoNodesAvailable {
|
||||||
klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
|
klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
|
||||||
@ -662,7 +660,6 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
backoff.CleanupPodsCompletesBackingoff()
|
|
||||||
podSchedulingCycle := podQueue.SchedulingCycle()
|
podSchedulingCycle := podQueue.SchedulingCycle()
|
||||||
// Retry asynchronously.
|
// Retry asynchronously.
|
||||||
// Note that this is extremely rudimentary and we need a more real error handling path.
|
// Note that this is extremely rudimentary and we need a more real error handling path.
|
||||||
@ -673,16 +670,9 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod
|
|||||||
Name: pod.Name,
|
Name: pod.Name,
|
||||||
}
|
}
|
||||||
|
|
||||||
// When pod priority is enabled, we would like to place an unschedulable
|
// An unschedulable pod will be placed in the unschedulable queue.
|
||||||
// pod in the unschedulable queue. This ensures that if the pod is nominated
|
// This ensures that if the pod is nominated to run on a node,
|
||||||
// to run on a node, scheduler takes the pod into account when running
|
// scheduler takes the pod into account when running predicates for the node.
|
||||||
// predicates for the node.
|
|
||||||
if !util.PodPriorityEnabled() {
|
|
||||||
if !backoff.TryBackoffAndWait(podID, stopEverything) {
|
|
||||||
klog.Warningf("Request for pod %v already in flight, abandoning", podID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Get the pod again; it may have changed/been scheduled already.
|
// Get the pod again; it may have changed/been scheduled already.
|
||||||
getBackoff := initialGetBackoff
|
getBackoff := initialGetBackoff
|
||||||
for {
|
for {
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/clock"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
@ -252,48 +253,125 @@ func TestDefaultErrorFunc(t *testing.T) {
|
|||||||
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
|
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
defer close(stopCh)
|
defer close(stopCh)
|
||||||
queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
|
|
||||||
schedulerCache := internalcache.New(30*time.Second, stopCh)
|
|
||||||
podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
|
|
||||||
errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, stopCh)
|
|
||||||
|
|
||||||
|
timestamp := time.Now()
|
||||||
|
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
|
||||||
|
schedulerCache := internalcache.New(30*time.Second, stopCh)
|
||||||
|
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh)
|
||||||
|
|
||||||
|
// Trigger error handling again to put the pod in unschedulable queue
|
||||||
errFunc(testPod, nil)
|
errFunc(testPod, nil)
|
||||||
|
|
||||||
for {
|
// Try up to a minute to retrieve the error pod from priority queue
|
||||||
// This is a terrible way to do this but I plan on replacing this
|
foundPodFlag := false
|
||||||
// whole error handling system in the future. The test will time
|
maxIterations := 10 * 60
|
||||||
// out if something doesn't work.
|
for i := 0; i < maxIterations; i++ {
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
got, exists, _ := queue.Get(testPod)
|
got := getPodfromPriorityQueue(queue, testPod)
|
||||||
if !exists {
|
if got == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
requestReceived := false
|
|
||||||
actions := client.Actions()
|
testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
|
||||||
for _, a := range actions {
|
|
||||||
if a.GetVerb() == "get" {
|
|
||||||
getAction, ok := a.(clienttesting.GetAction)
|
|
||||||
if !ok {
|
|
||||||
t.Errorf("Can't cast action object to GetAction interface")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
name := getAction.GetName()
|
|
||||||
ns := a.GetNamespace()
|
|
||||||
if name != "foo" || ns != "bar" {
|
|
||||||
t.Errorf("Expected name %s namespace %s, got %s %s",
|
|
||||||
"foo", "bar", name, ns)
|
|
||||||
}
|
|
||||||
requestReceived = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !requestReceived {
|
|
||||||
t.Errorf("Get pod request not received")
|
|
||||||
}
|
|
||||||
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
|
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
|
||||||
t.Errorf("Expected %v, got %v", e, a)
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
foundPodFlag = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !foundPodFlag {
|
||||||
|
t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the pod from priority queue to test putting error
|
||||||
|
// pod in backoff queue.
|
||||||
|
queue.Delete(testPod)
|
||||||
|
|
||||||
|
// Trigger a move request
|
||||||
|
queue.MoveAllToActiveQueue()
|
||||||
|
|
||||||
|
// Trigger error handling again to put the pod in backoff queue
|
||||||
|
errFunc(testPod, nil)
|
||||||
|
|
||||||
|
foundPodFlag = false
|
||||||
|
for i := 0; i < maxIterations; i++ {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
// The pod should be found from backoff queue at this time
|
||||||
|
got := getPodfromPriorityQueue(queue, testPod)
|
||||||
|
if got == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
|
||||||
|
|
||||||
|
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
foundPodFlag = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if !foundPodFlag {
|
||||||
|
t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
|
||||||
|
// the specific pod from the given priority queue. It returns the found pod in the priority queue.
|
||||||
|
func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
|
||||||
|
podList := queue.PendingPods()
|
||||||
|
if len(podList) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, foundPod := range podList {
|
||||||
|
foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if foundPodKey == queryPodKey {
|
||||||
|
return foundPod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
|
||||||
|
// It tests whether the fake client can receive request and correctly "get" the namespace
|
||||||
|
// and name of the error pod.
|
||||||
|
func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
|
||||||
|
requestReceived := false
|
||||||
|
actions := client.Actions()
|
||||||
|
for _, a := range actions {
|
||||||
|
if a.GetVerb() == "get" {
|
||||||
|
getAction, ok := a.(clienttesting.GetAction)
|
||||||
|
if !ok {
|
||||||
|
t.Errorf("Can't cast action object to GetAction interface")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
name := getAction.GetName()
|
||||||
|
ns := a.GetNamespace()
|
||||||
|
if name != podName || ns != podNs {
|
||||||
|
t.Errorf("Expected name %s namespace %s, got %s %s",
|
||||||
|
podName, podNs, name, ns)
|
||||||
|
}
|
||||||
|
requestReceived = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !requestReceived {
|
||||||
|
t.Errorf("Get pod request not received")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBind(t *testing.T) {
|
func TestBind(t *testing.T) {
|
||||||
|
@ -60,20 +60,6 @@ func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time
|
|||||||
return backoffTime, true
|
return backoffTime, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// TryBackoffAndWait tries to perform backoff for a non-preempting pod.
|
|
||||||
// it is invoked from factory.go if util.PodPriorityEnabled() returns false.
|
|
||||||
func (pbm *PodBackoffMap) TryBackoffAndWait(nsPod ktypes.NamespacedName, stop <-chan struct{}) bool {
|
|
||||||
pbm.lock.RLock()
|
|
||||||
defer pbm.lock.RUnlock()
|
|
||||||
backoffDuration := pbm.calculateBackoffDuration(nsPod)
|
|
||||||
select {
|
|
||||||
case <-time.After(backoffDuration):
|
|
||||||
return true
|
|
||||||
case <-stop:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculateBackoffDuration is a helper function for calculating the backoffDuration
|
// calculateBackoffDuration is a helper function for calculating the backoffDuration
|
||||||
// based on the number of attempts the pod has made.
|
// based on the number of attempts the pod has made.
|
||||||
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
|
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
|
||||||
|
@ -92,18 +92,3 @@ func TestClearPodBackoff(t *testing.T) {
|
|||||||
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
|
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTryBackoffAndWait(t *testing.T) {
|
|
||||||
bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
|
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
|
|
||||||
if !bpm.TryBackoffAndWait(podID, stopCh) {
|
|
||||||
t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
|
|
||||||
}
|
|
||||||
|
|
||||||
close(stopCh)
|
|
||||||
if bpm.TryBackoffAndWait(podID, stopCh) {
|
|
||||||
t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -15,14 +15,11 @@ limitations under the License.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
// This file contains structures that implement scheduling queue types.
|
// This file contains structures that implement scheduling queue types.
|
||||||
// Scheduling queues hold pods waiting to be scheduled. This file has two types
|
// Scheduling queues hold pods waiting to be scheduled. This file implements a
|
||||||
// of scheduling queue: 1) a FIFO, which is mostly the same as cache.FIFO, 2) a
|
|
||||||
// priority queue which has two sub queues. One sub-queue holds pods that are
|
// priority queue which has two sub queues. One sub-queue holds pods that are
|
||||||
// being considered for scheduling. This is called activeQ. Another queue holds
|
// being considered for scheduling. This is called activeQ. Another queue holds
|
||||||
// pods that are already tried and are determined to be unschedulable. The latter
|
// pods that are already tried and are determined to be unschedulable. The latter
|
||||||
// is called unschedulableQ.
|
// is called unschedulableQ.
|
||||||
// FIFO is here for flag-gating purposes and allows us to use the traditional
|
|
||||||
// scheduling queue when util.PodPriorityEnabled() returns false.
|
|
||||||
|
|
||||||
package queue
|
package queue
|
||||||
|
|
||||||
@ -89,113 +86,9 @@ type SchedulingQueue interface {
|
|||||||
NumUnschedulablePods() int
|
NumUnschedulablePods() int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
|
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
|
||||||
// enabled a priority queue is returned. If it is disabled, a FIFO is returned.
|
|
||||||
func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue {
|
func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue {
|
||||||
if util.PodPriorityEnabled() {
|
return NewPriorityQueue(stop)
|
||||||
return NewPriorityQueue(stop)
|
|
||||||
}
|
|
||||||
return NewFIFO()
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIFO is basically a simple wrapper around cache.FIFO to make it compatible
|
|
||||||
// with the SchedulingQueue interface.
|
|
||||||
type FIFO struct {
|
|
||||||
*cache.FIFO
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.
|
|
||||||
|
|
||||||
// Add adds a pod to the FIFO.
|
|
||||||
func (f *FIFO) Add(pod *v1.Pod) error {
|
|
||||||
return f.FIFO.Add(pod)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddIfNotPresent adds a pod to the FIFO if it is absent in the FIFO.
|
|
||||||
func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error {
|
|
||||||
return f.FIFO.AddIfNotPresent(pod)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In
|
|
||||||
// FIFO it is added to the end of the queue.
|
|
||||||
func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
|
|
||||||
return f.FIFO.AddIfNotPresent(pod)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SchedulingCycle implements SchedulingQueue.SchedulingCycle interface.
|
|
||||||
func (f *FIFO) SchedulingCycle() int64 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update updates a pod in the FIFO.
|
|
||||||
func (f *FIFO) Update(oldPod, newPod *v1.Pod) error {
|
|
||||||
return f.FIFO.Update(newPod)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete deletes a pod in the FIFO.
|
|
||||||
func (f *FIFO) Delete(pod *v1.Pod) error {
|
|
||||||
return f.FIFO.Delete(pod)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pop removes the head of FIFO and returns it.
|
|
||||||
// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler
|
|
||||||
// has always been using. There is a comment in that file saying that this method
|
|
||||||
// shouldn't be used in production code, but scheduler has always been using it.
|
|
||||||
// This function does minimal error checking.
|
|
||||||
func (f *FIFO) Pop() (*v1.Pod, error) {
|
|
||||||
result, err := f.FIFO.Pop(func(obj interface{}) error { return nil })
|
|
||||||
if err == cache.FIFOClosedError {
|
|
||||||
return nil, fmt.Errorf(queueClosed)
|
|
||||||
}
|
|
||||||
return result.(*v1.Pod), err
|
|
||||||
}
|
|
||||||
|
|
||||||
// PendingPods returns all the pods in the queue.
|
|
||||||
func (f *FIFO) PendingPods() []*v1.Pod {
|
|
||||||
result := []*v1.Pod{}
|
|
||||||
for _, pod := range f.FIFO.List() {
|
|
||||||
result = append(result, pod.(*v1.Pod))
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIFO does not need to react to events, as all pods are always in the active
|
|
||||||
// scheduling queue anyway.
|
|
||||||
|
|
||||||
// AssignedPodAdded does nothing here.
|
|
||||||
func (f *FIFO) AssignedPodAdded(pod *v1.Pod) {}
|
|
||||||
|
|
||||||
// AssignedPodUpdated does nothing here.
|
|
||||||
func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
|
|
||||||
|
|
||||||
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
|
|
||||||
func (f *FIFO) MoveAllToActiveQueue() {}
|
|
||||||
|
|
||||||
// NominatedPodsForNode returns pods that are nominated to run on the given node,
|
|
||||||
// but FIFO does not support it.
|
|
||||||
func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the FIFO queue.
|
|
||||||
func (f *FIFO) Close() {
|
|
||||||
f.FIFO.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteNominatedPodIfExists does nothing in FIFO.
|
|
||||||
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
|
|
||||||
|
|
||||||
// UpdateNominatedPodForNode does nothing in FIFO.
|
|
||||||
func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {}
|
|
||||||
|
|
||||||
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
|
|
||||||
func (f *FIFO) NumUnschedulablePods() int {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewFIFO creates a FIFO object.
|
|
||||||
func NewFIFO() *FIFO {
|
|
||||||
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NominatedNodeName returns nominated node name of a Pod.
|
// NominatedNodeName returns nominated node name of a Pod.
|
||||||
@ -203,7 +96,7 @@ func NominatedNodeName(pod *v1.Pod) string {
|
|||||||
return pod.Status.NominatedNodeName
|
return pod.Status.NominatedNodeName
|
||||||
}
|
}
|
||||||
|
|
||||||
// PriorityQueue implements a scheduling queue. It is an alternative to FIFO.
|
// PriorityQueue implements a scheduling queue.
|
||||||
// The head of PriorityQueue is the highest priority pending pod. This structure
|
// The head of PriorityQueue is the highest priority pending pod. This structure
|
||||||
// has three sub queues. One sub-queue holds pods that are being considered for
|
// has three sub queues. One sub-queue holds pods that are being considered for
|
||||||
// scheduling. This is called activeQ and is a Heap. Another queue holds
|
// scheduling. This is called activeQ and is a Heap. Another queue holds
|
||||||
|
@ -688,11 +688,6 @@ func TestSchedulingQueue_Close(t *testing.T) {
|
|||||||
q SchedulingQueue
|
q SchedulingQueue
|
||||||
expectedErr error
|
expectedErr error
|
||||||
}{
|
}{
|
||||||
{
|
|
||||||
name: "FIFO close",
|
|
||||||
q: NewFIFO(),
|
|
||||||
expectedErr: fmt.Errorf(queueClosed),
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "PriorityQueue close",
|
name: "PriorityQueue close",
|
||||||
q: NewPriorityQueue(nil),
|
q: NewPriorityQueue(nil),
|
||||||
|
Loading…
Reference in New Issue
Block a user