Remove Framework dependency on nodeinfo snapshot

This commit is contained in:
Abdullah Gharaibeh 2019-11-05 21:25:07 -05:00
parent a89265b441
commit 6b4bd87ba3
19 changed files with 45 additions and 44 deletions

View File

@ -87,6 +87,7 @@ go_test(
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/events/v1beta1:go_default_library",

View File

@ -554,6 +554,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
predicates.EmptyPredicateMetadataProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,
extenders,
nil,

View File

@ -1306,6 +1306,7 @@ func NewGenericScheduler(
predicateMetaProducer predicates.PredicateMetadataProducer,
prioritizers []priorities.PriorityConfig,
priorityMetaProducer priorities.PriorityMetadataProducer,
nodeInfoSnapshot *nodeinfosnapshot.Snapshot,
framework framework.Framework,
extenders []algorithm.SchedulerExtender,
volumeBinder *volumebinder.VolumeBinder,
@ -1324,7 +1325,7 @@ func NewGenericScheduler(
priorityMetaProducer: priorityMetaProducer,
framework: framework,
extenders: extenders,
nodeInfoSnapshot: framework.NodeInfoSnapshot(),
nodeInfoSnapshot: nodeInfoSnapshot,
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,

View File

@ -141,9 +141,10 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, sharedLister scheduler
return nil
}
// EmptyPluginRegistry is a test plugin set used by the default scheduler.
var EmptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []schedulerapi.PluginConfig{})
// emptyPluginRegistry is a test plugin set used by the default scheduler.
var emptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, []schedulerapi.PluginConfig{})
var emptySnapshot = nodeinfosnapshot.NewEmptySnapshot()
// FakeFilterPlugin is a test filter plugin used by default scheduler.
type FakeFilterPlugin struct {
@ -671,6 +672,7 @@ func TestGenericScheduler(t *testing.T) {
predMetaProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
filterFramework,
[]algorithm.SchedulerExtender{},
nil,
@ -711,6 +713,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
algorithmpredicates.EmptyPredicateMetadataProducer,
prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore, false)
@ -830,6 +833,7 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
algorithmpredicates.EmptyPredicateMetadataProducer,
prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore, false).(*genericScheduler)
@ -1016,6 +1020,7 @@ func TestZeroRequest(t *testing.T) {
nil,
priorityConfigs,
metaDataProducer,
emptySnapshot,
emptyFramework,
[]algorithm.SchedulerExtender{},
nil,
@ -1416,6 +1421,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
algorithmpredicates.GetPredicateMetadata,
nil,
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
filterFramework,
[]algorithm.SchedulerExtender{},
nil,
@ -1672,13 +1678,13 @@ func TestPickOneNodeForPreemption(t *testing.T) {
nodes = append(nodes, makeNode(n, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5))
}
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
fwk, _ := framework.NewFramework(EmptyPluginRegistry, nil, []schedulerapi.PluginConfig{}, framework.WithNodeInfoSnapshot(snapshot))
fwk, _ := framework.NewFramework(emptyPluginRegistry, nil, []schedulerapi.PluginConfig{}, framework.WithSnapshotSharedLister(snapshot))
g := &genericScheduler{
framework: fwk,
nodeInfoSnapshot: snapshot,
predicates: test.predicates,
predicateMetaProducer: algorithmpredicates.GetPredicateMetadata,
nodeInfoSnapshot: snapshot,
}
assignDefaultStartTime(test.pods)
@ -2161,6 +2167,7 @@ func TestPreempt(t *testing.T) {
predMetaProducer,
[]priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,
extenders,
nil,

View File

@ -428,7 +428,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
pluginConfig,
framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory),
framework.WithNodeInfoSnapshot(c.nodeInfoSnapshot),
framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
@ -462,6 +462,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
predicateMetaProducer,
priorityConfigs,
priorityMetaProducer,
c.nodeInfoSnapshot,
framework,
extenders,
c.volumeBinder,

View File

@ -343,7 +343,7 @@ func TestDefaultPodTopologySpreadScore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nodes := makeNodeList(test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
mapFunction, reduceFunction := priorities.NewSelectorSpreadPriority(
fakelisters.ServiceLister(test.services),
@ -600,7 +600,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
nodes := makeLabeledNodeList(labeledNodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
mapFunction, reduceFunction := priorities.NewSelectorSpreadPriority(
fakelisters.ServiceLister(test.services),

View File

@ -206,7 +206,7 @@ func TestImageLocalityPriority(t *testing.T) {
state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList

View File

@ -850,7 +850,7 @@ func TestNodeAffinityPriority(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList
for _, n := range test.nodes {

View File

@ -227,7 +227,7 @@ func TestNodeLabelScore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: map[string]string{"foo": "", "bar": ""}}}
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, []*v1.Node{node})))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(nodeinfosnapshot.NewSnapshot(nil, []*v1.Node{node})))
args := &runtime.Unknown{Raw: []byte(test.rawArgs)}
p, err := New(args, fh)
if err != nil {

View File

@ -143,7 +143,7 @@ func TestNodePreferAvoidPods(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(nodeinfosnapshot.NewSnapshot(nil, test.nodes)))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList
for _, n := range test.nodes {

View File

@ -388,7 +388,7 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
info.TransientInfo.TransNodeInfo.RequestedVolumes = len(test.pod.Spec.Volumes)
}
}
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
p, _ := NewBalancedAllocation(nil, fh)
for i := range test.nodes {

View File

@ -234,7 +234,7 @@ func TestNodeResourcesLeastAllocated(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
p, _ := NewLeastAllocated(nil, fh)
for i := range test.nodes {
hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)

View File

@ -197,7 +197,7 @@ func TestNodeResourcesMostAllocated(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
p, _ := NewMostAllocated(nil, fh)
for i := range test.nodes {
hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)

View File

@ -66,7 +66,7 @@ func TestRequestedToCapacityRatio(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(test.scheduledPods, test.nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
args := &runtime.Unknown{Raw: []byte(`{"FunctionShape" : [{"Utilization" : 0, "Score" : 100}, {"Utilization" : 100, "Score" : 0}], "ResourceToWeightMap" : {"memory" : 1, "cpu" : 1}}`)}
p, _ := New(args, fh)

View File

@ -231,7 +231,7 @@ func TestTaintTolerationScore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
state := framework.NewCycleState()
snapshot := nodeinfosnapshot.NewSnapshot(nil, test.nodes)
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithNodeInfoSnapshot(snapshot))
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
p, _ := New(nil, fh)
var gotList framework.NodeScoreList

View File

@ -16,7 +16,6 @@ go_library(
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -34,7 +34,6 @@ import (
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -59,7 +58,7 @@ const (
// plugins.
type framework struct {
registry Registry
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
snapshotSharedLister schedulerlisters.SharedLister
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
@ -106,9 +105,9 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint
}
type frameworkOptions struct {
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
snapshotSharedLister schedulerlisters.SharedLister
}
// Option for the framework.
@ -128,16 +127,14 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
}
}
// WithNodeInfoSnapshot sets the NodeInfo Snapshot.
func WithNodeInfoSnapshot(nodeInfoSnapshot *nodeinfosnapshot.Snapshot) Option {
// WithSnapshotSharedLister sets the SharedLister of the snapshot.
func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister) Option {
return func(o *frameworkOptions) {
o.nodeInfoSnapshot = nodeInfoSnapshot
o.snapshotSharedLister = snapshotSharedLister
}
}
var defaultFrameworkOptions = frameworkOptions{
nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(),
}
var defaultFrameworkOptions = frameworkOptions{}
var _ Framework = &framework{}
@ -150,7 +147,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
f := &framework{
registry: r,
nodeInfoSnapshot: options.nodeInfoSnapshot,
snapshotSharedLister: options.snapshotSharedLister,
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
@ -609,12 +606,7 @@ func (f *framework) RunPermitPlugins(
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
// remains unchanged after "Reserve".
func (f *framework) SnapshotSharedLister() schedulerlisters.SharedLister {
return f.nodeInfoSnapshot
}
// NodeInfoSnapshot returns the NodeInfo Snapshot handler.
func (f *framework) NodeInfoSnapshot() *nodeinfosnapshot.Snapshot {
return f.nodeInfoSnapshot
return f.snapshotSharedLister
}
// IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.

View File

@ -31,7 +31,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
// NodeScoreList declares a list of nodes and their scores.
@ -447,9 +446,6 @@ type Framework interface {
// ListPlugins returns a map of extension point name to list of configured Plugins.
ListPlugins() map[string][]config.Plugin
// NodeInfoSnapshot return the NodeInfo.Snapshot handler.
NodeInfoSnapshot() *nodeinfosnapshot.Snapshot
}
// FrameworkHandle provides data and some tools that plugins can use. It is

View File

@ -54,6 +54,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -62,6 +63,7 @@ var (
// emptyFramework is an empty framework used in tests.
// Note: If the test runs in goroutine, please don't use this variable to avoid a race condition.
emptyFramework, _ = framework.NewFramework(emptyPluginRegistry, nil, nil)
emptySnapshot = nodeinfosnapshot.NewEmptySnapshot()
)
type fakeBinder struct {
@ -649,6 +651,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyPriorityMetadataProducer,
emptySnapshot,
emptyFramework,
[]algorithm.SchedulerExtender{},
nil,
@ -692,7 +695,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
}
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]predicates.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
fwk, _ := framework.NewFramework(emptyPluginRegistry, nil, []schedulerapi.PluginConfig{})
algo := core.NewGenericScheduler(
scache,
internalqueue.NewSchedulingQueue(nil, nil),
@ -700,7 +702,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
predicates.EmptyPredicateMetadataProducer,
[]priorities.PriorityConfig{},
priorities.EmptyPriorityMetadataProducer,
fwk,
emptySnapshot,
emptyFramework,
[]algorithm.SchedulerExtender{},
nil,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
@ -735,7 +738,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{},
StopEverything: stop,
Framework: fwk,
Framework: emptyFramework,
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}