Merge pull request #81151 from mrkm4ntr/remove-node-lister

Remove algorithm.NodeLister from scheduler interface
Kubernetes Prow Robot authored on 2019-08-13 12:29:55 -07:00, committed by GitHub
commit f3c94c9c42
9 changed files with 22 additions and 44 deletions
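
In short: Schedule and Preempt lose their algorithm.NodeLister parameter, and the generic scheduler lists nodes from its internal cache instead. A side-by-side sketch of the interface change follows; the Before/After names and the stub types are mine, so the snippet compiles standalone:

    package sketch

    // Stand-ins so the comparison compiles on its own.
    type Pod struct{}
    type Node struct{}
    type PluginContext struct{}
    type ScheduleResult struct{}

    // NodeLister is the interface being removed from these signatures.
    type NodeLister interface {
        ListNodes() []*Node
    }

    // Before this PR: every caller had to thread a NodeLister through.
    type ScheduleAlgorithmBefore interface {
        Schedule(*Pod, NodeLister, *PluginContext) (ScheduleResult, error)
        Preempt(*Pod, NodeLister, error) (*Node, []*Pod, []*Pod, error)
    }

    // After: the lister parameter is gone; the generic scheduler resolves
    // nodes via its internal cache (g.cache.ListNodes()).
    type ScheduleAlgorithmAfter interface {
        Schedule(*Pod, *PluginContext) (ScheduleResult, error)
        Preempt(*Pod, error) (*Node, []*Pod, []*Pod, error)
    }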

View File

@@ -24,7 +24,6 @@ import (
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/util/workqueue"
-    "k8s.io/kubernetes/pkg/scheduler/algorithm"
     "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
     priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -37,21 +36,15 @@ import (
 // InterPodAffinity contains information to calculate inter pod affinity.
 type InterPodAffinity struct {
     info                  predicates.NodeInfo
-    nodeLister            algorithm.NodeLister
-    podLister             algorithm.PodLister
     hardPodAffinityWeight int32
 }
 
 // NewInterPodAffinityPriority creates an InterPodAffinity.
 func NewInterPodAffinityPriority(
     info predicates.NodeInfo,
-    nodeLister algorithm.NodeLister,
-    podLister algorithm.PodLister,
     hardPodAffinityWeight int32) PriorityFunction {
     interPodAffinity := &InterPodAffinity{
         info:                  info,
-        nodeLister:            nodeLister,
-        podLister:             podLister,
         hardPodAffinityWeight: hardPodAffinityWeight,
     }
     return interPodAffinity.CalculateInterPodAffinityPriority
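
With the listers gone, the priority is built from its NodeInfo source and the hard-pod-affinity weight alone; pods and nodes reach CalculateInterPodAffinityPriority per call through the nodeNameToInfo snapshot, as the updated tests in the next file show. A minimal usage sketch under that reading; scoreWithInterPodAffinity and its arguments are illustrative, not part of the PR:

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
        "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
        schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
        schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
    )

    // scoreWithInterPodAffinity is a hypothetical helper showing the new
    // two-argument constructor; no node or pod listers are stored.
    func scoreWithInterPodAffinity(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
        nodes []*v1.Node,
        info predicates.NodeInfo,
    ) (schedulerapi.HostPriorityList, error) {
        prioritize := priorities.NewInterPodAffinityPriority(
            info, v1.DefaultHardPodAffinitySymmetricWeight)
        // Nodes and pods arrive per call via the snapshot arguments.
        return prioritize(pod, nodeNameToInfo, nodes)
    }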

View File

@@ -25,7 +25,6 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-    schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
 type FakeNodeListInfo []*v1.Node
@@ -513,8 +512,6 @@ func TestInterPodAffinityPriority(t *testing.T) {
         nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
         interPodAffinity := InterPodAffinity{
             info:                  FakeNodeListInfo(test.nodes),
-            nodeLister:            schedulertesting.FakeNodeLister(test.nodes),
-            podLister:             schedulertesting.FakePodLister(test.pods),
             hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
         }
         list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes)
@@ -603,8 +600,6 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
         nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
         ipa := InterPodAffinity{
             info:                  FakeNodeListInfo(test.nodes),
-            nodeLister:            schedulertesting.FakeNodeLister(test.nodes),
-            podLister:             schedulertesting.FakePodLister(test.pods),
             hardPodAffinityWeight: test.hardPodAffinityWeight,
         }
         list, err := ipa.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes)

View File

@@ -70,7 +70,7 @@ func init() {
         priorities.InterPodAffinityPriority,
         factory.PriorityConfigFactory{
             Function: func(args factory.PluginFactoryArgs) priorities.PriorityFunction {
-                return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
+                return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.HardPodAffinitySymmetricWeight)
             },
             Weight: 1,
         },

View File

@@ -553,7 +553,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
             schedulerapi.DefaultPercentageOfNodesToScore,
             false)
         podIgnored := &v1.Pod{}
-        result, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
+        result, err := scheduler.Schedule(podIgnored, framework.NewPluginContext())
         if test.expectsErr {
             if err == nil {
                 t.Errorf("Unexpected non-error, result %+v", result)

View File

@@ -132,12 +132,12 @@ func (f *FitError) Error() string {
 // onto machines.
 // TODO: Rename this type.
 type ScheduleAlgorithm interface {
-    Schedule(*v1.Pod, algorithm.NodeLister, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
+    Schedule(*v1.Pod, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
     // Preempt receives scheduling errors for a pod and tries to create room for
     // the pod by preempting lower priority pods if possible.
     // It returns the node where preemption happened, a list of preempted pods, a
     // list of pods whose nominated node name should be removed, and error if any.
-    Preempt(*v1.Pod, algorithm.NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
+    Preempt(*v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
     // Predicates() returns a pointer to a map of predicate functions. This is
     // exposed for testing.
     Predicates() map[string]predicates.FitPredicate
@@ -186,7 +186,7 @@ func (g *genericScheduler) snapshot() error {
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
 // If it fails, it will return a FitError error with reasons.
-func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
+func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
     trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
     defer trace.LogIfLong(100 * time.Millisecond)
@@ -211,7 +211,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
     trace.Step("Basic checks done")
 
     startPredicateEvalTime := time.Now()
-    filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod, nodeLister)
+    filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
     if err != nil {
         return result, err
     }
@@ -317,7 +317,7 @@ func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList
 // other pods with the same priority. The nominated pod prevents other pods from
 // using the nominated resources and the nominated pod could take a long time
 // before it is retried after many other pending pods.
-func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (g *genericScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
     // Scheduler may return various types of errors. Consider preemption only if
     // the error is of type FitError.
     fitError, ok := scheduleErr.(*FitError)
@@ -328,7 +328,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
     klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
     return nil, nil, nil, nil
 }
-    allNodes := nodeLister.ListNodes()
+    allNodes := g.cache.ListNodes()
     if len(allNodes) == 0 {
         return nil, nil, nil, ErrNoNodesAvailable
     }
@@ -461,13 +461,13 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
-func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
+func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
     var filtered []*v1.Node
     failedPredicateMap := FailedPredicateMap{}
     filteredNodesStatuses := framework.NodeToStatusMap{}
 
     if len(g.predicates) == 0 {
-        filtered = nodeLister.ListNodes()
+        filtered = g.cache.ListNodes()
     } else {
         allNodes := int32(g.cache.NodeTree().NumNodes())
         numNodesToFind := g.numFeasibleNodesToFind(allNodes)
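
Both former lister call sites in this file (Preempt and findNodesThatFit) now read from the scheduler's own cache, so the parameter was redundant rather than load-bearing. A reduced sketch of the shape this relies on, with stand-in types; the real internalcache.Cache interface is much larger:

    package sketch

    import "errors"

    // Node stands in for *v1.Node.
    type Node struct{ Name string }

    // nodeCache captures the one capability the scheduler needs from its
    // internal cache here: listing the currently known nodes.
    type nodeCache interface {
        ListNodes() []*Node
    }

    // genericScheduler, reduced: the cache travels with the scheduler, so
    // Schedule and Preempt no longer take a caller-supplied NodeLister.
    type genericScheduler struct {
        cache nodeCache
    }

    var errNoNodesAvailable = errors.New("no nodes available to schedule pods")

    func (g *genericScheduler) allNodes() ([]*Node, error) {
        nodes := g.cache.ListNodes()
        if len(nodes) == 0 {
            return nil, errNoNodesAvailable
        }
        return nodes, nil
    }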

View File

@@ -652,7 +652,7 @@ func TestGenericScheduler(t *testing.T) {
             false,
             schedulerapi.DefaultPercentageOfNodesToScore,
             false)
-        result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
+        result, err := scheduler.Schedule(test.pod, framework.NewPluginContext())
         if !reflect.DeepEqual(err, test.wErr) {
             t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
         }
@@ -694,7 +694,7 @@ func TestFindFitAllError(t *testing.T) {
     nodes := makeNodeList([]string{"3", "2", "1"})
     scheduler := makeScheduler(predicates, nodes)
 
-    _, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes))
+    _, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{})
 
     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -724,7 +724,7 @@ func TestFindFitSomeError(t *testing.T) {
     scheduler := makeScheduler(predicates, nodes)
     pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
 
-    _, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes))
+    _, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod)
 
     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -1936,7 +1936,7 @@ func TestPreempt(t *testing.T) {
         if test.failedPredMap != nil {
             failedPredMap = test.failedPredMap
         }
-        node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
+        node, victims, _, err := scheduler.Preempt(test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
         if err != nil {
             t.Errorf("unexpected error in preemption: %v", err)
         }
@@ -1966,7 +1966,7 @@
             test.pod.Status.NominatedNodeName = node.Name
         }
         // Call preempt again and make sure it doesn't preempt any more pods.
-        node, victims, _, err = scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
+        node, victims, _, err = scheduler.Preempt(test.pod, error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
         if err != nil {
             t.Errorf("unexpected error in preemption: %v", err)
         }

View File

@@ -80,13 +80,10 @@ type PodConditionUpdater interface {
 // Config is an implementation of the Scheduler's configured input data.
 // TODO over time we should make this struct a hidden implementation detail of the scheduler.
 type Config struct {
-    // It is expected that changes made via SchedulerCache will be observed
-    // by NodeLister and Algorithm.
     SchedulerCache internalcache.Cache
 
-    NodeLister algorithm.NodeLister
-    Algorithm  core.ScheduleAlgorithm
-    GetBinder  func(pod *v1.Pod) Binder
+    Algorithm core.ScheduleAlgorithm
+    GetBinder func(pod *v1.Pod) Binder
     // PodConditionUpdater is used only in case of scheduling errors. If we succeed
     // with scheduling, PodScheduled condition will be updated in apiserver in /bind
     // handler so that binding and setting PodCondition it is atomic.
@@ -140,8 +137,6 @@ type PodPreemptor interface {
 // construct a new scheduler.
 type Configurator struct {
     client clientset.Interface
-    // a means to list all known scheduled pods.
-    scheduledPodLister corelisters.PodLister
     // a means to list all PersistentVolumes
     pVLister corelisters.PersistentVolumeLister
     // a means to list all PersistentVolumeClaims
@@ -429,9 +424,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
     )
 
     return &Config{
-        SchedulerCache: c.schedulerCache,
-        // The scheduler only needs to consider schedulable nodes.
-        NodeLister:          c.schedulerCache,
+        SchedulerCache:      c.schedulerCache,
         Algorithm:           algo,
         GetBinder:           getBinderFunc(c.client, extenders),
         PodConditionUpdater: &podConditionUpdater{c.client},
View File

@@ -284,7 +284,7 @@ func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason s
 // schedule implements the scheduling algorithm and returns the suggested result(host,
 // evaluated nodes number,feasible nodes number).
 func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
-    result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext)
+    result, err := sched.config.Algorithm.Schedule(pod, pluginContext)
     if err != nil {
         pod = pod.DeepCopy()
         sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
@@ -303,7 +303,7 @@ func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, sche
         return "", err
     }
 
-    node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
+    node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, scheduleErr)
     if err != nil {
         klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
         return "", err

View File

@@ -153,7 +153,7 @@ type mockScheduler struct {
     err    error
 }
 
-func (es mockScheduler) Schedule(pod *v1.Pod, ml algorithm.NodeLister, pc *framework.PluginContext) (core.ScheduleResult, error) {
+func (es mockScheduler) Schedule(pod *v1.Pod, pc *framework.PluginContext) (core.ScheduleResult, error) {
     return es.result, es.err
 }
@@ -164,7 +164,7 @@ func (es mockScheduler) Prioritizers() []priorities.PriorityConfig {
     return nil
 }
 
-func (es mockScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
+func (es mockScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
     return nil, nil, nil, nil
 }
@@ -285,7 +285,6 @@ func TestScheduler(t *testing.T) {
         s := NewFromConfig(&factory.Config{
             SchedulerCache: sCache,
-            NodeLister:     sCache,
             Algorithm:      item.algo,
             GetBinder: func(pod *v1.Pod) factory.Binder {
                 return fakeBinder{func(b *v1.Binding) error {
@@ -666,7 +665,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
     config := &factory.Config{
         SchedulerCache: scache,
-        NodeLister:     scache,
         Algorithm:      algo,
         GetBinder: func(pod *v1.Pod) factory.Binder {
             return fakeBinder{func(b *v1.Binding) error {
@@ -719,7 +717,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
     sched := NewFromConfig(&factory.Config{
         SchedulerCache: scache,
-        NodeLister:     scache,
         Algorithm:      algo,
         GetBinder: func(pod *v1.Pod) factory.Binder {
             return fakeBinder{func(b *v1.Binding) error {
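
After this change the test fixtures stop wiring a separate FakeNodeLister: the scheduler cache (sCache/scache above) is the only node source. A minimal fake with equivalent listing behavior might look like this; fakeCache and newFakeCache are hypothetical, not part of the PR:

    package sketch

    // Node stands in for *v1.Node.
    type Node struct{ Name string }

    // fakeCache is a hypothetical test double exposing only the node-listing
    // behavior the scheduler now obtains from its cache.
    type fakeCache struct{ nodes []*Node }

    func (c *fakeCache) ListNodes() []*Node { return c.nodes }

    func newFakeCache(names ...string) *fakeCache {
        c := &fakeCache{}
        for _, name := range names {
            c.nodes = append(c.nodes, &Node{Name: name})
        }
        return c
    }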