PodFitsResources metadata as prefilter
commit 02a0aad6d3
parent b92d20033e
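This change stops precomputing PodFitsResources data in the scheduler's predicate metadata and computes it in the NodeResourcesFit plugin's new PreFilter extension point instead: PreFilter stores the pod's resource request in CycleState, Filter reads it back (recomputing it when PreFilter did not run), and the set of extender-managed resources to ignore moves from predicate metadata into the plugin's FitArgs configuration.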
@@ -22,6 +22,7 @@ go_library(
         "//pkg/scheduler/core:go_default_library",
         "//pkg/scheduler/framework/plugins:go_default_library",
         "//pkg/scheduler/framework/plugins/nodelabel:go_default_library",
+        "//pkg/scheduler/framework/plugins/noderesources:go_default_library",
         "//pkg/scheduler/framework/plugins/requestedtocapacityratio:go_default_library",
         "//pkg/scheduler/framework/plugins/serviceaffinity:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
@@ -281,36 +281,12 @@ func (m *PodAffinityMetadata) Clone() *PodAffinityMetadata {
     return &copy
 }
 
-type podFitsResourcesMetadata struct {
-    // ignoredExtendedResources is a set of extended resource names that will
-    // be ignored in the PodFitsResources predicate.
-    //
-    // They can be scheduler extender managed resources, the consumption of
-    // which should be accounted only by the extenders. This set is synthesized
-    // from scheduler extender configuration and does not change per pod.
-    ignoredExtendedResources sets.String
-    podRequest               *schedulernodeinfo.Resource
-}
-
-func (m *podFitsResourcesMetadata) clone() *podFitsResourcesMetadata {
-    if m == nil {
-        return nil
-    }
-
-    copy := podFitsResourcesMetadata{}
-    copy.ignoredExtendedResources = m.ignoredExtendedResources
-    copy.podRequest = m.podRequest
-
-    return &copy
-}
-
 // NOTE: When new fields are added/removed or logic is changed, please make sure that
 // RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
 type predicateMetadata struct {
     pod *v1.Pod
 
-    serviceAffinityMetadata  *serviceAffinityMetadata
-    podFitsResourcesMetadata *podFitsResourcesMetadata
+    serviceAffinityMetadata *serviceAffinityMetadata
 }
 
 // Ensure that predicateMetadata implements algorithm.Metadata.
@@ -332,17 +308,6 @@ func EmptyMetadataProducer(pod *v1.Pod, sharedLister schedulerlisters.SharedList
     return nil
 }
 
-// RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a
-// MetadataProducer that creates predicate metadata with the provided
-// options for extended resources.
-//
-// See the comments in "predicateMetadata" for the explanation of the options.
-func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) {
-    RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) {
-        pm.podFitsResourcesMetadata.ignoredExtendedResources = ignoredExtendedResources
-    })
-}
-
 // MetadataProducerFactory is a factory to produce Metadata.
 type MetadataProducerFactory struct{}
 
@@ -354,8 +319,7 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister
     }
 
     predicateMetadata := &predicateMetadata{
-        pod:                      pod,
-        podFitsResourcesMetadata: getPodFitsResourcesMetedata(pod),
+        pod: pod,
     }
     for predicateName, precomputeFunc := range predicateMetadataProducers {
         klog.V(10).Infof("Precompute: %v", predicateName)
@@ -364,12 +328,6 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister
     return predicateMetadata
 }
 
-func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata {
-    return &podFitsResourcesMetadata{
-        podRequest: GetResourceRequest(pod),
-    }
-}
-
 // GetPodAffinityMetadata computes inter-pod affinity metadata.
 func GetPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo, havePodsWithAffinityNodes []*schedulernodeinfo.NodeInfo) (*PodAffinityMetadata, error) {
     // existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
@@ -593,7 +551,6 @@ func (meta *predicateMetadata) ShallowCopy() Metadata {
         pod: meta.pod,
     }
     newPredMeta.serviceAffinityMetadata = meta.serviceAffinityMetadata.clone()
-    newPredMeta.podFitsResourcesMetadata = meta.podFitsResourcesMetadata.clone()
     return (Metadata)(newPredMeta)
 }
 
@@ -228,13 +228,6 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
                 Namespace: "testns",
             },
         },
-        podFitsResourcesMetadata: &podFitsResourcesMetadata{
-            podRequest: &schedulernodeinfo.Resource{
-                MilliCPU:         1000,
-                Memory:           300,
-                AllowedPodNumber: 4,
-            },
-        },
         serviceAffinityMetadata: &serviceAffinityMetadata{
             matchingPodList: []*v1.Pod{
                 {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},
@@ -780,10 +780,16 @@ func podName(pod *v1.Pod) string {
     return pod.Namespace + "/" + pod.Name
 }
 
-// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
+// PodFitsResources is a wrapper around PodFitsResourcesPredicate that implements FitPredicate interface.
+// TODO(#85822): remove this function once predicate registration logic is deleted.
+func PodFitsResources(pod *v1.Pod, _ Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
+    return PodFitsResourcesPredicate(pod, nil, nil, nodeInfo)
+}
+
+// PodFitsResourcesPredicate checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
 // First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
-// predicate failure reasons if the node has insufficient resources to run the pod.
-func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
+// predicate failure reasons if the node has insufficient resources to run the pod
+func PodFitsResourcesPredicate(pod *v1.Pod, podRequest *schedulernodeinfo.Resource, ignoredExtendedResources sets.String, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
     node := nodeInfo.Node()
     if node == nil {
         return false, nil, fmt.Errorf("node not found")
@@ -795,17 +801,11 @@ func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.No
         predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
     }
 
-    // No extended resources should be ignored by default.
-    ignoredExtendedResources := sets.NewString()
+    if ignoredExtendedResources == nil {
+        ignoredExtendedResources = sets.NewString()
+    }
 
-    var podRequest *schedulernodeinfo.Resource
-    if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
-        podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
-        if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
-            ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
-        }
-    } else {
-        // We couldn't parse metadata - fallback to computing it.
+    if podRequest == nil {
         podRequest = GetResourceRequest(pod)
     }
     if podRequest.MilliCPU == 0 &&
@@ -839,13 +839,11 @@ func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.No
         }
     }
 
-    if klog.V(10) {
-        if len(predicateFails) == 0 {
-            // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
-            // not logged. There is visible performance gain from it.
-            klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
-                podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
-        }
+    if klog.V(10) && len(predicateFails) == 0 {
+        // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
+        // not logged. There is visible performance gain from it.
+        klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
+            podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
     }
     return len(predicateFails) == 0, predicateFails, nil
 }
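With this split, callers that have no precomputed metadata can invoke the predicate directly; both the podRequest and ignoredExtendedResources arguments may be nil, in which case the request is recomputed and no extended resources are ignored. A minimal sketch mirroring the updated tests, assuming a caller outside the predicates package with pod and nodeInfo populated elsewhere:

    // Pass a precomputed request (as the PreFilter path does), or nil to let
    // the predicate fall back to GetResourceRequest(pod) internally.
    fits, reasons, err := predicates.PodFitsResourcesPredicate(pod, predicates.GetResourceRequest(pod), nil, nodeInfo)
    if err != nil {
        // a nodeInfo without a node surfaces here as "node not found"
    }
    _ = fits    // true when the node can accommodate the pod
    _ = reasons // InsufficientResourceError entries otherwise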
@@ -1144,43 +1142,11 @@ func haveOverlap(a1, a2 []string) bool {
     return false
 }
 
-// GeneralPredicates checks whether noncriticalPredicates and EssentialPredicates pass. noncriticalPredicates are the predicates
-// that only non-critical pods need and EssentialPredicates are the predicates that all pods, including critical pods, need.
+// GeneralPredicates checks a group of predicates that the kubelet cares about.
+// DEPRECATED: this exists only because kubelet uses it. We should change kubelet to execute the individual predicates it requires.
 func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
     var predicateFails []PredicateFailureReason
-    for _, predicate := range []FitPredicate{noncriticalPredicates, EssentialPredicates} {
-        fit, reasons, err := predicate(pod, meta, nodeInfo)
-        if err != nil {
-            return false, predicateFails, err
-        }
-        if !fit {
-            predicateFails = append(predicateFails, reasons...)
-        }
-    }
-
-    return len(predicateFails) == 0, predicateFails, nil
-}
-
-// noncriticalPredicates are the predicates that only non-critical pods need.
-func noncriticalPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
-    var predicateFails []PredicateFailureReason
-    fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
-    if err != nil {
-        return false, predicateFails, err
-    }
-    if !fit {
-        predicateFails = append(predicateFails, reasons...)
-    }
-
-    return len(predicateFails) == 0, predicateFails, nil
-}
-
-// EssentialPredicates are the predicates that all pods, including critical pods, need.
-func EssentialPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
-    var predicateFails []PredicateFailureReason
     // TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
     // preempt pods to free up host ports too
-    for _, predicate := range []FitPredicate{PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} {
+    for _, predicate := range []FitPredicate{PodFitsResources, PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} {
         fit, reasons, err := predicate(pod, meta, nodeInfo)
         if err != nil {
             return false, predicateFails, err
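Since GeneralPredicates is the entry point the kubelet calls for admission, folding PodFitsResources into its predicate list keeps resource checking on that path even though the scheduler itself now goes through the plugin. A sketch of that call under the assumption that nil metadata is acceptable here (EmptyMetadataProducer above returns nil, and the PodFitsResources wrapper ignores its Metadata argument):

    // With metadata gone, the PodFitsResources wrapper passes nil through to
    // PodFitsResourcesPredicate, which recomputes the request on every call.
    fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)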
@@ -388,10 +388,7 @@ func TestPodFitsResources(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
             test.nodeInfo.SetNode(&node)
-            RegisterPredicateMetadataProducerWithExtendedResourceOptions(test.ignoredExtendedResources)
-            factory := &MetadataProducerFactory{}
-            meta := factory.GetPredicateMetadata(test.pod, nil)
-            fits, reasons, err := PodFitsResources(test.pod, meta, test.nodeInfo)
+            fits, reasons, err := PodFitsResourcesPredicate(test.pod, GetResourceRequest(test.pod), test.ignoredExtendedResources, test.nodeInfo)
             if err != nil {
                 t.Errorf("unexpected error: %v", err)
             }
@@ -448,8 +445,8 @@ func TestPodFitsResources(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}}
             test.nodeInfo.SetNode(&node)
-            factory := &MetadataProducerFactory{}
-            fits, reasons, err := PodFitsResources(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
+            fits, reasons, err := PodFitsResourcesPredicate(test.pod, GetResourceRequest(test.pod), nil, test.nodeInfo)
+
             if err != nil {
                 t.Errorf("unexpected error: %v", err)
             }
@@ -509,8 +506,7 @@ func TestPodFitsResources(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
            node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
             test.nodeInfo.SetNode(&node)
-            factory := &MetadataProducerFactory{}
-            fits, reasons, err := PodFitsResources(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
+            fits, reasons, err := PodFitsResourcesPredicate(test.pod, GetResourceRequest(test.pod), nil, test.nodeInfo)
             if err != nil {
                 t.Errorf("unexpected error: %v", err)
             }
@@ -44,6 +44,7 @@ import (
     "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
     "k8s.io/kubernetes/pkg/scheduler/core"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
+    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
     cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
@@ -177,8 +178,8 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler,
 
     var extenders []algorithm.SchedulerExtender
     if len(policy.Extenders) != 0 {
-        ignoredExtendedResources := sets.NewString()
         var ignorableExtenders []algorithm.SchedulerExtender
+        var ignoredExtendedResources []string
         for ii := range policy.Extenders {
             klog.V(2).Infof("Creating extender with config %+v", policy.Extenders[ii])
             extender, err := core.NewHTTPExtender(&policy.Extenders[ii])
@@ -192,13 +193,15 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler,
            }
             for _, r := range policy.Extenders[ii].ManagedResources {
                 if r.IgnoredByScheduler {
-                    ignoredExtendedResources.Insert(string(r.Name))
+                    ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
                 }
             }
         }
+        c.configProducerArgs.NodeResourcesFitArgs = &noderesources.FitArgs{
+            IgnoredResources: ignoredExtendedResources,
+        }
         // place ignorable extenders to the tail of extenders
         extenders = append(extenders, ignorableExtenders...)
-        predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
     }
     // Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
     // Give it higher precedence than scheduler CLI configuration when it is provided.
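With this wiring, any extender-managed resource marked IgnoredByScheduler ends up in the NodeResourcesFit plugin's arguments instead of predicate metadata. The args reach the plugin as a runtime.Unknown payload; a small sketch of that round trip, reusing the "example.com/bbb" resource name and JSON shape that appear in fit_test.go below:

    // Mirrors the fit_test.go setup: FitArgs decoded from raw JSON by NewFit.
    args := &runtime.Unknown{Raw: []byte(`{"IgnoredResources" : ["example.com/bbb"]}`)}
    p, _ := NewFit(args, nil) // p's ignoredResources set now contains "example.com/bbb"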
@@ -99,6 +99,8 @@ type ConfigProducerArgs struct {
     RequestedToCapacityRatioArgs *requestedtocapacityratio.Args
     // ServiceAffinityArgs is the args for the ServiceAffinity plugin.
     ServiceAffinityArgs *serviceaffinity.Args
+    // NodeResourcesFitArgs is the args for the NodeResources fit filter.
+    NodeResourcesFitArgs *noderesources.FitArgs
 }
 
 // ConfigProducer produces a framework's configuration.
@@ -119,9 +121,10 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
     }
     // Register Predicates.
     registry.RegisterPredicate(predicates.GeneralPred,
-        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
             // GeneralPredicate is a combination of predicates.
             plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
+            pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
             plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
             plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
             plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
@@ -133,8 +136,9 @@ func NewDefaultConfigProducerRegistry() *ConfigProducerRegistry {
             return
         })
     registry.RegisterPredicate(predicates.PodFitsResourcesPred,
-        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
             plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
+            pluginConfig = append(pluginConfig, makePluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
             return
         })
     registry.RegisterPredicate(predicates.HostNamePred,
@@ -21,6 +21,8 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
 )
@@ -51,14 +53,13 @@ go_test(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/scheduler/algorithm/predicates:go_default_library",
-        "//pkg/scheduler/framework/plugins/migration:go_default_library",
         "//pkg/scheduler/framework/v1alpha1:go_default_library",
         "//pkg/scheduler/nodeinfo:go_default_library",
         "//pkg/scheduler/nodeinfo/snapshot:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
     ],
@@ -20,38 +20,104 @@ import (
     "context"
     "fmt"
 
-    v1 "k8s.io/api/core/v1"
+    "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/util/sets"
+    "k8s.io/klog"
 
     "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
     "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
 
-// Fit is a plugin that checks if a node has sufficient resources.
-type Fit struct{}
-
+var _ framework.PreFilterPlugin = &Fit{}
 var _ framework.FilterPlugin = &Fit{}
 
-// FitName is the name of the plugin used in the plugin registry and configurations.
-const FitName = "NodeResourcesFit"
+const (
+    // FitName is the name of the plugin used in the plugin registry and configurations.
+    FitName = "NodeResourcesFit"
+
+    // preFilterStateKey is the key in CycleState to NodeResourcesFit pre-computed data.
+    // Using the name of the plugin will likely help us avoid collisions with other plugins.
+    preFilterStateKey = "PreFilter" + FitName
+)
+
+// Fit is a plugin that checks if a node has sufficient resources.
+type Fit struct {
+    ignoredResources sets.String
+}
+
+// FitArgs holds the args that are used to configure the plugin.
+type FitArgs struct {
+    // IgnoredResources is the list of resources that NodeResources fit filter
+    // should ignore.
+    IgnoredResources []string `json:"IgnoredResources,omitempty"`
+}
+
+// preFilterState computed at PreFilter and used at Filter.
+type preFilterState struct {
+    podResourceRequest *nodeinfo.Resource
+}
+
+// Clone the prefilter state.
+func (s *preFilterState) Clone() framework.StateData {
+    return s
+}
 
 // Name returns name of the plugin. It is used in logs, etc.
 func (f *Fit) Name() string {
     return FitName
 }
 
+// PreFilter invoked at the prefilter extension point.
+func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
+    s := &preFilterState{
+        podResourceRequest: predicates.GetResourceRequest(pod),
+    }
+    cycleState.Write(preFilterStateKey, s)
+    return nil
+}
+
+// PreFilterExtensions returns prefilter extensions, pod add and remove.
+func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
+    return nil
+}
+
+func getPodResourceRequest(cycleState *framework.CycleState) (*nodeinfo.Resource, error) {
+    c, err := cycleState.Read(preFilterStateKey)
+    if err != nil {
+        // The metadata wasn't pre-computed in prefilter. We ignore the error for now since
+        // Filter is able to handle that by computing it again.
+        klog.Errorf("reading %q from cycleState: %v", preFilterStateKey, err)
+        return nil, nil
+    }
+
+    s, ok := c.(*preFilterState)
+    if !ok {
+        return nil, fmt.Errorf("%+v convert to NodeResourcesFit.preFilterState error", c)
+    }
+    return s.podResourceRequest, nil
+}
+
 // Filter invoked at the filter extension point.
 func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
-    meta, ok := migration.CovertStateRefToPredMeta(migration.PredicateMetadata(cycleState))
-    if !ok {
-        return migration.ErrorToFrameworkStatus(fmt.Errorf("%+v convert to predicates.Metadata error", cycleState))
+    r, err := getPodResourceRequest(cycleState)
+    if err != nil {
+        return framework.NewStatus(framework.Error, err.Error())
     }
-    _, reasons, err := predicates.PodFitsResources(pod, meta, nodeInfo)
+    _, reasons, err := predicates.PodFitsResourcesPredicate(pod, r, f.ignoredResources, nodeInfo)
     return migration.PredicateResultToFrameworkStatus(reasons, err)
 }
 
 // NewFit initializes a new plugin and returns it.
-func NewFit(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
-    return &Fit{}, nil
+func NewFit(plArgs *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
+    args := &FitArgs{}
+    if err := framework.DecodeInto(plArgs, args); err != nil {
+        return nil, err
+    }
+
+    fit := &Fit{}
+    fit.ignoredResources = sets.NewString(args.IgnoredResources...)
+    return fit, nil
 }
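The intended flow is PreFilter once per scheduling cycle, then Filter once per candidate node reading the cached request. A condensed sketch of that sequence, borrowed from the updated tests (pod and nodeInfo are assumed to be populated elsewhere):

    p, _ := NewFit(nil, nil) // nil args: DecodeInto leaves FitArgs empty
    cycleState := framework.NewCycleState()

    // PreFilter computes GetResourceRequest(pod) once and caches it in CycleState.
    if status := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, pod); !status.IsSuccess() {
        // handle the error status
    }

    // Filter reads the cached request; if PreFilter never ran, getPodResourceRequest
    // logs the cache miss and returns nil, and the predicate recomputes the request.
    status := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, pod, nodeInfo)
    _ = status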
@@ -18,18 +18,17 @@ package noderesources
 
 import (
     "context"
+    "k8s.io/apimachinery/pkg/runtime"
     "reflect"
     "testing"
 
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
-    "k8s.io/apimachinery/pkg/util/sets"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
     v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
     "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
-    "k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
     framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
     schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 )
@@ -93,11 +92,12 @@ func TestNodeResourcesFit(t *testing.T) {
     defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodOverhead, true)()
 
     enoughPodsTests := []struct {
-        pod                      *v1.Pod
-        nodeInfo                 *schedulernodeinfo.NodeInfo
-        name                     string
-        ignoredExtendedResources sets.String
-        wantStatus               *framework.Status
+        pod               *v1.Pod
+        nodeInfo          *schedulernodeinfo.NodeInfo
+        name              string
+        ignoredResources  []byte
+        preFilterDisabled bool
+        wantStatus        *framework.Status
     }{
         {
             pod: &v1.Pod{},
@@ -116,6 +116,18 @@ func TestNodeResourcesFit(t *testing.T) {
                 predicates.NewInsufficientResourceError(v1.ResourceMemory, 2, 10, 10).GetReason(),
             ),
         },
+        {
+            pod: newResourcePod(schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1}),
+            nodeInfo: schedulernodeinfo.NewNodeInfo(
+                newResourcePod(schedulernodeinfo.Resource{MilliCPU: 10, Memory: 20})),
+            name:              "without prefilter",
+            preFilterDisabled: true,
+            wantStatus: framework.NewStatus(
+                framework.Unschedulable,
+                predicates.NewInsufficientResourceError(v1.ResourceCPU, 2, 10, 10).GetReason(),
+                predicates.NewInsufficientResourceError(v1.ResourceMemory, 2, 10, 10).GetReason(),
+            ),
+        },
         {
             pod: newResourceInitPod(newResourcePod(schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1}), schedulernodeinfo.Resource{MilliCPU: 3, Memory: 1}),
             nodeInfo: schedulernodeinfo.NewNodeInfo(
@@ -318,9 +330,8 @@ func TestNodeResourcesFit(t *testing.T) {
                 schedulernodeinfo.Resource{MilliCPU: 1, Memory: 1, ScalarResources: map[v1.ResourceName]int64{extendedResourceB: 1}}),
             nodeInfo: schedulernodeinfo.NewNodeInfo(
                 newResourcePod(schedulernodeinfo.Resource{MilliCPU: 0, Memory: 0})),
-            ignoredExtendedResources: sets.NewString(string(extendedResourceB)),
-            name:                     "skip checking ignored extended resource",
-            wantStatus: framework.NewStatus(framework.Unschedulable, predicates.NewInsufficientResourceError(extendedResourceB, 2, 10, 10).GetReason()),
+            ignoredResources: []byte(`{"IgnoredResources" : ["example.com/bbb"]}`),
+            name:             "skip checking ignored extended resource",
         },
         {
             pod: newResourceOverheadPod(
@@ -329,8 +340,7 @@ func TestNodeResourcesFit(t *testing.T) {
             ),
             nodeInfo: schedulernodeinfo.NewNodeInfo(
                 newResourcePod(schedulernodeinfo.Resource{MilliCPU: 5, Memory: 5})),
-            ignoredExtendedResources: sets.NewString(string(extendedResourceB)),
-            name:                     "resources + pod overhead fits",
+            name: "resources + pod overhead fits",
         },
         {
             pod: newResourceOverheadPod(
@@ -339,24 +349,27 @@ func TestNodeResourcesFit(t *testing.T) {
             ),
             nodeInfo: schedulernodeinfo.NewNodeInfo(
                 newResourcePod(schedulernodeinfo.Resource{MilliCPU: 5, Memory: 5})),
-            ignoredExtendedResources: sets.NewString(string(extendedResourceB)),
-            name:                     "requests + overhead does not fit for memory",
-            wantStatus:               framework.NewStatus(framework.Unschedulable, predicates.NewInsufficientResourceError(v1.ResourceMemory, 16, 5, 20).GetReason()),
+            name:       "requests + overhead does not fit for memory",
+            wantStatus: framework.NewStatus(framework.Unschedulable, predicates.NewInsufficientResourceError(v1.ResourceMemory, 16, 5, 20).GetReason()),
         },
     }
 
     for _, test := range enoughPodsTests {
         t.Run(test.name, func(t *testing.T) {
-            factory := &predicates.MetadataProducerFactory{}
-            meta := factory.GetPredicateMetadata(test.pod, nil)
-            state := framework.NewCycleState()
-            state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
-
             node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
             test.nodeInfo.SetNode(&node)
 
-            p, _ := NewFit(nil, nil)
-            gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
+            args := &runtime.Unknown{Raw: test.ignoredResources}
+            p, _ := NewFit(args, nil)
+            cycleState := framework.NewCycleState()
+            if !test.preFilterDisabled {
+                preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
+                if !preFilterStatus.IsSuccess() {
+                    t.Errorf("prefilter failed with status: %v", preFilterStatus)
+                }
+            }
+
+            gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
             if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                 t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
             }
@@ -401,16 +414,17 @@ func TestNodeResourcesFit(t *testing.T) {
     }
     for _, test := range notEnoughPodsTests {
         t.Run(test.name, func(t *testing.T) {
-            factory := &predicates.MetadataProducerFactory{}
-            meta := factory.GetPredicateMetadata(test.pod, nil)
-            state := framework.NewCycleState()
-            state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
-
             node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}}
             test.nodeInfo.SetNode(&node)
 
             p, _ := NewFit(nil, nil)
-            gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
+            cycleState := framework.NewCycleState()
+            preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
+            if !preFilterStatus.IsSuccess() {
+                t.Errorf("prefilter failed with status: %v", preFilterStatus)
+            }
+
+            gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
             if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                 t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
             }
@@ -453,16 +467,17 @@ func TestNodeResourcesFit(t *testing.T) {
 
     for _, test := range storagePodsTests {
         t.Run(test.name, func(t *testing.T) {
-            factory := &predicates.MetadataProducerFactory{}
-            meta := factory.GetPredicateMetadata(test.pod, nil)
-            state := framework.NewCycleState()
-            state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
-
             node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
             test.nodeInfo.SetNode(&node)
 
             p, _ := NewFit(nil, nil)
-            gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), state, test.pod, test.nodeInfo)
+            cycleState := framework.NewCycleState()
+            preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
+            if !preFilterStatus.IsSuccess() {
+                t.Errorf("prefilter failed with status: %v", preFilterStatus)
+            }
+
+            gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
             if !reflect.DeepEqual(gotStatus, test.wantStatus) {
                 t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
             }
@@ -29,7 +29,7 @@ type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Pl
 
 // DecodeInto decodes configuration whose type is *runtime.Unknown to the interface into.
 func DecodeInto(configuration *runtime.Unknown, into interface{}) error {
-    if configuration == nil {
+    if configuration == nil || configuration.Raw == nil {
        return nil
     }
 
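The extra Raw check matters for the new plugin: the tests construct &runtime.Unknown{Raw: test.ignoredResources} where Raw may be nil, and NewFit must still succeed with empty args. A small sketch of the behavior this guards, assuming a caller outside the framework package:

    var empty runtime.Unknown // Raw == nil
    args := &FitArgs{}
    _ = framework.DecodeInto(&empty, args) // returns nil; args stays zero-valued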