Change all scheduler's Node and Pod list/lookups to be from the snapshot rather than the cache.

This commit is contained in:
Abdullah Gharaibeh 2019-10-24 18:30:21 -04:00
parent 4ee1e7510f
commit 8ec56aeafb
31 changed files with 243 additions and 101 deletions

View File

@ -28,6 +28,7 @@ go_library(
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",

View File

@ -355,7 +355,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
// getMeta creates predicate meta data given the list of pods.
getMeta := func(lister fakelisters.PodLister) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) {
nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(lister, test.nodes)
_, precompute := NewServiceAffinityPredicate(fakelisters.NodeLister(test.nodes), lister, fakelisters.ServiceLister(test.services), nil)
_, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(test.nodes), lister, fakelisters.ServiceLister(test.services), nil)
RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute)
meta := GetPredicateMetadata(test.pendingPod, nodeInfoMap)
return meta.(*predicateMetadata), nodeInfoMap

View File

@ -991,7 +991,7 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *v1.Pod, meta PredicateMet
// ServiceAffinity defines a struct used for creating service affinity predicates.
type ServiceAffinity struct {
nodeLister schedulerlisters.NodeLister
nodeInfoLister schedulerlisters.NodeInfoLister
podLister schedulerlisters.PodLister
serviceLister corelisters.ServiceLister
labels []string
@ -1024,9 +1024,9 @@ func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata)
}
// NewServiceAffinityPredicate creates a ServiceAffinity.
func NewServiceAffinityPredicate(nodeLister schedulerlisters.NodeLister, podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (FitPredicate, predicateMetadataProducer) {
func NewServiceAffinityPredicate(nodeInfoLister schedulerlisters.NodeInfoLister, podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (FitPredicate, predicateMetadataProducer) {
affinity := &ServiceAffinity{
nodeLister: nodeLister,
nodeInfoLister: nodeInfoLister,
podLister: podLister,
serviceLister: serviceLister,
labels: labels,
@ -1084,11 +1084,11 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta PredicateMetada
if len(s.labels) > len(affinityLabels) {
if len(services) > 0 {
if len(filteredPods) > 0 {
nodeWithAffinityLabels, err := s.nodeLister.GetNodeInfo(filteredPods[0].Spec.NodeName)
nodeWithAffinityLabels, err := s.nodeInfoLister.Get(filteredPods[0].Spec.NodeName)
if err != nil {
return false, nil, err
}
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels))
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Node().Labels))
}
}
}
@ -1192,14 +1192,14 @@ func EssentialPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedule
// PodAffinityChecker contains information to check pod affinity.
type PodAffinityChecker struct {
nodeLister schedulerlisters.NodeLister
nodeInfoLister schedulerlisters.NodeInfoLister
podLister schedulerlisters.PodLister
}
// NewPodAffinityPredicate creates a PodAffinityChecker.
func NewPodAffinityPredicate(nodeLister schedulerlisters.NodeLister, podLister schedulerlisters.PodLister) FitPredicate {
func NewPodAffinityPredicate(nodeInfoLister schedulerlisters.NodeInfoLister, podLister schedulerlisters.PodLister) FitPredicate {
checker := &PodAffinityChecker{
nodeLister: nodeLister,
nodeInfoLister: nodeInfoLister,
podLister: podLister,
}
return checker.InterPodAffinityMatches
@ -1254,7 +1254,7 @@ func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod,
return false, false, nil
}
// Namespace and selector of the terms have matched. Now we check topology of the terms.
targetPodNodeInfo, err := c.nodeLister.GetNodeInfo(targetPod.Spec.NodeName)
targetPodNodeInfo, err := c.nodeInfoLister.Get(targetPod.Spec.NodeName)
if err != nil {
return false, false, err
}
@ -1262,7 +1262,7 @@ func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod,
if len(term.TopologyKey) == 0 {
return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
}
if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo, term.TopologyKey) {
if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo.Node(), term.TopologyKey) {
return false, true, nil
}
}
@ -1327,12 +1327,12 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1.
topologyMaps := newTopologyPairsMaps()
for _, existingPod := range existingPods {
existingPodNode, err := c.nodeLister.GetNodeInfo(existingPod.Spec.NodeName)
existingPodNodeInfo, err := c.nodeInfoLister.Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Pod %s has NodeName %q but node is not found", podName(existingPod), existingPod.Spec.NodeName)
continue
}
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNode)
existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNodeInfo.Node())
if err != nil {
return nil, err
}

