From 02a0aad6d3d86ddc9a6c6e1e25ac9fe197342cd7 Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Thu, 12 Dec 2019 22:20:02 -0500 Subject: [PATCH] podfitsresource metadata as prefilter --- pkg/scheduler/BUILD | 1 + .../algorithm/predicates/metadata.go | 47 +--------- .../algorithm/predicates/metadata_test.go | 7 -- .../algorithm/predicates/predicates.go | 76 +++++----------- .../algorithm/predicates/predicates_test.go | 12 +-- pkg/scheduler/factory.go | 9 +- .../framework/plugins/default_registry.go | 8 +- .../framework/plugins/noderesources/BUILD | 5 +- .../framework/plugins/noderesources/fit.go | 90 ++++++++++++++++--- .../plugins/noderesources/fit_test.go | 83 ++++++++++------- pkg/scheduler/framework/v1alpha1/registry.go | 2 +- 11 files changed, 171 insertions(+), 169 deletions(-) diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index cc092096171..90112ba941f 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -22,6 +22,7 @@ go_library( "//pkg/scheduler/core:go_default_library", "//pkg/scheduler/framework/plugins:go_default_library", "//pkg/scheduler/framework/plugins/nodelabel:go_default_library", + "//pkg/scheduler/framework/plugins/noderesources:go_default_library", "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library", "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index 017d12bc39a..deb7f1b97b4 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -281,36 +281,12 @@ func (m *PodAffinityMetadata) Clone() *PodAffinityMetadata { return © } -type podFitsResourcesMetadata struct { - // ignoredExtendedResources is a set of extended resource names that will - // be ignored in the PodFitsResources predicate. - // - // They can be scheduler extender managed resources, the consumption of - // which should be accounted only by the extenders. This set is synthesized - // from scheduler extender configuration and does not change per pod. - ignoredExtendedResources sets.String - podRequest *schedulernodeinfo.Resource -} - -func (m *podFitsResourcesMetadata) clone() *podFitsResourcesMetadata { - if m == nil { - return nil - } - - copy := podFitsResourcesMetadata{} - copy.ignoredExtendedResources = m.ignoredExtendedResources - copy.podRequest = m.podRequest - - return © -} - // NOTE: When new fields are added/removed or logic is changed, please make sure that // RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes. type predicateMetadata struct { pod *v1.Pod - serviceAffinityMetadata *serviceAffinityMetadata - podFitsResourcesMetadata *podFitsResourcesMetadata + serviceAffinityMetadata *serviceAffinityMetadata } // Ensure that predicateMetadata implements algorithm.Metadata. @@ -332,17 +308,6 @@ func EmptyMetadataProducer(pod *v1.Pod, sharedLister schedulerlisters.SharedList return nil } -// RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a -// MetadataProducer that creates predicate metadata with the provided -// options for extended resources. -// -// See the comments in "predicateMetadata" for the explanation of the options. 
-func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) { - RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) { - pm.podFitsResourcesMetadata.ignoredExtendedResources = ignoredExtendedResources - }) -} - // MetadataProducerFactory is a factory to produce Metadata. type MetadataProducerFactory struct{} @@ -354,8 +319,7 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister } predicateMetadata := &predicateMetadata{ - pod: pod, - podFitsResourcesMetadata: getPodFitsResourcesMetedata(pod), + pod: pod, } for predicateName, precomputeFunc := range predicateMetadataProducers { klog.V(10).Infof("Precompute: %v", predicateName) @@ -364,12 +328,6 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister return predicateMetadata } -func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata { - return &podFitsResourcesMetadata{ - podRequest: GetResourceRequest(pod), - } -} - // GetPodAffinityMetadata computes inter-pod affinity metadata. func GetPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo, havePodsWithAffinityNodes []*schedulernodeinfo.NodeInfo) (*PodAffinityMetadata, error) { // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity @@ -593,7 +551,6 @@ func (meta *predicateMetadata) ShallowCopy() Metadata { pod: meta.pod, } newPredMeta.serviceAffinityMetadata = meta.serviceAffinityMetadata.clone() - newPredMeta.podFitsResourcesMetadata = meta.podFitsResourcesMetadata.clone() return (Metadata)(newPredMeta) } diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index 8a844f6be65..adf73050740 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -228,13 +228,6 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) { Namespace: "testns", }, }, - podFitsResourcesMetadata: &podFitsResourcesMetadata{ - podRequest: &schedulernodeinfo.Resource{ - MilliCPU: 1000, - Memory: 300, - AllowedPodNumber: 4, - }, - }, serviceAffinityMetadata: &serviceAffinityMetadata{ matchingPodList: []*v1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 5cba496b279..ecd14da960e 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -780,10 +780,16 @@ func podName(pod *v1.Pod) string { return pod.Namespace + "/" + pod.Name } -// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod. +// PodFitsResources is a wrapper around PodFitsResourcesPredicate that implements FitPredicate interface. +// TODO(#85822): remove this function once predicate registration logic is deleted. +func PodFitsResources(pod *v1.Pod, _ Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { + return PodFitsResourcesPredicate(pod, nil, nil, nodeInfo) +} + +// PodFitsResourcesPredicate checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod. // First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the -// predicate failure reasons if the node has insufficient resources to run the pod. 
-func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { +// predicate failure reasons if the node has insufficient resources to run the pod +func PodFitsResourcesPredicate(pod *v1.Pod, podRequest *schedulernodeinfo.Resource, ignoredExtendedResources sets.String, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { node := nodeInfo.Node() if node == nil { return false, nil, fmt.Errorf("node not found") @@ -795,17 +801,11 @@ func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.No predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber))) } - // No extended resources should be ignored by default. - ignoredExtendedResources := sets.NewString() + if ignoredExtendedResources == nil { + ignoredExtendedResources = sets.NewString() + } - var podRequest *schedulernodeinfo.Resource - if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil { - podRequest = predicateMeta.podFitsResourcesMetadata.podRequest - if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil { - ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources - } - } else { - // We couldn't parse metadata - fallback to computing it. + if podRequest == nil { podRequest = GetResourceRequest(pod) } if podRequest.MilliCPU == 0 && @@ -839,13 +839,11 @@ func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.No } } - if klog.V(10) { - if len(predicateFails) == 0 { - // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is - // not logged. There is visible performance gain from it. - klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.", - podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber) - } + if klog.V(10) && len(predicateFails) == 0 { + // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is + // not logged. There is visible performance gain from it. + klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.", + podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber) } return len(predicateFails) == 0, predicateFails, nil } @@ -1144,43 +1142,11 @@ func haveOverlap(a1, a2 []string) bool { return false } -// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates -// that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need. +// GeneralPredicates checks a group of predicates that the kubelet cares about. +// DEPRECATED: this exist only because kubelet uses it. We should change kubelet to execute the individual predicates it requires. func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { var predicateFails []PredicateFailureReason - for _, predicate := range []FitPredicate{noncriticalPredicates, EssentialPredicates} { - fit, reasons, err := predicate(pod, meta, nodeInfo) - if err != nil { - return false, predicateFails, err - } - if !fit { - predicateFails = append(predicateFails, reasons...) - } - } - - return len(predicateFails) == 0, predicateFails, nil -} - -// noncriticalPredicates are the predicates that only non-critical pods need. 
-func noncriticalPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { - var predicateFails []PredicateFailureReason - fit, reasons, err := PodFitsResources(pod, meta, nodeInfo) - if err != nil { - return false, predicateFails, err - } - if !fit { - predicateFails = append(predicateFails, reasons...) - } - - return len(predicateFails) == 0, predicateFails, nil -} - -// EssentialPredicates are the predicates that all pods, including critical pods, need. -func EssentialPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) { - var predicateFails []PredicateFailureReason - // TODO: PodFitsHostPorts is essential for now, but kubelet should ideally - // preempt pods to free up host ports too - for _, predicate := range []FitPredicate{PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} { + for _, predicate := range []FitPredicate{PodFitsResources, PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} { fit, reasons, err := predicate(pod, meta, nodeInfo) if err != nil { return false, predicateFails, err diff --git a/pkg/scheduler/algorithm/predicates/predicates_test.go b/pkg/scheduler/algorithm/predicates/predicates_test.go index 8cf32b4e471..abcf5294560 100644 --- a/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -388,10 +388,7 @@ func TestPodFitsResources(t *testing.T) { t.Run(test.name, func(t *testing.T) { node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}} test.nodeInfo.SetNode(&node) - RegisterPredicateMetadataProducerWithExtendedResourceOptions(test.ignoredExtendedResources) - factory := &MetadataProducerFactory{} - meta := factory.GetPredicateMetadata(test.pod, nil) - fits, reasons, err := PodFitsResources(test.pod, meta, test.nodeInfo) + fits, reasons, err := PodFitsResourcesPredicate(test.pod, GetResourceRequest(test.pod), test.ignoredExtendedResources, test.nodeInfo) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -448,8 +445,8 @@ func TestPodFitsResources(t *testing.T) { t.Run(test.name, func(t *testing.T) { node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}} test.nodeInfo.SetNode(&node) - factory := &MetadataProducerFactory{} - fits, reasons, err := PodFitsResources(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo) + fits, reasons, err := PodFitsResourcesPredicate(test.pod, GetResourceRequest(test.pod), nil, test.nodeInfo) + if err != nil { t.Errorf("unexpected error: %v", err) } @@ -509,8 +506,7 @@ func TestPodFitsResources(t *testing.T) { t.Run(test.name, func(t *testing.T) { node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}} test.nodeInfo.SetNode(&node) - factory := &MetadataProducerFactory{} - fits, reasons, err := PodFitsResources(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo) + fits, reasons, err := PodFitsResourcesPredicate(test.pod, GetResourceRequest(test.pod), nil, test.nodeInfo) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/factory.go b/pkg/scheduler/factory.go index b23dd82a29c..996c548ccba 100644 --- a/pkg/scheduler/factory.go +++ b/pkg/scheduler/factory.go @@ -44,6 +44,7 @@ import ( 
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" @@ -177,8 +178,8 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, var extenders []algorithm.SchedulerExtender if len(policy.Extenders) != 0 { - ignoredExtendedResources := sets.NewString() var ignorableExtenders []algorithm.SchedulerExtender + var ignoredExtendedResources []string for ii := range policy.Extenders { klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii]) extender, err := core.NewHTTPExtender(&policy.Extenders[ii]) @@ -192,13 +193,15 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler, } for _, r := range policy.Extenders[ii].ManagedResources { if r.IgnoredByScheduler { - ignoredExtendedResources.Insert(string(r.Name)) + ignoredExtendedResources = append(ignoredExtendedResources, r.Name) } } } + c.configProducerArgs.NodeResourcesFitArgs = &noderesources.FitArgs{ + IgnoredResources: ignoredExtendedResources, + } // place ignorable extenders to the tail of extenders extenders = append(extenders, ignorableExtenders...) - predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources) } // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value. // Give it higher precedence than scheduler CLI configuration when it is provided. diff --git a/pkg/scheduler/framework/plugins/default_registry.go b/pkg/scheduler/framework/plugins/default_registry.go index 69bc5daa52d..05eafbd8e1f 100644 --- a/pkg/scheduler/framework/plugins/default_registry.go +++ b/pkg/scheduler/framework/plugins/default_registry.go @@ -99,6 +99,8 @@ type ConfigProducerArgs struct { RequestedToCapacityRatioArgs *requestedtocapacityratio.Args // ServiceAffinityArgs is the args for the ServiceAffinity plugin. ServiceAffinityArgs *serviceaffinity.Args + // NodeResourcesFitArgs is the args for the NodeResources fit filter. + NodeResourcesFitArgs *noderesources.FitArgs } // ConfigProducer produces a framework's configuration. @@ -119,9 +121,10 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { } // Register Predicates. registry.RegisterPredicate(predicates.GeneralPred, - func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { // GeneralPredicate is a combination of predicates. 
plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil) + pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs)) plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil) plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil) plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil) @@ -133,8 +136,9 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry { return }) registry.RegisterPredicate(predicates.PodFitsResourcesPred, - func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { + func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) { plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil) + pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs)) return }) registry.RegisterPredicate(predicates.HostNamePred, diff --git a/pkg/scheduler/framework/plugins/noderesources/BUILD b/pkg/scheduler/framework/plugins/noderesources/BUILD index aa2751836fb..aecbcb1dc8b 100644 --- a/pkg/scheduler/framework/plugins/noderesources/BUILD +++ b/pkg/scheduler/framework/plugins/noderesources/BUILD @@ -21,6 +21,8 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) @@ -51,14 +53,13 @@ go_test( "//pkg/apis/core/v1/helper:go_default_library", "//pkg/features:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", - "//pkg/scheduler/framework/plugins/migration:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo/snapshot:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", ], diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 87a5724e691..044f064a836 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -20,38 +20,104 @@ import ( "context" "fmt" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog" + "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) -// Fit is a plugin that checks if a node has sufficient resources. -type Fit struct{} - +var _ framework.PreFilterPlugin = &Fit{} var _ framework.FilterPlugin = &Fit{} -// FitName is the name of the plugin used in the plugin registry and configurations. 
-const FitName = "NodeResourcesFit" +const ( + // FitName is the name of the plugin used in the plugin registry and configurations. + FitName = "NodeResourcesFit" + + // preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data. + // Using the name of the plugin will likely help us avoid collisions with other plugins. + preFilterStateKey = "PreFilter" + FitName +) + +// Fit is a plugin that checks if a node has sufficient resources. +type Fit struct { + ignoredResources sets.String +} + +// FitArgs holds the args that are used to configure the plugin. +type FitArgs struct { + // IgnoredResources is the list of resources that NodeResources fit filter + // should ignore. + IgnoredResources []string `json:"IgnoredResources,omitempty"` +} + +// preFilterState computed at PreFilter and used at Filter. +type preFilterState struct { + podResourceRequest *nodeinfo.Resource +} + +// Clone the prefilter state. +func (s *preFilterState) Clone() framework.StateData { + return s +} // Name returns name of the plugin. It is used in logs, etc. func (f *Fit) Name() string { return FitName } +// PreFilter invoked at the prefilter extension point. +func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { + s := &preFilterState{ + podResourceRequest: predicates.GetResourceRequest(pod), + } + cycleState.Write(preFilterStateKey, s) + return nil +} + +// PreFilterExtensions returns prefilter extensions, pod add and remove. +func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +func getPodResourceRequest(cycleState *framework.CycleState) (*nodeinfo.Resource, error) { + c, err := cycleState.Read(preFilterStateKey) + if err != nil { + // The metadata wasn't pre-computed in prefilter. We ignore the error for now since + // Filter is able to handle that by computing it again. + klog.Errorf("reading %q from cycleState: %v", preFilterStateKey, err) + return nil, nil + } + + s, ok := c.(*preFilterState) + if !ok { + return nil, fmt.Errorf("%+v convert to NodeResourcesFit.preFilterState error", c) + } + return s.podResourceRequest, nil +} + // Filter invoked at the filter extension point. func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status { - meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState)) - if !ok { - return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState)) + r, err := getPodResourceRequest(cycleState) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) } - _, reasons, err := predicates.PodFitsResources(pod, meta, nodeInfo) + _, reasons, err := predicates.PodFitsResourcesPredicate(pod, r, f.ignoredResources, nodeInfo) return migration.PredicateResultToFrameworkStatus(reasons, err) } // NewFit initializes a new plugin and returns it. -func NewFit(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { - return &Fit{}, nil +func NewFit(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) { + args := &FitArgs{} + if err := framework.DecodeInto(plArgs, args); err != nil { + return nil, err + } + + fit := &Fit{} + fit.ignoredResources = sets.NewString(args.IgnoredResources...) 
+ return fit, nil } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index e7e8d96908f..cafb6db06f6 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -18,18 +18,17 @@ package noderesources import ( "context" + "k8s.io/apimachinery/pkg/runtime" "reflect" "testing" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) @@ -93,11 +92,12 @@ func TestNodeResourcesFit(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodOverhead, true)() enoughPodsTests := []struct { - pod *v1.Pod - nodeInfo *schedulernodeinfo.NodeInfo - name string - ignoredExtendedResources sets.String - wantStatus *framework.Status + pod *v1.Pod + nodeInfo *schedulernodeinfo.NodeInfo + name string + ignoredResources []byte + preFilterDisabled bool + wantStatus *framework.Status }{ { pod: &v1.Pod{}, @@ -116,6 +116,18 @@ func TestNodeResourcesFit(t *testing.T) { predicates.NewInsufficientResourceError(v1.ResourceMemory, 2, 10, 10).GetReason(), ), }, + { + pod: newResourcePod(schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1}), + nodeInfo: schedulernodeinfo.NewNodeInfo( + newResourcePod(schedulernodeinfo.Resource{MilliCPU: 10, Memory: 20})), + name: "without prefilter", + preFilterDisabled: true, + wantStatus: framework.NewStatus( + framework.Unschedulable, + predicates.NewInsufficientResourceError(v1.ResourceCPU, 2, 10, 10).GetReason(), + predicates.NewInsufficientResourceError(v1.ResourceMemory, 2, 10, 10).GetReason(), + ), + }, { pod: newResourceInitPod(newResourcePod(schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1}), schedulernodeinfo.Resource{MilliCPU: 3, Memory: 1}), nodeInfo: schedulernodeinfo.NewNodeInfo( @@ -318,9 +330,8 @@ func TestNodeResourcesFit(t *testing.T) { schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1, ScalarResources: map[v1.ResourceName]int64{extendedResourceB: 1}}), nodeInfo: schedulernodeinfo.NewNodeInfo( newResourcePod(schedulernodeinfo.Resource{MilliCPU: 0, Memory: 0})), - ignoredExtendedResources: sets.NewString(string(extendedResourceB)), - name: "skip checking ignored extended resource", - wantStatus: framework.NewStatus(framework.Unschedulable, predicates.NewInsufficientResourceError(extendedResourceB, 2, 10, 10).GetReason()), + ignoredResources: []byte(`{"IgnoredResources" : ["example.com/bbb"]}`), + name: "skip checking ignored extended resource", }, { pod: newResourceOverheadPod( @@ -329,8 +340,7 @@ func TestNodeResourcesFit(t *testing.T) { ), nodeInfo: schedulernodeinfo.NewNodeInfo( newResourcePod(schedulernodeinfo.Resource{MilliCPU: 5, Memory: 5})), - ignoredExtendedResources: sets.NewString(string(extendedResourceB)), - name: "resources + pod overhead fits", + name: "resources + pod overhead fits", }, { pod: newResourceOverheadPod( @@ -339,24 +349,27 @@ func TestNodeResourcesFit(t *testing.T) { ), nodeInfo: schedulernodeinfo.NewNodeInfo( 
newResourcePod(schedulernodeinfo.Resource{MilliCPU: 5, Memory: 5})), - ignoredExtendedResources: sets.NewString(string(extendedResourceB)), - name: "requests + overhead does not fit for memory", - wantStatus: framework.NewStatus(framework.Unschedulable, predicates.NewInsufficientResourceError(v1.ResourceMemory, 16, 5, 20).GetReason()), + name: "requests + overhead does not fit for memory", + wantStatus: framework.NewStatus(framework.Unschedulable, predicates.NewInsufficientResourceError(v1.ResourceMemory, 16, 5, 20).GetReason()), }, } for _, test := range enoughPodsTests { t.Run(test.name, func(t *testing.T) { - factory := &predicates.MetadataProducerFactory{} - meta := factory.GetPredicateMetadata(test.pod, nil) - state := framework.NewCycleState() - state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) - node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}} test.nodeInfo.SetNode(&node) - p, _ := NewFit(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo) + args := &runtime.Unknown{Raw: test.ignoredResources} + p, _ := NewFit(args, nil) + cycleState := framework.NewCycleState() + if !test.preFilterDisabled { + preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + if !preFilterStatus.IsSuccess() { + t.Errorf("prefilter failed with status: %v", preFilterStatus) + } + } + + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -401,16 +414,17 @@ func TestNodeResourcesFit(t *testing.T) { } for _, test := range notEnoughPodsTests { t.Run(test.name, func(t *testing.T) { - factory := &predicates.MetadataProducerFactory{} - meta := factory.GetPredicateMetadata(test.pod, nil) - state := framework.NewCycleState() - state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) - node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}} test.nodeInfo.SetNode(&node) p, _ := NewFit(nil, nil) - gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + if !preFilterStatus.IsSuccess() { + t.Errorf("prefilter failed with status: %v", preFilterStatus) + } + + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } @@ -453,16 +467,17 @@ func TestNodeResourcesFit(t *testing.T) { for _, test := range storagePodsTests { t.Run(test.name, func(t *testing.T) { - factory := &predicates.MetadataProducerFactory{} - meta := factory.GetPredicateMetadata(test.pod, nil) - state := framework.NewCycleState() - state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta}) - node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}} test.nodeInfo.SetNode(&node) p, _ := NewFit(nil, nil) - gotStatus := 
p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo) + cycleState := framework.NewCycleState() + preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod) + if !preFilterStatus.IsSuccess() { + t.Errorf("prefilter failed with status: %v", preFilterStatus) + } + + gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) } diff --git a/pkg/scheduler/framework/v1alpha1/registry.go b/pkg/scheduler/framework/v1alpha1/registry.go index f2398da8a0a..39432f1e6fe 100644 --- a/pkg/scheduler/framework/v1alpha1/registry.go +++ b/pkg/scheduler/framework/v1alpha1/registry.go @@ -29,7 +29,7 @@ type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Pl // DecodeInto decodes configuration whose type is *runtime.Unknown to the interface into. func DecodeInto(configuration *runtime.Unknown, into interface{}) error { - if configuration == nil { + if configuration == nil || configuration.Raw == nil { return nil }
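
For reference, here is a minimal, illustrative sketch (not part of the patch) of how the new prefilter-based flow introduced above is expected to be exercised. It mirrors the calls used in fit_test.go: FitArgs arrive as *runtime.Unknown, NewFit decodes them, PreFilter writes the pod's resource request into CycleState, and Filter reads it back. The standalone main function, the "example.com/foo" resource name, and the empty placeholder node are assumptions made purely for illustration.

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

func main() {
	// FitArgs are delivered as *runtime.Unknown and decoded by NewFit via
	// framework.DecodeInto; "example.com/foo" is a made-up extended resource
	// that the filter should ignore (e.g. because an extender accounts for it).
	args := &runtime.Unknown{Raw: []byte(`{"IgnoredResources": ["example.com/foo"]}`)}
	p, err := noderesources.NewFit(args, nil)
	if err != nil {
		panic(err)
	}

	pod := &v1.Pod{}
	nodeInfo := schedulernodeinfo.NewNodeInfo()
	nodeInfo.SetNode(&v1.Node{}) // placeholder node; a real one would carry allocatable resources

	// PreFilter computes the pod's resource request once per scheduling cycle and
	// stores it in CycleState; Filter then reads it back instead of recomputing it
	// for every node (and falls back to recomputing it if PreFilter did not run).
	state := framework.NewCycleState()
	if status := p.(framework.PreFilterPlugin).PreFilter(context.Background(), state, pod); !status.IsSuccess() {
		fmt.Printf("prefilter failed: %v\n", status)
		return
	}
	status := p.(framework.FilterPlugin).Filter(context.Background(), state, pod, nodeInfo)
	fmt.Printf("filter status: %v\n", status)
}

Two design points the sketch reflects: the ignored extended resources, previously injected globally via RegisterPredicateMetadataProducerWithExtendedResourceOptions, now flow from the extender policy into FitArgs through the config producer args; and the pod's resource request, previously carried in predicate metadata, is now computed once in PreFilter and shared with Filter through CycleState.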