From 8ec56aeafbbe09ba670e0a5a84006e3392c2c27d Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Thu, 24 Oct 2019 18:30:21 -0400 Subject: [PATCH] Change all scheduler's Node and Pod list/lookups to be from the snapshot rather than the cache. --- pkg/scheduler/BUILD | 1 + .../algorithm/predicates/metadata_test.go | 2 +- .../algorithm/predicates/predicates.go | 40 ++++----- .../algorithm/predicates/predicates_test.go | 10 +-- pkg/scheduler/algorithm/priorities/BUILD | 1 + .../algorithm/priorities/interpod_affinity.go | 9 +- .../priorities/interpod_affinity_test.go | 21 +++-- pkg/scheduler/algorithm_factory.go | 4 +- .../defaults/register_predicates.go | 2 +- .../defaults/register_priorities.go | 2 +- pkg/scheduler/core/generic_scheduler_test.go | 2 +- pkg/scheduler/factory.go | 8 +- pkg/scheduler/framework/plugins/BUILD | 1 - .../framework/plugins/default_registry.go | 8 +- .../framework/plugins/interpodaffinity/BUILD | 5 +- .../interpodaffinity/interpod_affinity.go | 8 +- .../interpod_affinity_test.go | 25 +++--- pkg/scheduler/framework/v1alpha1/BUILD | 1 + pkg/scheduler/framework/v1alpha1/framework.go | 27 +++++- pkg/scheduler/framework/v1alpha1/interface.go | 10 +++ pkg/scheduler/internal/cache/cache_test.go | 6 +- pkg/scheduler/internal/cache/interface.go | 3 - pkg/scheduler/internal/queue/BUILD | 1 + .../internal/queue/scheduling_queue_test.go | 5 ++ pkg/scheduler/listers/BUILD | 1 + pkg/scheduler/listers/fake/BUILD | 1 + pkg/scheduler/listers/fake/listers.go | 36 ++++++-- pkg/scheduler/listers/listers.go | 17 +++- pkg/scheduler/nodeinfo/snapshot/BUILD | 2 + pkg/scheduler/nodeinfo/snapshot/snapshot.go | 82 ++++++++++++++++++- pkg/scheduler/scheduler.go | 3 +- 31 files changed, 243 insertions(+), 101 deletions(-) diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index e66a7993ad4..86f1857b325 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -28,6 +28,7 @@ go_library( "//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/metrics:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index 6c8eae8ece9..c703b26b994 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -355,7 +355,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) { // getMeta creates predicate meta data given the list of pods. 
getMeta := func(lister fakelisters.PodLister) (*predicateMetadata, map[string]*schedulernodeinfo.NodeInfo) { nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(lister, test.nodes) - _, precompute := NewServiceAffinityPredicate(fakelisters.NodeLister(test.nodes), lister, fakelisters.ServiceLister(test.services), nil) + _, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(test.nodes), lister, fakelisters.ServiceLister(test.services), nil) RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", precompute) meta := GetPredicateMetadata(test.pendingPod, nodeInfoMap) return meta.(*predicateMetadata), nodeInfoMap diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 3ccdfab6032..44ba22daf0f 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -991,10 +991,10 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *v1.Pod, meta PredicateMet // ServiceAffinity defines a struct used for creating service affinity predicates. type ServiceAffinity struct { - nodeLister schedulerlisters.NodeLister - podLister schedulerlisters.PodLister - serviceLister corelisters.ServiceLister - labels []string + nodeInfoLister schedulerlisters.NodeInfoLister + podLister schedulerlisters.PodLister + serviceLister corelisters.ServiceLister + labels []string } // serviceAffinityMetadataProducer should be run once by the scheduler before looping through the Predicate. It is a helper function that @@ -1024,12 +1024,12 @@ func (s *ServiceAffinity) serviceAffinityMetadataProducer(pm *predicateMetadata) } // NewServiceAffinityPredicate creates a ServiceAffinity. -func NewServiceAffinityPredicate(nodeLister schedulerlisters.NodeLister, podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (FitPredicate, predicateMetadataProducer) { +func NewServiceAffinityPredicate(nodeInfoLister schedulerlisters.NodeInfoLister, podLister schedulerlisters.PodLister, serviceLister corelisters.ServiceLister, labels []string) (FitPredicate, predicateMetadataProducer) { affinity := &ServiceAffinity{ - nodeLister: nodeLister, - podLister: podLister, - serviceLister: serviceLister, - labels: labels, + nodeInfoLister: nodeInfoLister, + podLister: podLister, + serviceLister: serviceLister, + labels: labels, } return affinity.checkServiceAffinity, affinity.serviceAffinityMetadataProducer } @@ -1084,11 +1084,11 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta PredicateMetada if len(s.labels) > len(affinityLabels) { if len(services) > 0 { if len(filteredPods) > 0 { - nodeWithAffinityLabels, err := s.nodeLister.GetNodeInfo(filteredPods[0].Spec.NodeName) + nodeWithAffinityLabels, err := s.nodeInfoLister.Get(filteredPods[0].Spec.NodeName) if err != nil { return false, nil, err } - AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels)) + AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Node().Labels)) } } } @@ -1192,15 +1192,15 @@ func EssentialPredicates(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedule // PodAffinityChecker contains information to check pod affinity. type PodAffinityChecker struct { - nodeLister schedulerlisters.NodeLister - podLister schedulerlisters.PodLister + nodeInfoLister schedulerlisters.NodeInfoLister + podLister schedulerlisters.PodLister } // NewPodAffinityPredicate creates a PodAffinityChecker. 
-func NewPodAffinityPredicate(nodeLister schedulerlisters.NodeLister, podLister schedulerlisters.PodLister) FitPredicate { +func NewPodAffinityPredicate(nodeInfoLister schedulerlisters.NodeInfoLister, podLister schedulerlisters.PodLister) FitPredicate { checker := &PodAffinityChecker{ - nodeLister: nodeLister, - podLister: podLister, + nodeInfoLister: nodeInfoLister, + podLister: podLister, } return checker.InterPodAffinityMatches } @@ -1254,7 +1254,7 @@ func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod, return false, false, nil } // Namespace and selector of the terms have matched. Now we check topology of the terms. - targetPodNodeInfo, err := c.nodeLister.GetNodeInfo(targetPod.Spec.NodeName) + targetPodNodeInfo, err := c.nodeInfoLister.Get(targetPod.Spec.NodeName) if err != nil { return false, false, err } @@ -1262,7 +1262,7 @@ func (c *PodAffinityChecker) podMatchesPodAffinityTerms(pod, targetPod *v1.Pod, if len(term.TopologyKey) == 0 { return false, false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") } - if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo, term.TopologyKey) { + if !priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), targetPodNodeInfo.Node(), term.TopologyKey) { return false, true, nil } } @@ -1327,12 +1327,12 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairsOfPods(pod *v1. topologyMaps := newTopologyPairsMaps() for _, existingPod := range existingPods { - existingPodNode, err := c.nodeLister.GetNodeInfo(existingPod.Spec.NodeName) + existingPodNodeInfo, err := c.nodeInfoLister.Get(existingPod.Spec.NodeName) if err != nil { klog.Errorf("Pod %s has NodeName %q but node is not found", podName(existingPod), existingPod.Spec.NodeName) continue } - existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNode) + existingPodTopologyMaps, err := getMatchingAntiAffinityTopologyPairsOfPod(pod, existingPod, existingPodNodeInfo.Node()) if err != nil { return nil, err } diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index ce430a8e959..fa617e26f81 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -1858,7 +1858,7 @@ func TestServiceAffinity(t *testing.T) { nodeInfo.SetNode(test.node) nodeInfoMap := map[string]*schedulernodeinfo.NodeInfo{test.node.Name: nodeInfo} // Reimplementing the logic that the scheduler implements: Any time it makes a predicate, it registers any precomputations. - predicate, precompute := NewServiceAffinityPredicate(fakelisters.NodeLister(nodes), fakelisters.PodLister(test.pods), fakelisters.ServiceLister(test.services), test.labels) + predicate, precompute := NewServiceAffinityPredicate(fakelisters.NewNodeInfoLister(nodes), fakelisters.PodLister(test.pods), fakelisters.ServiceLister(test.services), test.labels) // Register a precomputation or Rewrite the precomputation to a no-op, depending on the state we want to test. 
RegisterPredicateMetadataProducer("ServiceAffinityMetaProducer", func(pm *predicateMetadata) { if !skipPrecompute { @@ -2931,8 +2931,8 @@ func TestInterPodAffinity(t *testing.T) { } fit := PodAffinityChecker{ - nodeLister: fakelisters.NodeLister([]*v1.Node{node}), - podLister: fakelisters.PodLister(test.pods), + nodeInfoLister: fakelisters.NewNodeInfoLister([]*v1.Node{node}), + podLister: fakelisters.PodLister(test.pods), } nodeInfo := schedulernodeinfo.NewNodeInfo(podsOnNode...) nodeInfo.SetNode(test.node) @@ -4044,8 +4044,8 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { for indexNode, node := range test.nodes { testFit := PodAffinityChecker{ - nodeLister: fakelisters.NodeLister(test.nodes), - podLister: fakelisters.PodLister(test.pods), + nodeInfoLister: fakelisters.NewNodeInfoLister(test.nodes), + podLister: fakelisters.PodLister(test.pods), } var meta PredicateMetadata diff --git a/pkg/scheduler/algorithm/priorities/BUILD b/pkg/scheduler/algorithm/priorities/BUILD index 7e27af662d8..9511de44b9b 100644 --- a/pkg/scheduler/algorithm/priorities/BUILD +++ b/pkg/scheduler/algorithm/priorities/BUILD @@ -80,6 +80,7 @@ go_test( "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/listers/fake:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//pkg/scheduler/testing:go_default_library", "//pkg/util/parsers:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity.go b/pkg/scheduler/algorithm/priorities/interpod_affinity.go index e5da2f5fa45..5f94eaab466 100644 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity.go +++ b/pkg/scheduler/algorithm/priorities/interpod_affinity.go @@ -34,14 +34,14 @@ import ( // InterPodAffinity contains information to calculate inter pod affinity. type InterPodAffinity struct { - nodeLister schedulerlisters.NodeLister + nodeInfoLister schedulerlisters.NodeInfoLister hardPodAffinityWeight int32 } // NewInterPodAffinityPriority creates an InterPodAffinity. 
-func NewInterPodAffinityPriority(nodeLister schedulerlisters.NodeLister, hardPodAffinityWeight int32) PriorityFunction { +func NewInterPodAffinityPriority(nodeInfoLister schedulerlisters.NodeInfoLister, hardPodAffinityWeight int32) PriorityFunction { interPodAffinity := &InterPodAffinity{ - nodeLister: nodeLister, + nodeInfoLister: nodeInfoLister, hardPodAffinityWeight: hardPodAffinityWeight, } return interPodAffinity.CalculateInterPodAffinityPriority @@ -118,7 +118,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node var maxCount, minCount int64 processPod := func(existingPod *v1.Pod) error { - existingPodNode, err := ipa.nodeLister.GetNodeInfo(existingPod.Spec.NodeName) + existingPodNodeInfo, err := ipa.nodeInfoLister.Get(existingPod.Spec.NodeName) if err != nil { klog.Errorf("Node not found, %v", existingPod.Spec.NodeName) return nil @@ -126,6 +126,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node existingPodAffinity := existingPod.Spec.Affinity existingHasAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAffinity != nil existingHasAntiAffinityConstraints := existingPodAffinity != nil && existingPodAffinity.PodAntiAffinity != nil + existingPodNode := existingPodNodeInfo.Node() if hasAffinityConstraints { // For every soft pod affinity term of , if matches the term, diff --git a/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go b/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go index 7622fc5dcc3..d3b39d567eb 100644 --- a/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go +++ b/pkg/scheduler/algorithm/priorities/interpod_affinity_test.go @@ -23,8 +23,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -515,12 +514,12 @@ func TestInterPodAffinityPriority(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes) + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) interPodAffinity := InterPodAffinity{ - nodeLister: fakelisters.NodeLister(test.nodes), + nodeInfoLister: snapshot.NodeInfos(), hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, } - list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes) + list, err := interPodAffinity.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -603,12 +602,12 @@ func TestHardPodAffinitySymmetricWeight(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes) + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) ipa := InterPodAffinity{ - nodeLister: fakelisters.NodeLister(test.nodes), + nodeInfoLister: snapshot.NodeInfos(), hardPodAffinityWeight: test.hardPodAffinityWeight, } - list, err := ipa.CalculateInterPodAffinityPriority(test.pod, nodeNameToInfo, test.nodes) + list, err := ipa.CalculateInterPodAffinityPriority(test.pod, snapshot.NodeInfoMap, test.nodes) if err != nil { t.Errorf("unexpected error: 
%v", err) } @@ -660,14 +659,14 @@ func BenchmarkInterPodAffinityPriority(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { existingPods, allNodes := tt.prepFunc(tt.existingPodsNum, tt.allNodesNum) - nodeNameToInfo := schedulernodeinfo.CreateNodeNameToInfoMap(existingPods, allNodes) + snapshot := nodeinfosnapshot.NewSnapshot(existingPods, allNodes) interPodAffinity := InterPodAffinity{ - nodeLister: fakelisters.NodeLister(allNodes), + nodeInfoLister: snapshot.NodeInfos(), hardPodAffinityWeight: v1.DefaultHardPodAffinitySymmetricWeight, } b.ResetTimer() for i := 0; i < b.N; i++ { - interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, nodeNameToInfo, allNodes) + interPodAffinity.CalculateInterPodAffinityPriority(tt.pod, snapshot.NodeInfoMap, allNodes) } }) } diff --git a/pkg/scheduler/algorithm_factory.go b/pkg/scheduler/algorithm_factory.go index be4c275c056..532eacca807 100644 --- a/pkg/scheduler/algorithm_factory.go +++ b/pkg/scheduler/algorithm_factory.go @@ -41,13 +41,13 @@ import ( // PluginFactoryArgs are passed to all plugin factory functions. type PluginFactoryArgs struct { + NodeInfoLister schedulerlisters.NodeInfoLister PodLister schedulerlisters.PodLister ServiceLister corelisters.ServiceLister ControllerLister corelisters.ReplicationControllerLister ReplicaSetLister appslisters.ReplicaSetLister StatefulSetLister appslisters.StatefulSetLister PDBLister policylisters.PodDisruptionBudgetLister - NodeLister schedulerlisters.NodeLister CSINodeLister v1beta1storagelisters.CSINodeLister PVLister corelisters.PersistentVolumeLister PVCLister corelisters.PersistentVolumeClaimLister @@ -270,7 +270,7 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { if policy.Argument.ServiceAffinity != nil { predicateFactory = func(args PluginFactoryArgs) predicates.FitPredicate { predicate, precomputationFunction := predicates.NewServiceAffinityPredicate( - args.NodeLister, + args.NodeInfoLister, args.PodLister, args.ServiceLister, policy.Argument.ServiceAffinity.Labels, diff --git a/pkg/scheduler/algorithmprovider/defaults/register_predicates.go b/pkg/scheduler/algorithmprovider/defaults/register_predicates.go index 92e07ce601d..1f361dd614b 100644 --- a/pkg/scheduler/algorithmprovider/defaults/register_predicates.go +++ b/pkg/scheduler/algorithmprovider/defaults/register_predicates.go @@ -92,7 +92,7 @@ func init() { scheduler.RegisterFitPredicateFactory( predicates.MatchInterPodAffinityPred, func(args scheduler.PluginFactoryArgs) predicates.FitPredicate { - return predicates.NewPodAffinityPredicate(args.NodeLister, args.PodLister) + return predicates.NewPodAffinityPredicate(args.NodeInfoLister, args.PodLister) }, ) diff --git a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go index 259f0b3fead..1a31b403bdb 100644 --- a/pkg/scheduler/algorithmprovider/defaults/register_priorities.go +++ b/pkg/scheduler/algorithmprovider/defaults/register_priorities.go @@ -70,7 +70,7 @@ func init() { priorities.InterPodAffinityPriority, scheduler.PriorityConfigFactory{ Function: func(args scheduler.PluginFactoryArgs) priorities.PriorityFunction { - return priorities.NewInterPodAffinityPriority(args.NodeLister, args.HardPodAffinitySymmetricWeight) + return priorities.NewInterPodAffinityPriority(args.NodeInfoLister, args.HardPodAffinitySymmetricWeight) }, Weight: 1, }, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 
7bdeb59a50b..d39e701427e 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1420,7 +1420,7 @@ func TestSelectNodesForPreemption(t *testing.T) { nodes = append(nodes, node) } if test.addAffinityPredicate { - n := fakelisters.NodeLister([]*v1.Node{nodes[0]}) + n := fakelisters.NewNodeInfoLister([]*v1.Node{nodes[0]}) p := fakelisters.PodLister(test.pods) test.predicates[algorithmpredicates.MatchInterPodAffinityPred] = algorithmpredicates.NewPodAffinityPredicate(n, p) } diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index 324ee867dde..b9fe4056f5d 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -57,6 +57,7 @@ import ( internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -175,6 +176,7 @@ type Configurator struct { plugins *config.Plugins pluginConfig []config.PluginConfig pluginConfigProducerRegistry *plugins.ConfigProducerRegistry + nodeInfoSnapshot *nodeinfosnapshot.Snapshot } // ConfigFactoryArgs is a set arguments passed to NewConfigFactory. @@ -259,6 +261,7 @@ func NewConfigFactory(args *ConfigFactoryArgs) *Configurator { plugins: args.Plugins, pluginConfig: args.PluginConfig, pluginConfigProducerRegistry: args.PluginConfigProducerRegistry, + nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(), } return c @@ -407,6 +410,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e pluginConfig, framework.WithClientSet(c.client), framework.WithInformerFactory(c.informerFactory), + framework.WithNodeInfoSnapshot(c.nodeInfoSnapshot), ) if err != nil { klog.Fatalf("error initializing the scheduling framework: %v", err) @@ -582,13 +586,13 @@ func (c *Configurator) getPredicateConfigs(predicateKeys sets.String) (map[strin func (c *Configurator) getAlgorithmArgs() (*PluginFactoryArgs, *plugins.ConfigProducerArgs) { return &PluginFactoryArgs{ - PodLister: c.schedulerCache, + NodeInfoLister: c.nodeInfoSnapshot.NodeInfos(), + PodLister: c.nodeInfoSnapshot.Pods(), ServiceLister: c.serviceLister, ControllerLister: c.controllerLister, ReplicaSetLister: c.replicaSetLister, StatefulSetLister: c.statefulSetLister, PDBLister: c.pdbLister, - NodeLister: c.schedulerCache, CSINodeLister: c.csiNodeLister, PVLister: c.pVLister, PVCLister: c.pVCLister, diff --git a/pkg/scheduler/framework/plugins/BUILD b/pkg/scheduler/framework/plugins/BUILD index c3db046e95d..644ac79c3f3 100644 --- a/pkg/scheduler/framework/plugins/BUILD +++ b/pkg/scheduler/framework/plugins/BUILD @@ -24,7 +24,6 @@ go_library( "//pkg/scheduler/framework/plugins/volumerestrictions:go_default_library", "//pkg/scheduler/framework/plugins/volumezone:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index 3ff09b20c87..cc51737b1bd 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -38,14 +38,12 @@ import ( 
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) // RegistryArgs arguments needed to create default plugin factories. type RegistryArgs struct { - SchedulerCache internalcache.Cache - VolumeBinder *volumebinder.VolumeBinder + VolumeBinder *volumebinder.VolumeBinder } // NewDefaultRegistry builds a default registry with all the default plugins. @@ -75,9 +73,7 @@ func NewDefaultRegistry(args *RegistryArgs) framework.Registry { nodevolumelimits.GCEPDName: nodevolumelimits.NewGCEPD, nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk, nodevolumelimits.CinderName: nodevolumelimits.NewCinder, - interpodaffinity.Name: func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { - return interpodaffinity.New(args.SchedulerCache, args.SchedulerCache), nil - }, + interpodaffinity.Name: interpodaffinity.New, } } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD index c50607cafbe..5ed715284b0 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/BUILD +++ b/pkg/scheduler/framework/plugins/interpodaffinity/BUILD @@ -9,9 +9,9 @@ go_library( "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", ], ) @@ -23,8 +23,7 @@ go_test( "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", - "//pkg/scheduler/listers/fake:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", + "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go index e02770e1734..e8353aa41f3 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity.go @@ -21,10 +21,10 @@ import ( "fmt" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) @@ -54,8 +54,8 @@ func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.Cy } // New initializes a new plugin and returns it. 
-func New(nodeLister schedulerlisters.NodeLister, podLister schedulerlisters.PodLister) framework.Plugin { +func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) { return &InterPodAffinity{ - predicate: predicates.NewPodAffinityPredicate(nodeLister, podLister), - } + predicate: predicates.NewPodAffinityPredicate(h.SnapshotSharedLister().NodeInfos(), h.SnapshotSharedLister().Pods()), + }, nil } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go index 9887a42e16c..2133ba74e92 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/interpod_affinity_test.go @@ -26,8 +26,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" - fakelisters "k8s.io/kubernetes/pkg/scheduler/listers/fake" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) var ( @@ -733,14 +732,15 @@ func TestSingleNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - node := test.node - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, []*v1.Node{node}) - meta := predicates.GetPredicateMetadata(test.pod, nodeInfoMap) + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, []*v1.Node{test.node}) + meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap) state := framework.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) - p := New(fakelisters.NodeLister([]*v1.Node{node}), fakelisters.PodLister(test.pods)) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfoMap[node.Name]) + p := &InterPodAffinity{ + predicate: predicates.NewPodAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods()), + } + gotStatus := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[test.node.Name]) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -1432,15 +1432,16 @@ func TestMultipleNodes(t *testing.T) { for indexTest, test := range tests { t.Run(test.name, func(t *testing.T) { - nodeListInfo := fakelisters.NodeLister(test.nodes) - nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(test.pods, test.nodes) + snapshot := nodeinfosnapshot.NewSnapshot(test.pods, test.nodes) for indexNode, node := range test.nodes { - meta := predicates.GetPredicateMetadata(test.pod, nodeInfoMap) + meta := predicates.GetPredicateMetadata(test.pod, snapshot.NodeInfoMap) state := framework.NewCycleState() state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) - p := New(nodeListInfo, fakelisters.PodLister(test.pods)) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, nodeInfoMap[node.Name]) + p := &InterPodAffinity{ + predicate: predicates.NewPodAffinityPredicate(snapshot.NodeInfos(), snapshot.Pods()), + } + gotStatus := p.Filter(context.Background(), state, test.pod, snapshot.NodeInfoMap[node.Name]) if !reflect.DeepEqual(gotStatus, test.wantStatuses[indexNode]) { t.Errorf("index: %d status does not match: %v, want: %v", indexTest, gotStatus, test.wantStatuses[indexNode]) } diff --git 
a/pkg/scheduler/framework/v1alpha1/BUILD b/pkg/scheduler/framework/v1alpha1/BUILD index 36fad01576d..583ea46a676 100644 --- a/pkg/scheduler/framework/v1alpha1/BUILD +++ b/pkg/scheduler/framework/v1alpha1/BUILD @@ -13,6 +13,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/scheduler/apis/config:go_default_library", + "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library", diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index d06b0266dd8..faa08d1db90 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog" "k8s.io/kubernetes/pkg/scheduler/apis/config" + schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" @@ -105,8 +106,9 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint } type frameworkOptions struct { - clientSet clientset.Interface - informerFactory informers.SharedInformerFactory + clientSet clientset.Interface + informerFactory informers.SharedInformerFactory + nodeInfoSnapshot *nodeinfosnapshot.Snapshot } // Option for the framework. @@ -126,7 +128,16 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option } } -var defaultFrameworkOptions = frameworkOptions{} +// WithNodeInfoSnapshot sets the NodeInfo Snapshot. +func WithNodeInfoSnapshot(nodeInfoSnapshot *nodeinfosnapshot.Snapshot) Option { + return func(o *frameworkOptions) { + o.nodeInfoSnapshot = nodeInfoSnapshot + } +} + +var defaultFrameworkOptions = frameworkOptions{ + nodeInfoSnapshot: nodeinfosnapshot.NewEmptySnapshot(), +} var _ Framework = &framework{} @@ -139,7 +150,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi f := &framework{ registry: r, - nodeInfoSnapshot: nodeinfosnapshot.NewSnapshot(), + nodeInfoSnapshot: options.nodeInfoSnapshot, pluginNameToWeightMap: make(map[string]int), waitingPods: newWaitingPodsMap(), clientSet: options.clientSet, @@ -593,6 +604,14 @@ func (f *framework) RunPermitPlugins( return nil } +// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo +// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains +// unchanged until a pod finishes "Reserve". There is no guarantee that the information +// remains unchanged after "Reserve". +func (f *framework) SnapshotSharedLister() schedulerlisters.SharedLister { + return f.nodeInfoSnapshot +} + // NodeInfoSnapshot returns the latest NodeInfo snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until a // pod finishes "Reserve". 
There is no guarantee that the information remains diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index b33e85008da..2439e0764e7 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/scheduler/apis/config" + schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" ) @@ -452,6 +453,15 @@ type Framework interface { // passed to the plugin factories at the time of plugin initialization. Plugins // must store and use this handle to call framework functions. type FrameworkHandle interface { + // SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot + // is taken at the beginning of a scheduling cycle and remains unchanged until + // a pod finishes "Reserve". There is no guarantee that the information + // remains unchanged in the binding phase of scheduling, so plugins in the binding + // cycle (permit/pre-bind/bind/post-bind/un-reserve) should not use it; + // otherwise a concurrent read/write error might occur. They should use the + // scheduler cache instead. + SnapshotSharedLister() schedulerlisters.SharedLister + // NodeInfoSnapshot return the latest NodeInfo snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until // a pod finishes "Reserve" point. There is no guarantee that the information diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index c3fa5b4e1b9..77e1099bb5a 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -1077,7 +1077,7 @@ func TestNodeOperators(t *testing.T) { } // Case 2: dump cached nodes successfully. - cachedNodes := nodeinfosnapshot.NewSnapshot() + cachedNodes := nodeinfosnapshot.NewEmptySnapshot() cache.UpdateNodeInfoSnapshot(cachedNodes) newNode, found := cachedNodes.NodeInfoMap[node.Name] if !found || len(cachedNodes.NodeInfoMap) != 1 { @@ -1333,7 +1333,7 @@ func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { cache = newSchedulerCache(time.Second, time.Second, nil) - snapshot = nodeinfosnapshot.NewSnapshot() + snapshot = nodeinfosnapshot.NewEmptySnapshot() for _, op := range test.operations { op() @@ -1382,7 +1382,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() for n := 0; n < b.N; n++ { - cachedNodes := nodeinfosnapshot.NewSnapshot() + cachedNodes := nodeinfosnapshot.NewEmptySnapshot() cache.UpdateNodeInfoSnapshot(cachedNodes) } } diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index 2ef7ac3c180..a4c47cc0cc3 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -102,9 +102,6 @@ type Cache interface { // on this node. UpdateNodeInfoSnapshot(nodeSnapshot *nodeinfosnapshot.Snapshot) error - // GetNodeInfo returns the node object with node string. 
- GetNodeInfo(nodeName string) (*v1.Node, error) - // Snapshot takes a snapshot on current cache Snapshot() *Snapshot } diff --git a/pkg/scheduler/internal/queue/BUILD b/pkg/scheduler/internal/queue/BUILD index d44cb081180..28f60951dd3 100644 --- a/pkg/scheduler/internal/queue/BUILD +++ b/pkg/scheduler/internal/queue/BUILD @@ -37,6 +37,7 @@ go_test( "//pkg/api/v1/pod:go_default_library", "//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", + "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library", diff --git a/pkg/scheduler/internal/queue/scheduling_queue_test.go b/pkg/scheduler/internal/queue/scheduling_queue_test.go index 4d1537c054e..e5fefbadad2 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue_test.go +++ b/pkg/scheduler/internal/queue/scheduling_queue_test.go @@ -35,6 +35,7 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/scheduler/apis/config" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" + schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" "k8s.io/kubernetes/pkg/scheduler/metrics" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot" @@ -246,6 +247,10 @@ func (*fakeFramework) SharedInformerFactory() informers.SharedInformerFactory { return nil } +func (*fakeFramework) SnapshotSharedLister() schedulerlisters.SharedLister { + return nil +} + func TestPriorityQueue_AddWithReversePriorityLessFunc(t *testing.T) { q := NewPriorityQueue(nil, &fakeFramework{}) if err := q.Add(&medPriorityPod); err != nil { diff --git a/pkg/scheduler/listers/BUILD b/pkg/scheduler/listers/BUILD index 847ce1868de..3e8d3f4ca68 100644 --- a/pkg/scheduler/listers/BUILD +++ b/pkg/scheduler/listers/BUILD @@ -6,6 +6,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/listers", visibility = ["//visibility:public"], deps = [ + "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", ], diff --git a/pkg/scheduler/listers/fake/BUILD b/pkg/scheduler/listers/fake/BUILD index 8125ec531c7..57efb259970 100644 --- a/pkg/scheduler/listers/fake/BUILD +++ b/pkg/scheduler/listers/fake/BUILD @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/scheduler/listers:go_default_library", + "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library", diff --git a/pkg/scheduler/listers/fake/listers.go b/pkg/scheduler/listers/fake/listers.go index 36f62498efd..b09717432b0 100644 --- a/pkg/scheduler/listers/fake/listers.go +++ b/pkg/scheduler/listers/fake/listers.go @@ -30,6 +30,7 @@ import ( storagelisters "k8s.io/client-go/listers/storage/v1" v1beta1storagelisters "k8s.io/client-go/listers/storage/v1beta1" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) var _ schedulerlisters.PodLister = &PodLister{} @@ -115,7 +116,7 @@ func (f ControllerLister) GetPodControllers(pod *v1.Pod) (controllers []*v1.Repl } } if len(controllers) == 0 { - err = fmt.Errorf("Could not find Replication Controller for pod %s in namespace %s with labels: %v", 
pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find Replication Controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) } return @@ -154,7 +155,7 @@ func (f ReplicaSetLister) GetPodReplicaSets(pod *v1.Pod) (rss []*appsv1.ReplicaS } } if len(rss) == 0 { - err = fmt.Errorf("Could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) } return @@ -192,7 +193,7 @@ func (f StatefulSetLister) GetPodStatefulSets(pod *v1.Pod) (sss []*appsv1.Statef } } if len(sss) == 0 { - err = fmt.Errorf("Could not find StatefulSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + err = fmt.Errorf("could not find StatefulSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) } return } @@ -243,17 +244,34 @@ func (pvcs PersistentVolumeClaimLister) PersistentVolumeClaims(namespace string) } } -// NodeLister declares a *v1.Node type for testing. -type NodeLister []*v1.Node +// NodeInfoLister declares a schedulernodeinfo.NodeInfo type for testing. +type NodeInfoLister []*schedulernodeinfo.NodeInfo -// GetNodeInfo returns a fake node object in the fake nodes. -func (nodes NodeLister) GetNodeInfo(nodeName string) (*v1.Node, error) { +// Get returns the fake NodeInfo for the given node name. +func (nodes NodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) { for _, node := range nodes { - if node != nil && node.Name == nodeName { + if node != nil && node.Node().Name == nodeName { return node, nil } } - return nil, fmt.Errorf("Unable to find node: %s", nodeName) + return nil, fmt.Errorf("unable to find node: %s", nodeName) +} + +// List lists all nodes. +func (nodes NodeInfoLister) List() ([]*schedulernodeinfo.NodeInfo, error) { + return nodes, nil +} + +// NewNodeInfoLister creates a new fake NodeInfoLister from a slice of v1.Nodes. +func NewNodeInfoLister(nodes []*v1.Node) schedulerlisters.NodeInfoLister { + nodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, len(nodes)) + for _, node := range nodes { + nodeInfo := schedulernodeinfo.NewNodeInfo() + nodeInfo.SetNode(node) + nodeInfoList = append(nodeInfoList, nodeInfo) + } + + return NodeInfoLister(nodeInfoList) } var _ v1beta1storagelisters.CSINodeLister = CSINodeLister{} diff --git a/pkg/scheduler/listers/listers.go b/pkg/scheduler/listers/listers.go index 322e203ffe3..a760402a109 100644 --- a/pkg/scheduler/listers/listers.go +++ b/pkg/scheduler/listers/listers.go @@ -19,6 +19,7 @@ package listers import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) // PodFilter is a function to filter a pod. If pod passed return true else return false. @@ -33,8 +34,16 @@ type PodLister interface { FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) } -// NodeLister interface represents anything that can list/get node object from node name. -type NodeLister interface { - // TODO(ahg-g): rename to Get and add a List interface. - GetNodeInfo(nodeName string) (*v1.Node, error) +// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name. +type NodeInfoLister interface { + // Returns the list of NodeInfos. + List() ([]*schedulernodeinfo.NodeInfo, error) + // Returns the NodeInfo of the given node name. 
+ Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) +} + +// SharedLister groups scheduler-specific listers. +type SharedLister interface { + Pods() PodLister + NodeInfos() NodeInfoLister } diff --git a/pkg/scheduler/nodeinfo/snapshot/BUILD b/pkg/scheduler/nodeinfo/snapshot/BUILD index 6001bee2af3..870614f9397 100644 --- a/pkg/scheduler/nodeinfo/snapshot/BUILD +++ b/pkg/scheduler/nodeinfo/snapshot/BUILD @@ -6,8 +6,10 @@ go_library( importpath = "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot", visibility = ["//visibility:public"], deps = [ + "//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", ], ) diff --git a/pkg/scheduler/nodeinfo/snapshot/snapshot.go b/pkg/scheduler/nodeinfo/snapshot/snapshot.go index 1f9e39e998a..4b027383fc9 100644 --- a/pkg/scheduler/nodeinfo/snapshot/snapshot.go +++ b/pkg/scheduler/nodeinfo/snapshot/snapshot.go @@ -17,7 +17,11 @@ limitations under the License. package nodeinfo import ( + "fmt" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) @@ -31,13 +35,40 @@ type Snapshot struct { Generation int64 } -// NewSnapshot initializes a Snapshot struct and returns it. -func NewSnapshot() *Snapshot { +var _ schedulerlisters.SharedLister = &Snapshot{} + +// NewEmptySnapshot initializes a Snapshot struct and returns it. +func NewEmptySnapshot() *Snapshot { return &Snapshot{ NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo), } } +// NewSnapshot initializes a Snapshot struct and returns it. +func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot { + nodeInfoMap := schedulernodeinfo.CreateNodeNameToInfoMap(pods, nodes) + nodeInfoList := make([]*schedulernodeinfo.NodeInfo, 0, len(nodes)) + for _, v := range nodeInfoMap { + nodeInfoList = append(nodeInfoList, v) + } + + s := NewEmptySnapshot() + s.NodeInfoMap = nodeInfoMap + s.NodeInfoList = nodeInfoList + + return s +} + +// Pods returns a PodLister +func (s *Snapshot) Pods() schedulerlisters.PodLister { + return &podLister{snapshot: s} +} + +// NodeInfos returns a NodeInfoLister. +func (s *Snapshot) NodeInfos() schedulerlisters.NodeInfoLister { + return &nodeInfoLister{snapshot: s} +} + // ListNodes returns the list of nodes in the snapshot. func (s *Snapshot) ListNodes() []*v1.Node { nodes := make([]*v1.Node, 0, len(s.NodeInfoMap)) @@ -48,3 +79,50 @@ func (s *Snapshot) ListNodes() []*v1.Node { } return nodes } + +type podLister struct { + snapshot *Snapshot +} + +// List returns the list of pods in the snapshot. +func (p *podLister) List(selector labels.Selector) ([]*v1.Pod, error) { + alwaysTrue := func(p *v1.Pod) bool { return true } + return p.FilteredList(alwaysTrue, selector) +} + +// FilteredList returns a filtered list of pods in the snapshot. +func (p *podLister) FilteredList(podFilter schedulerlisters.PodFilter, selector labels.Selector) ([]*v1.Pod, error) { + // podFilter is expected to return true for most or all of the pods. We + // can avoid expensive array growth without wasting too much memory by + // pre-allocating capacity. 
+ maxSize := 0 + for _, n := range p.snapshot.NodeInfoMap { + maxSize += len(n.Pods()) + } + pods := make([]*v1.Pod, 0, maxSize) + for _, n := range p.snapshot.NodeInfoMap { + for _, pod := range n.Pods() { + if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, pod) + } + } + } + return pods, nil +} + +type nodeInfoLister struct { + snapshot *Snapshot +} + +// List returns the list of nodes in the snapshot. +func (n *nodeInfoLister) List() ([]*schedulernodeinfo.NodeInfo, error) { + return n.snapshot.NodeInfoList, nil +} + +// Returns the NodeInfo of the given node name. +func (n *nodeInfoLister) Get(nodeName string) (*schedulernodeinfo.NodeInfo, error) { + if v, ok := n.snapshot.NodeInfoMap[nodeName]; ok { + return v, nil + } + return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d6c754bf10f..73f02c90318 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -277,8 +277,7 @@ func New(client clientset.Interface, registry := options.frameworkDefaultRegistry if registry == nil { registry = frameworkplugins.NewDefaultRegistry(&frameworkplugins.RegistryArgs{ - SchedulerCache: schedulerCache, - VolumeBinder: volumeBinder, + VolumeBinder: volumeBinder, }) } registry.Merge(options.frameworkOutOfTreeRegistry)
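As a usage illustration of the FrameworkHandle.SnapshotSharedLister() API added above, the following is a minimal sketch of an out-of-tree Filter plugin that takes its listers from the scheduling-cycle snapshot, following the same factory pattern as the interpodaffinity.New change in this patch. The plugin name, package, and filtering logic are hypothetical; only the framework and lister interfaces come from the patch.

package snapshotnodechecker

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// Name is the (hypothetical) name under which the plugin would be registered.
const Name = "SnapshotNodeChecker"

// SnapshotNodeChecker reads node state from the per-cycle snapshot rather
// than from the live scheduler cache.
type SnapshotNodeChecker struct {
	nodeInfos schedulerlisters.NodeInfoLister
}

var _ framework.FilterPlugin = &SnapshotNodeChecker{}

// Name returns the plugin name.
func (c *SnapshotNodeChecker) Name() string { return Name }

// Filter resolves the candidate node through the snapshot lister; every
// plugin running in the same scheduling cycle observes the same NodeInfo
// objects.
func (c *SnapshotNodeChecker) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
	if _, err := c.nodeInfos.Get(nodeInfo.Node().Name); err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}
	return nil
}

// New mirrors the factory signature used by the default registry after this
// patch: plugins receive a FrameworkHandle and pull their listers from
// SnapshotSharedLister() instead of the scheduler cache.
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
	return &SnapshotNodeChecker{
		nodeInfos: h.SnapshotSharedLister().NodeInfos(),
	}, nil
}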
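Along the same lines, here is a test-style sketch of the new Snapshot constructor and its listers: NewSnapshot(pods, nodes) builds NodeInfoMap and NodeInfoList in one step, and Pods()/NodeInfos() expose them through the SharedLister interfaces, which is how the updated priority and plugin tests in this patch consume them. The test name and fixture objects are illustrative only.

package nodeinfo_test

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)

// TestSnapshotListers is a hypothetical example: it builds a Snapshot from
// plain pods and nodes and reads it back through the SharedLister interfaces.
func TestSnapshotListers(t *testing.T) {
	nodes := []*v1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}}}
	pods := []*v1.Pod{{
		ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"},
		Spec:       v1.PodSpec{NodeName: "node-a"},
	}}

	// NewSnapshot populates both NodeInfoMap and NodeInfoList.
	snapshot := nodeinfosnapshot.NewSnapshot(pods, nodes)

	// Node lookups go through the snapshot's NodeInfoLister.
	nodeInfo, err := snapshot.NodeInfos().Get("node-a")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(nodeInfo.Pods()) != 1 {
		t.Errorf("expected 1 pod on node-a, got %d", len(nodeInfo.Pods()))
	}

	// Pod lookups go through the snapshot's PodLister.
	allPods, err := snapshot.Pods().List(labels.Everything())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(allPods) != 1 {
		t.Errorf("expected 1 pod in the snapshot, got %d", len(allPods))
	}
}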