View File

@ -1858,7 +1858,7 @@ func TestServiceAffinity(t *testing.T) {
nodeInfo.SetNode(test.node)
nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{test.node.Name: nodeInfo}
// Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations.
predicate, precompute := NewServiceAffinityPredicate(fakelisters.NodeLister(nodes), fakelisters.PodLister(test.pods), fakelisters.ServiceLister(test.services), test.labels)
predicate, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(nodes), fakelisters.PodLister(test.pods), fakelisters.ServiceLister(test.services), test.labels)
// Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test.
RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", func(pm *predicateMetadata) {
if !skipPrecompute {
@ -2931,7 +2931,7 @@ func TestInterPodAffinity(t *testing.T) {
}
fit := PodAffinityChecker{
nodeLister: fakelisters.NodeLister([]*v1.Node{node}),
nodeInfoLister: fakelisters.NewNodeInfoLister([]*v1.Node{node}),
podLister: fakelisters.PodLister(test.pods),
}
nodeInfo := schedulernodeinfo.NewNodeInfo(podsOnNode...)
@ -4044,7 +4044,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
for indexNode, node := range test.nodes {
testFit := PodAffinityChecker{
nodeLister: fakelisters.NodeLister(test.nodes),
nodeInfoLister: fakelisters.NewNodeInfoLister(test.nodes),
podLister: fakelisters.PodLister(test.pods),
}

View File

@ -80,6 +80,7 @@ go_test(
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",

View File

@ -34,14 +34,14 @@ import (
// InterPodAffinity contains information to calculate inter pod affinity.
type InterPodAffinity struct {
nodeLister schedulerlisters.NodeLister
nodeInfoLister schedulerlisters.NodeInfoLister
hardPodAffinityWeight int32
}
// NewInterPodAffinityPriority creates an InterPodAffinity.
func NewInterPodAffinityPriority(nodeLister schedulerlisters.NodeLister, hardPodAffinityWeight int32) PriorityFunction {
func NewInterPodAffinityPriority(nodeInfoLister schedulerlisters.NodeInfoLister, hardPodAffinityWeight int32) PriorityFunction {
interPodAffinity := &InterPodAffinity{
nodeLister: nodeLister,
nodeInfoLister: nodeInfoLister,
hardPodAffinityWeight: hardPodAffinityWeight,
}
return interPodAffinity.CalculateInterPodAffinityPriority
@ -118,7 +118,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
var maxCount, minCount int64
processPod := func(existingPod *v1.Pod) error {
existingPodNode, err := ipa.nodeLister.GetNodeInfo(existingPod.Spec.NodeName)
existingPodNodeInfo, err := ipa.nodeInfoLister.Get(existingPod.Spec.NodeName)
if err != nil {
klog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
return nil
@ -126,6 +126,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
existingPodAffinity := existingPod.Spec.Affinity
existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil
existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil
existingPodNode := existingPodNodeInfo.Node()
if hasAffinityConstraints {
// For every soft pod affinity term of <pod>, if <existingPod> matches the term,

View File

@ -23,8 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
@ -515,12 +514,12 @@ func TestInterPodAffinityPriority(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
interPodAffinity := InterPodAffinity{
nodeLister: fakelisters.NodeLister(test.nodes),
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
}
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes)
list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -603,12 +602,12 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
ipa := InterPodAffinity{
nodeLister: fakelisters.NodeLister(test.nodes),
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: test.hardPodAffinityWeight,
}
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes)
list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -660,14 +659,14 @@ func BenchmarkInterPodAffinityPriority(b *testing.B) {
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
existingPods, allNodes := tt.prepFunc(tt.existingPodsNum, tt.allNodesNum)
nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes)
snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes)
interPodAffinity := InterPodAffinity{
nodeLister: fakelisters.NodeLister(allNodes),
nodeInfoLister: snapshot.NodeInfos(),
hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight,
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, nodeNameToInfo, allNodes)
interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot.NodeInfoMap, allNodes)
}
})
}

View File

@ -41,13 +41,13 @@ import (
// PluginFactoryArgs are passed to all plugin factory functions.
type PluginFactoryArgs struct {
NodeInfoLister schedulerlisters.NodeInfoLister
PodLister schedulerlisters.PodLister
ServiceLister corelisters.ServiceLister
ControllerLister corelisters.ReplicationControllerLister
ReplicaSetLister appslisters.ReplicaSetLister
StatefulSetLister appslisters.StatefulSetLister
PDBLister policylisters.PodDisruptionBudgetLister
NodeLister schedulerlisters.NodeLister
CSINodeLister v1beta1storagelisters.CSINodeLister
PVLister corelisters.PersistentVolumeLister
PVCLister corelisters.PersistentVolumeClaimLister
@ -270,7 +270,7 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
if policy.Argument.ServiceAffinity != nil {
predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate {
predicate, precomputationFunction := predicates.NewServiceAffinityPredicate(
args.NodeLister,
args.NodeInfoLister,
args.PodLister,
args.ServiceLister,
policy.Argument.ServiceAffinity.Labels,

View File

@ -92,7 +92,7 @@ func init() {
scheduler.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinityPred,
func(args scheduler.PluginFactoryArgs) predicates.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeLister, args.PodLister)
return predicates.NewPodAffinityPredicate(args.NodeInfoLister, args.PodLister)
},
)

View File

@ -70,7 +70,7 @@ func init() {
priorities.InterPodAffinityPriority,
scheduler.PriorityConfigFactory{
Function: func(args scheduler.PluginFactoryArgs) priorities.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.NodeLister, args.HardPodAffinitySymmetricWeight)
return priorities.NewInterPodAffinityPriority(args.NodeInfoLister, args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},

View File

@ -1420,7 +1420,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
nodes = append(nodes, node)
}
if test.addAffinityPredicate {
n := fakelisters.NodeLister([]*v1.Node{nodes[0]})
n := fakelisters.NewNodeInfoLister([]*v1.Node{nodes[0]})
p := fakelisters.PodLister(test.pods)
test.predicates[algorithmpredicates.MatchInterPodAffinityPred] = algorithmpredicates.NewPodAffinityPredicate(n, p)
}

View File

@ -57,6 +57,7 @@ import (
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
@ -175,6 +176,7 @@ type Configurator struct {
plugins *config.Plugins
pluginConfig []config.PluginConfig
pluginConfigProducerRegistry *plugins.ConfigProducerRegistry
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
}
// ConfigFactoryArgs is a set arguments passed to NewConfigFactory.
@ -259,6 +261,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator {
plugins: args.Plugins,
pluginConfig: args.PluginConfig,
pluginConfigProducerRegistry: args.PluginConfigProducerRegistry,
nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(),
}
return c
@ -407,6 +410,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
pluginConfig,
framework.WithClientSet(c.client),
framework.WithInformerFactory(c.informerFactory),
framework.WithNodeInfoSnapshot(c.nodeInfoSnapshot),
)
if err != nil {
klog.Fatalf("error initializing the scheduling framework: %v", err)
@ -582,13 +586,13 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin
func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigProducerArgs) {
return &PluginFactoryArgs{
PodLister: c.schedulerCache,
NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(),
PodLister: c.nodeInfoSnapshot.Pods(),
ServiceLister: c.serviceLister,
ControllerLister: c.controllerLister,
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
PDBLister: c.pdbLister,
NodeLister: c.schedulerCache,
CSINodeLister: c.csiNodeLister,
PVLister: c.pVLister,
PVCLister: c.pVCLister,

View File

@ -24,7 +24,6 @@ go_library(
"//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library",
"//pkg/scheduler/framework/plugins/volumezone:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],

View File

@ -38,13 +38,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
// RegistryArgs arguments needed to create default plugin factories.
type RegistryArgs struct {
SchedulerCache internalcache.Cache
VolumeBinder *volumebinder.VolumeBinder
}
@ -75,9 +73,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry {
nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD,
nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,
nodevolumelimits.CinderName: nodevolumelimits.NewCinder,
interpodaffinity.Name: func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return interpodaffinity.New(args.SchedulerCache, args.SchedulerCache), nil
},
interpodaffinity.Name: interpodaffinity.New,
}
}

View File

@ -9,9 +9,9 @@ go_library(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
)
@ -23,8 +23,7 @@ go_test(
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers/fake:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
],

View File

@ -21,10 +21,10 @@ import (
"fmt"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -54,8 +54,8 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy
}
// New initializes a new plugin and returns it.
func New(nodeLister schedulerlisters.NodeLister, podLister schedulerlisters.PodLister) framework.Plugin {
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &InterPodAffinity{
predicate: predicates.NewPodAffinityPredicate(nodeLister, podLister),
}
predicate: predicates.NewPodAffinityPredicate(h.SnapshotSharedLister().NodeInfos(), h.SnapshotSharedLister().Pods()),
}, nil
}

View File

@ -26,8 +26,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
var (
@ -733,14 +732,15 @@ func TestSingleNode(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := test.node
nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, []*v1.Node{node})
meta := predicates.GetPredicateMetadata(test.pod, nodeInfoMap)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, []*v1.Node{test.node})
meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap)
state := framework.NewCycleState()
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
p := New(fakelisters.NodeLister([]*v1.Node{node}), fakelisters.PodLister(test.pods))
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfoMap[node.Name])
p := &InterPodAffinity{
predicate: predicates.NewPodAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods()),
}
gotStatus := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name])
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}
@ -1432,15 +1432,16 @@ func TestMultipleNodes(t *testing.T) {
for indexTest, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodeListInfo := fakelisters.NodeLister(test.nodes)
nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes)
snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes)
for indexNode, node := range test.nodes {
meta := predicates.GetPredicateMetadata(test.pod, nodeInfoMap)
meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap)
state := framework.NewCycleState()
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
p := New(nodeListInfo, fakelisters.PodLister(test.pods))
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfoMap[node.Name])
p := &InterPodAffinity{
predicate: predicates.NewPodAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods()),
}
gotStatus := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[node.Name])
if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) {
t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode])
}

