diff --git a/plugin/pkg/scheduler/BUILD b/plugin/pkg/scheduler/BUILD index b780c1e47ea..0166a4c6f20 100644 --- a/plugin/pkg/scheduler/BUILD +++ b/plugin/pkg/scheduler/BUILD @@ -53,7 +53,6 @@ go_library( "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/core/BUILD b/plugin/pkg/scheduler/core/BUILD index a6c32d9f866..305ce865879 100644 --- a/plugin/pkg/scheduler/core/BUILD +++ b/plugin/pkg/scheduler/core/BUILD @@ -39,6 +39,7 @@ go_library( "equivalence_cache.go", "extender.go", "generic_scheduler.go", + "scheduling_queue.go", ], importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/core", deps = [ @@ -56,6 +57,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/core/scheduling_queue.go b/plugin/pkg/scheduler/core/scheduling_queue.go new file mode 100644 index 00000000000..969a4360071 --- /dev/null +++ b/plugin/pkg/scheduler/core/scheduling_queue.go @@ -0,0 +1,61 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This file contains structures that implement scheduling queue types. +// Scheduling queues hold pending pods waiting to be scheduled. + +package core + +import ( + "k8s.io/client-go/tools/cache" +) + +// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. +// The interface follows a pattern similar to cache.FIFO and cache.Heap and +// makes it easy to use those data structures as a SchedulingQueue. +type SchedulingQueue interface { + Add(obj interface{}) error + AddIfNotPresent(obj interface{}) error + Pop() (interface{}, error) + Update(obj interface{}) error + Delete(obj interface{}) error + List() []interface{} + ListKeys() []string + Get(obj interface{}) (item interface{}, exists bool, err error) + GetByKey(key string) (item interface{}, exists bool, err error) +} + +// FIFO is only used to add a Pop() method to cache.FIFO so that it can be +// used as a SchedulingQueue interface. +type FIFO struct { + *cache.FIFO +} + +// Pop removes the head of FIFO and returns it. +// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler +// has always been using. There is a comment in that file saying that this method +// shouldn't be used in production code, but scheduler has always been using it. +// This function does minimal error checking. +func (f *FIFO) Pop() (interface{}, error) { + var result interface{} + f.FIFO.Pop(func(obj interface{}) error { + result = obj + return nil + }) + return result, nil +} + +var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue. diff --git a/plugin/pkg/scheduler/factory/BUILD b/plugin/pkg/scheduler/factory/BUILD index a3b970ef2fa..f17409bd11e 100644 --- a/plugin/pkg/scheduler/factory/BUILD +++ b/plugin/pkg/scheduler/factory/BUILD @@ -65,6 +65,7 @@ go_test( "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api/latest:go_default_library", + "//plugin/pkg/scheduler/core:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/testing:go_default_library", "//plugin/pkg/scheduler/util:go_default_library", diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index b83010eda54..67610deb521 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -77,7 +77,7 @@ var ( type configFactory struct { client clientset.Interface // queue for pods that need scheduling - podQueue *cache.FIFO + podQueue core.SchedulingQueue // a means to list all known scheduled pods. scheduledPodLister corelisters.PodLister // a means to list all known scheduled pods and pods assumed to have been scheduled. @@ -142,10 +142,11 @@ func NewConfigFactory( stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) + schedulingQueue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} c := &configFactory{ client: client, podLister: schedulerCache, - podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), + podQueue: schedulingQueue, pVLister: pvInformer.Lister(), pVCLister: pvcInformer.Lister(), serviceLister: serviceInformer.Lister(), @@ -901,9 +902,14 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { } func (f *configFactory) getNextPod() *v1.Pod { - pod := cache.Pop(f.podQueue).(*v1.Pod) - glog.V(4).Infof("About to try and schedule pod %v", pod.Name) - return pod + if obj, err := f.podQueue.Pop(); err == nil { + pod := obj.(*v1.Pod) + glog.V(4).Infof("About to try and schedule pod %v", pod.Name) + return pod + } else { + glog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) + return nil + } } // unassignedNonTerminatedPod selects pods that are unassigned and non-terminal. @@ -1011,7 +1017,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, sche } } -func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { +func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == core.ErrNoNodesAvailable { glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 2769fa27b9f..66b87a686da 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" + "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing" "k8s.io/kubernetes/plugin/pkg/scheduler/util" @@ -280,7 +281,7 @@ func TestDefaultErrorFunc(t *testing.T) { v1.DefaultHardPodAffinitySymmetricWeight, enableEquivalenceCache, ) - queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + queue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)} podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second) errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue) diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 33f60241c12..fcc6eec039f 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -26,7 +26,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" @@ -80,7 +79,7 @@ type Configurator interface { GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) GetHardPodAffinitySymmetricWeight() int GetSchedulerName() string - MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) + MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) // Needs to be exposed for things like integration tests where we want to make fake nodes. GetNodeLister() corelisters.NodeLister diff --git a/plugin/pkg/scheduler/testutil.go b/plugin/pkg/scheduler/testutil.go index d4bd356aff6..b5a01e99b38 100644 --- a/plugin/pkg/scheduler/testutil.go +++ b/plugin/pkg/scheduler/testutil.go @@ -23,9 +23,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/util" ) @@ -65,7 +65,7 @@ func (fc *FakeConfigurator) GetSchedulerName() string { } // MakeDefaultErrorFunc is not implemented yet. -func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { +func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) { return nil }