Merge pull request #54503 from bsalamat/starvation1

Automatic merge from submit-queue (batch tested with PRs 54287, 54503). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add a new interface for scheduling queue

This PR paves the way to add a different data structure (e.g., priority queue) in subsequent PRs, but it does not make any logical or behavioral changes.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
ref/ #54501
This commit is contained in:
Kubernetes Submit Queue 2017-10-25 01:34:05 -07:00 committed by GitHub
commit 7f991a3b53
8 changed files with 81 additions and 12 deletions

View File

@ -53,7 +53,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)

View File

@ -39,6 +39,7 @@ go_library(
"equivalence_cache.go",
"extender.go",
"generic_scheduler.go",
"scheduling_queue.go",
],
importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/core",
deps = [
@ -56,6 +57,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

View File

@ -0,0 +1,61 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file contains structures that implement scheduling queue types.
// Scheduling queues hold pending pods waiting to be scheduled.
package core
import (
"k8s.io/client-go/tools/cache"
)
// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
Add(obj interface{}) error
AddIfNotPresent(obj interface{}) error
Pop() (interface{}, error)
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
}
// FIFO is only used to add a Pop() method to cache.FIFO so that it can be
// used as a SchedulingQueue interface.
type FIFO struct {
*cache.FIFO
}
// Pop removes the head of FIFO and returns it.
// This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler
// has always been using. There is a comment in that file saying that this method
// shouldn't be used in production code, but scheduler has always been using it.
// This function does minimal error checking.
func (f *FIFO) Pop() (interface{}, error) {
var result interface{}
f.FIFO.Pop(func(obj interface{}) error {
result = obj
return nil
})
return result, nil
}
var _ = SchedulingQueue(&FIFO{}) // Making sure that FIFO implements SchedulingQueue.

View File

@ -65,6 +65,7 @@ go_test(
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library",
"//plugin/pkg/scheduler/core:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
"//plugin/pkg/scheduler/testing:go_default_library",
"//plugin/pkg/scheduler/util:go_default_library",

View File

@ -77,7 +77,7 @@ var (
type configFactory struct {
client clientset.Interface
// queue for pods that need scheduling
podQueue *cache.FIFO
podQueue core.SchedulingQueue
// a means to list all known scheduled pods.
scheduledPodLister corelisters.PodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
@ -142,10 +142,11 @@ func NewConfigFactory(
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
schedulingQueue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
c := &configFactory{
client: client,
podLister: schedulerCache,
podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
podQueue: schedulingQueue,
pVLister: pvInformer.Lister(),
pVCLister: pvcInformer.Lister(),
serviceLister: serviceInformer.Lister(),
@ -901,9 +902,14 @@ func (f *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
}
func (f *configFactory) getNextPod() *v1.Pod {
pod := cache.Pop(f.podQueue).(*v1.Pod)
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod
if obj, err := f.podQueue.Pop(); err == nil {
pod := obj.(*v1.Pod)
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod
} else {
glog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
return nil
}
}
// unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
@ -1011,7 +1017,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration, sche
}
}
func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
func (factory *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
return func(pod *v1.Pod, err error) {
if err == core.ErrNoNodesAvailable {
glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
@ -280,7 +281,7 @@ func TestDefaultErrorFunc(t *testing.T) {
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
queue := &core.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)

View File

@ -26,7 +26,6 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -80,7 +79,7 @@ type Configurator interface {
GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
GetHardPodAffinitySymmetricWeight() int
GetSchedulerName() string
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error)
MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister

View File

@ -23,9 +23,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
@ -65,7 +65,7 @@ func (fc *FakeConfigurator) GetSchedulerName() string {
}
// MakeDefaultErrorFunc is not implemented yet.
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
func (fc *FakeConfigurator) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
return nil
}