View File

@ -13,6 +13,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",

View File

@ -31,6 +31,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
@ -107,6 +108,7 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint
type frameworkOptions struct {
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
nodeInfoSnapshot *nodeinfosnapshot.Snapshot
}
// Option for the framework.
@ -126,7 +128,16 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
}
}
var defaultFrameworkOptions = frameworkOptions{}
// WithNodeInfoSnapshot sets the NodeInfo Snapshot.
func WithNodeInfoSnapshot(nodeInfoSnapshot *nodeinfosnapshot.Snapshot) Option {
return func(o *frameworkOptions) {
o.nodeInfoSnapshot = nodeInfoSnapshot
}
}
var defaultFrameworkOptions = frameworkOptions{
nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(),
}
var _ Framework = &framework{}
@ -139,7 +150,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
f := &framework{
registry: r,
nodeInfoSnapshot: nodeinfosnapshot.NewSnapshot(),
nodeInfoSnapshot: options.nodeInfoSnapshot,
pluginNameToWeightMap: make(map[string]int),
waitingPods: newWaitingPodsMap(),
clientSet: options.clientSet,
@ -593,6 +604,14 @@ func (f *framework) RunPermitPlugins(
return nil
}
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
// remains unchanged after "Reserve".
func (f *framework) SnapshotSharedLister() schedulerlisters.SharedLister {
return f.nodeInfoSnapshot
}
// NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until a
// pod finishes "Reserve". There is no guarantee that the information remains

View File

@ -29,6 +29,7 @@ import (
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
@ -452,6 +453,15 @@ type Framework interface {
// passed to the plugin factories at the time of plugin initialization. Plugins
// must store and use this handle to call framework functions.
type FrameworkHandle interface {
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Reserve" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling, so plugins in the binding
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// otherwise a concurrent read/write error might occur, they should use scheduler
// cache instead.
SnapshotSharedLister() schedulerlisters.SharedLister
// NodeInfoSnapshot return the latest NodeInfo snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Reserve" point. There is no guarantee that the information

View File

@ -1077,7 +1077,7 @@ func TestNodeOperators(t *testing.T) {
}
// Case 2: dump cached nodes successfully.
cachedNodes := nodeinfosnapshot.NewSnapshot()
cachedNodes := nodeinfosnapshot.NewEmptySnapshot()
cache.UpdateNodeInfoSnapshot(cachedNodes)
newNode, found := cachedNodes.NodeInfoMap[node.Name]
if !found || len(cachedNodes.NodeInfoMap) != 1 {
@ -1333,7 +1333,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cache = newSchedulerCache(time.Second, time.Second, nil)
snapshot = nodeinfosnapshot.NewSnapshot()
snapshot = nodeinfosnapshot.NewEmptySnapshot()
for _, op := range test.operations {
op()
@ -1382,7 +1382,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
for n := 0; n < b.N; n++ {
cachedNodes := nodeinfosnapshot.NewSnapshot()
cachedNodes := nodeinfosnapshot.NewEmptySnapshot()
cache.UpdateNodeInfoSnapshot(cachedNodes)
}
}

View File

@ -102,9 +102,6 @@ type Cache interface {
// on this node.
UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error
// GetNodeInfo returns the node object with node string.
GetNodeInfo(nodeName string) (*v1.Node, error)
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot
}

View File

@ -37,6 +37,7 @@ go_test(
"//pkg/api/v1/pod:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",

View File

@ -35,6 +35,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
@ -246,6 +247,10 @@ func (*fakeFramework) SharedInformerFactory() informers.SharedInformerFactory {
return nil
}
func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister {
return nil
}
func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) {
q := NewPriorityQueue(nil, &fakeFramework{})
if err := q.Add(&medPriorityPod); err != nil {

View File

@ -6,6 +6,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/listers",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],

View File

@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",

View File

@ -30,6 +30,7 @@ import (
storagelisters "k8s.io/client-go/listers/storage/v1"
v1beta1storagelisters "k8s.io/client-go/listers/storage/v1beta1"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
var _ schedulerlisters.PodLister = &PodLister{}
@ -115,7 +116,7 @@ func (f ControllerLister) GetPodControllers(pod *v1.Pod) (controllers []*v1.Repl
}
}
if len(controllers) == 0 {
err = fmt.Errorf("Could not find Replication Controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find Replication Controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
@ -154,7 +155,7 @@ func (f ReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) (rss []*appsv1.ReplicaS
}
}
if len(rss) == 0 {
err = fmt.Errorf("Could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
@ -192,7 +193,7 @@ func (f StatefulSetLister) GetPodStatefulSets(pod *v1.Pod) (sss []*appsv1.Statef
}
}
if len(sss) == 0 {
err = fmt.Errorf("Could not find StatefulSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
err = fmt.Errorf("could not find StatefulSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
@ -243,17 +244,34 @@ func (pvcs PersistentVolumeClaimLister) PersistentVolumeClaims(namespace string)
}
}
// NodeLister declares a *v1.Node type for testing.
type NodeLister []*v1.Node
// NodeInfoLister declares a schedulernodeinfo.NodeInfo type for testing.
type NodeInfoLister []*schedulernodeinfo.NodeInfo
// GetNodeInfo returns a fake node object in the fake nodes.
func (nodes NodeLister) GetNodeInfo(nodeName string) (*v1.Node, error) {
// Get returns a fake node object in the fake nodes.
func (nodes NodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) {
for _, node := range nodes {
if node != nil && node.Name == nodeName {
if node != nil && node.Node().Name == nodeName {
return node, nil
}
}
return nil, fmt.Errorf("Unable to find node: %s", nodeName)
return nil, fmt.Errorf("unable to find node: %s", nodeName)
}
// List lists all nodes.
func (nodes NodeInfoLister) List() ([]*schedulernodeinfo.NodeInfo, error) {
return nodes, nil
}
// NewNodeInfoLister create a new fake NodeInfoLister from a slice of v1.Nodes.
func NewNodeInfoLister(nodes []*v1.Node) schedulerlisters.NodeInfoLister {
nodeInfoList := make([]*schedulernodeinfo.NodeInfo, len(nodes))
for _, node := range nodes {
nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(node)
nodeInfoList = append(nodeInfoList, nodeInfo)
}
return NodeInfoLister(nodeInfoList)
}
var _ v1beta1storagelisters.CSINodeLister = CSINodeLister{}

View File

@ -19,6 +19,7 @@ package listers
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// PodFilter is a function to filter a pod. If pod passed return true else return false.
@ -33,8 +34,16 @@ type PodLister interface {
FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
}
// NodeLister interface represents anything that can list/get node object from node name.
type NodeLister interface {
// TODO(ahg-g): rename to Get and add a List interface.
GetNodeInfo(nodeName string) (*v1.Node, error)
// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
type NodeInfoLister interface {
// Returns the list of NodeInfos.
List() ([]*schedulernodeinfo.NodeInfo, error)
// Returns the NodeInfo of the given node name.
Get(nodeName string) (*schedulernodeinfo.NodeInfo, error)
}
// SharedLister groups scheduler-specific listers.
type SharedLister interface {
Pods() PodLister
NodeInfos() NodeInfoLister
}

View File

@ -6,8 +6,10 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],
)

View File

@ -17,7 +17,11 @@ limitations under the License.
package nodeinfo
import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -31,13 +35,40 @@ type Snapshot struct {
Generation int64
}
// NewSnapshot initializes a Snapshot struct and returns it.
func NewSnapshot() *Snapshot {
var _ schedulerlisters.SharedLister = &Snapshot{}
// NewEmptySnapshot initializes a Snapshot struct and returns it.
func NewEmptySnapshot() *Snapshot {
return &Snapshot{
NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
}
}
// NewSnapshot initializes a Snapshot struct and returns it.
func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(pods, nodes)
nodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, len(nodes))
for _, v := range nodeInfoMap {
nodeInfoList = append(nodeInfoList, v)
}
s := NewEmptySnapshot()
s.NodeInfoMap = nodeInfoMap
s.NodeInfoList = nodeInfoList
return s
}
// Pods returns a PodLister
func (s *Snapshot) Pods() schedulerlisters.PodLister {
return &podLister{snapshot: s}
}
// NodeInfos returns a NodeInfoLister.
func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister {
return &nodeInfoLister{snapshot: s}
}
// ListNodes returns the list of nodes in the snapshot.
func (s *Snapshot) ListNodes() []*v1.Node {
nodes := make([]*v1.Node, 0, len(s.NodeInfoMap))
@ -48,3 +79,50 @@ func (s *Snapshot) ListNodes() []*v1.Node {
}
return nodes
}
type podLister struct {
snapshot *Snapshot
}
// List returns the list of pods in the snapshot.
func (p *podLister) List(selector labels.Selector) ([]*v1.Pod, error) {
alwaysTrue := func(p *v1.Pod) bool { return true }
return p.FilteredList(alwaysTrue, selector)
}
// FilteredList returns a filtered list of pods in the snapshot.
func (p *podLister) FilteredList(podFilter schedulerlisters.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
// podFilter is expected to return true for most or all of the pods. We
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
maxSize := 0
for _, n := range p.snapshot.NodeInfoMap {
maxSize += len(n.Pods())
}
pods := make([]*v1.Pod, 0, maxSize)
for _, n := range p.snapshot.NodeInfoMap {
for _, pod := range n.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
}
}
}
return pods, nil
}
type nodeInfoLister struct {
snapshot *Snapshot
}
// List returns the list of nodes in the snapshot.
func (n *nodeInfoLister) List() ([]*schedulernodeinfo.NodeInfo, error) {
return n.snapshot.NodeInfoList, nil
}
// Returns the NodeInfo of the given node name.
func (n *nodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) {
if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok {
return v, nil
}
return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
}

View File

@ -277,7 +277,6 @@ func New(client clientset.Interface,
registry := options.frameworkDefaultRegistry
if registry == nil {
registry = frameworkplugins.NewDefaultRegistry(&frameworkplugins.RegistryArgs{
SchedulerCache: schedulerCache,
VolumeBinder: volumeBinder,
})
}