From bf2b709e822c37288f1ce1470c4daf7a47b44edb Mon Sep 17 00:00:00 2001
From: zouyee
Date: Wed, 25 Dec 2019 16:41:40 +0800
Subject: [PATCH] Move ResourceLimitsPriority to its Score plugin

Signed-off-by: Zou Nengren
---
 pkg/scheduler/algorithm/priorities/BUILD      |   2 -
 .../algorithm/priorities/metadata.go          |   2 -
 .../algorithm/priorities/metadata_test.go     |  13 --
 .../algorithm/priorities/resource_limits.go   | 105 -----------
 .../algorithmprovider/defaults/defaults.go    |   4 +-
 .../apis/config/testing/compatibility_test.go |  27 +++
 .../framework/plugins/noderesources/BUILD     |   2 +
 .../plugins/noderesources/most_allocated.go   |   4 +-
 .../plugins/noderesources/resource_limits.go  | 165 ++++++++++++++++++
 .../noderesources}/resource_limits_test.go    |  28 ++-
 pkg/scheduler/framework/plugins/registry.go   |   7 +
 11 files changed, 218 insertions(+), 141 deletions(-)
 delete mode 100644 pkg/scheduler/algorithm/priorities/resource_limits.go
 create mode 100644 pkg/scheduler/framework/plugins/noderesources/resource_limits.go
 rename pkg/scheduler/{algorithm/priorities => framework/plugins/noderesources}/resource_limits_test.go (86%)

diff --git a/pkg/scheduler/algorithm/priorities/BUILD b/pkg/scheduler/algorithm/priorities/BUILD
index 277f93b71bb..59c90a13b22 100644
--- a/pkg/scheduler/algorithm/priorities/BUILD
+++ b/pkg/scheduler/algorithm/priorities/BUILD
@@ -19,7 +19,6 @@ go_library(
         "reduce.go",
         "requested_to_capacity_ratio.go",
         "resource_allocation.go",
-        "resource_limits.go",
         "selector_spreading.go",
         "test_util.go",
         "types.go",
@@ -56,7 +55,6 @@ go_test(
         "most_requested_test.go",
         "node_prefer_avoid_pods_test.go",
         "requested_to_capacity_ratio_test.go",
-        "resource_limits_test.go",
         "selector_spreading_test.go",
         "spreading_perf_test.go",
         "types_test.go",
diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go
index 5195f15a9f1..99f953dcd7a 100644
--- a/pkg/scheduler/algorithm/priorities/metadata.go
+++ b/pkg/scheduler/algorithm/priorities/metadata.go
@@ -56,7 +56,6 @@ func NewMetadataFactory(

 // priorityMetadata is a type that is passed as metadata for priority functions
 type priorityMetadata struct {
-	podLimits               *schedulernodeinfo.Resource
 	podSelector             labels.Selector
 	controllerRef           *metav1.OwnerReference
 	podFirstServiceSelector labels.Selector
@@ -85,7 +84,6 @@ func (pmf *MetadataFactory) PriorityMetadata(
 		return nil
 	}
 	return &priorityMetadata{
-		podLimits:               getResourceLimits(pod),
 		podSelector:             getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
 		controllerRef:           metav1.GetControllerOf(pod),
 		podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
diff --git a/pkg/scheduler/algorithm/priorities/metadata_test.go b/pkg/scheduler/algorithm/priorities/metadata_test.go
index a5cfa986aad..4cf154e6fb1 100644
--- a/pkg/scheduler/algorithm/priorities/metadata_test.go
+++ b/pkg/scheduler/algorithm/priorities/metadata_test.go
@@ -35,16 +35,6 @@ func TestPriorityMetadata(t *testing.T) {
 	nonZeroReqs.MilliCPU = priorityutil.DefaultMilliCPURequest
 	nonZeroReqs.Memory = priorityutil.DefaultMemoryRequest

-	specifiedReqs := &schedulernodeinfo.Resource{}
-	specifiedReqs.MilliCPU = 200
-	specifiedReqs.Memory = 2000
-
-	nonPodLimits := &schedulernodeinfo.Resource{}
-
-	specifiedPodLimits := &schedulernodeinfo.Resource{}
-	specifiedPodLimits.MilliCPU = 200
-	specifiedPodLimits.Memory = 2000
-
 	tolerations := []v1.Toleration{{
 		Key:      "foo",
 		Operator: v1.TolerationOpEqual,
@@ -138,7 +128,6 @@ func TestPriorityMetadata(t *testing.T) {
 		{
 			pod: podWithTolerationsAndAffinity,
 			expected: &priorityMetadata{
-				podLimits:   nonPodLimits,
 				podSelector: labels.NewSelector(),
 			},
 			name: "Produce a priorityMetadata with default requests",
@@ -146,7 +135,6 @@ func TestPriorityMetadata(t *testing.T) {
 		{
 			pod: podWithTolerationsAndRequests,
 			expected: &priorityMetadata{
-				podLimits:   nonPodLimits,
 				podSelector: labels.NewSelector(),
 			},
 			name: "Produce a priorityMetadata with tolerations and requests",
@@ -154,7 +142,6 @@ func TestPriorityMetadata(t *testing.T) {
 		{
 			pod: podWithAffinityAndRequests,
 			expected: &priorityMetadata{
-				podLimits:   specifiedPodLimits,
 				podSelector: labels.NewSelector(),
 			},
 			name: "Produce a priorityMetadata with affinity and requests",
diff --git a/pkg/scheduler/algorithm/priorities/resource_limits.go b/pkg/scheduler/algorithm/priorities/resource_limits.go
deleted file mode 100644
index 7133f060dc7..00000000000
--- a/pkg/scheduler/algorithm/priorities/resource_limits.go
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package priorities
-
-import (
-	"fmt"
-
-	v1 "k8s.io/api/core/v1"
-	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
-	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-
-	"k8s.io/klog"
-)
-
-// ResourceLimitsPriorityMap is a priority function that increases score of input node by 1 if the node satisfies
-// input pod's resource limits. In detail, this priority function works as follows: If a node does not publish its
-// allocatable resources (cpu and memory both), the node score is not affected. If a pod does not specify
-// its cpu and memory limits both, the node score is not affected. If one or both of cpu and memory limits
-// of the pod are satisfied, the node is assigned a score of 1.
-// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have
-// same scores assigned by one of least and most requested priority functions.
-func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
-	node := nodeInfo.Node()
-	if node == nil {
-		return framework.NodeScore{}, fmt.Errorf("node not found")
-	}
-
-	allocatableResources := nodeInfo.AllocatableResource()
-
-	// compute pod limits
-	var podLimits *schedulernodeinfo.Resource
-	if priorityMeta, ok := meta.(*priorityMetadata); ok && priorityMeta != nil {
-		// We were able to parse metadata, use podLimits from there.
-		podLimits = priorityMeta.podLimits
-	} else {
-		// We couldn't parse metadata - fallback to computing it.
-		podLimits = getResourceLimits(pod)
-	}
-
-	cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
-	memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
-
-	score := int64(0)
-	if cpuScore == 1 || memScore == 1 {
-		score = 1
-	}
-
-	if klog.V(10) {
-		// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
-		// not logged. There is visible performance gain from it.
-		klog.Infof(
-			"%v -> %v: Resource Limits Priority, allocatable %d millicores %d memory bytes, pod limits %d millicores %d memory bytes, score %d",
-			pod.Name, node.Name,
-			allocatableResources.MilliCPU, allocatableResources.Memory,
-			podLimits.MilliCPU, podLimits.Memory,
-			score,
-		)
-	}
-
-	return framework.NodeScore{
-		Name:  node.Name,
-		Score: score,
-	}, nil
-}
-
-// computeScore returns 1 if limit value is less than or equal to allocatable
-// value, otherwise it returns 0.
-func computeScore(limit, allocatable int64) int64 {
-	if limit != 0 && allocatable != 0 && limit <= allocatable {
-		return 1
-	}
-	return 0
-}
-
-// getResourceLimits computes resource limits for input pod.
-// The reason to create this new function is to be consistent with other
-// priority functions because most or perhaps all priority functions work
-// with schedulernodeinfo.Resource.
-func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
-	result := &schedulernodeinfo.Resource{}
-	for _, container := range pod.Spec.Containers {
-		result.Add(container.Resources.Limits)
-	}
-
-	// take max_resource(sum_pod, any_init_container)
-	for _, container := range pod.Spec.InitContainers {
-		result.SetMaxResource(container.Resources.Limits)
-	}
-
-	return result
-}
diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults.go b/pkg/scheduler/algorithmprovider/defaults/defaults.go
index b95b5a45da5..8f6d0f665f9 100644
--- a/pkg/scheduler/algorithmprovider/defaults/defaults.go
+++ b/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -81,9 +81,9 @@ func ApplyFeatureGates() (restore func()) {
 	// Prioritizes nodes that satisfy pod's resource limits
 	if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
 		klog.Infof("Registering resourcelimits priority function")
-		scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, priorities.ResourceLimitsPriorityMap, nil, 1)
+		scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, nil, nil, 1)
 		// Register the priority function to specific provider too.
-		scheduler.InsertPriorityKeyToAlgorithmProviderMap(scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, priorities.ResourceLimitsPriorityMap, nil, 1))
+		scheduler.InsertPriorityKeyToAlgorithmProviderMap(scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, nil, nil, 1))
 	}

 	restore = func() {
diff --git a/pkg/scheduler/apis/config/testing/compatibility_test.go b/pkg/scheduler/apis/config/testing/compatibility_test.go
index 75a0d1b1c6b..a9f9ac0f737 100644
--- a/pkg/scheduler/apis/config/testing/compatibility_test.go
+++ b/pkg/scheduler/apis/config/testing/compatibility_test.go
@@ -1292,6 +1292,32 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "enable alpha feature ResourceLimitsPriorityFunction",
+			JSON: `{
+		"kind": "Policy",
+		"apiVersion": "v1",
+		"predicates": [],
+		"priorities": [
+			{"name": "ResourceLimitsPriority", "weight": 2}
+		]
+	}`,
+			featureGates: map[featuregate.Feature]bool{
+				features.ResourceLimitsPriorityFunction: true,
+			},
+			wantPlugins: map[string][]config.Plugin{
+				"PostFilterPlugin": {
+					{Name: "NodeResourceLimits"},
+				},
+				"FilterPlugin": {
+					{Name: "NodeUnschedulable"},
+					{Name: "TaintToleration"},
+				},
+				"ScorePlugin": {
+					{Name: "NodeResourceLimits", Weight: 2},
+				},
+			},
+		},
 	}
 	registeredPredicates := sets.NewString(scheduler.ListRegisteredFitPredicates()...)
 	seenPredicates := sets.NewString()
@@ -1329,6 +1355,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
 		"RequestedToCapacityRatio": "RequestedToCapacityRatioPriority",
 		"NodeLabel":                "TestLabelPreference",
 		"ServiceAffinity":          "TestServiceAntiAffinity",
+		"ResourceLimitsPriority":   "NodeResourceLimits",
 	}

 	for _, tc := range testcases {
diff --git a/pkg/scheduler/framework/plugins/noderesources/BUILD b/pkg/scheduler/framework/plugins/noderesources/BUILD
index 8d5af0b1847..920dd7a5e31 100644
--- a/pkg/scheduler/framework/plugins/noderesources/BUILD
+++ b/pkg/scheduler/framework/plugins/noderesources/BUILD
@@ -8,6 +8,7 @@ go_library(
         "least_allocated.go",
         "most_allocated.go",
         "requested_to_capacity_ratio.go",
+        "resource_limits.go",
         "test_util.go",
     ],
     importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources",
@@ -49,6 +50,7 @@ go_test(
         "least_allocated_test.go",
         "most_allocated_test.go",
         "requested_to_capacity_ratio_test.go",
+        "resource_limits_test.go",
     ],
     embed = [":go_default_library"],
     deps = [
diff --git a/pkg/scheduler/framework/plugins/noderesources/most_allocated.go b/pkg/scheduler/framework/plugins/noderesources/most_allocated.go
index b4ef4d337c6..1be1cd8e4f2 100644
--- a/pkg/scheduler/framework/plugins/noderesources/most_allocated.go
+++ b/pkg/scheduler/framework/plugins/noderesources/most_allocated.go
@@ -45,8 +45,8 @@ func (ma *MostAllocated) Name() string {
 // Score invoked at the Score extension point.
 func (ma *MostAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
 	nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
-	if err != nil {
-		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+	if err != nil || nodeInfo.Node() == nil {
+		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
 	}

 	// MostRequestedPriorityMap does not use priority metadata, hence we pass nil here
diff --git a/pkg/scheduler/framework/plugins/noderesources/resource_limits.go b/pkg/scheduler/framework/plugins/noderesources/resource_limits.go
new file mode 100644
index 00000000000..435c659410a
--- /dev/null
+++ b/pkg/scheduler/framework/plugins/noderesources/resource_limits.go
@@ -0,0 +1,165 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package noderesources
+
+import (
+	"context"
+	"fmt"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/klog"
+	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
+	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
+)
+
+// ResourceLimits is a score plugin that increases the score of a node by 1 if the node satisfies
+// at least one of the input pod's cpu and memory limits.
+type ResourceLimits struct {
+	handle framework.FrameworkHandle
+}
+
+var _ = framework.PostFilterPlugin(&ResourceLimits{})
+var _ = framework.ScorePlugin(&ResourceLimits{})
+
+const (
+	// ResourceLimitsName is the name of the plugin used in the plugin registry and configurations.
+	ResourceLimitsName = "NodeResourceLimits"
+
+	// postFilterStateKey is the key in CycleState to the NodeResourceLimits pre-computed data.
+	// Using the name of the plugin will likely help us avoid collisions with other plugins.
+	postFilterStateKey = "PostFilter" + ResourceLimitsName
+)
+
+// postFilterState computed at PostFilter and used at Score.
+type postFilterState struct {
+	podResourceRequest *schedulernodeinfo.Resource
+}
+
+// Clone the postFilter state.
+func (s *postFilterState) Clone() framework.StateData {
+	return s
+}
+
+// Name returns name of the plugin. It is used in logs, etc.
+func (rl *ResourceLimits) Name() string {
+	return ResourceLimitsName
+}
+
+// PostFilter builds and writes cycle state used by Score.
+func (rl *ResourceLimits) PostFilter(
+	pCtx context.Context,
+	cycleState *framework.CycleState,
+	pod *v1.Pod,
+	nodes []*v1.Node,
+	_ framework.NodeToStatusMap,
+) *framework.Status {
+	if len(nodes) == 0 {
+		// No nodes to score.
+		return nil
+	}
+
+	if rl.handle.SnapshotSharedLister() == nil {
+		return framework.NewStatus(framework.Error, "empty shared lister")
+	}
+	s := &postFilterState{
+		podResourceRequest: getResourceLimits(pod),
+	}
+	cycleState.Write(postFilterStateKey, s)
+	return nil
+}
+
+func getPodResource(cycleState *framework.CycleState) (*nodeinfo.Resource, error) {
+	c, err := cycleState.Read(postFilterStateKey)
+	if err != nil {
+		klog.V(5).Infof("Error reading %q from cycleState: %v", postFilterStateKey, err)
+		return nil, fmt.Errorf("error reading %q from cycleState: %v", postFilterStateKey, err)
+	}
+
+	s, ok := c.(*postFilterState)
+	if !ok {
+		return nil, fmt.Errorf("%+v convert to ResourceLimits.postFilterState error", c)
+	}
+	return s.podResourceRequest, nil
+}
+
+// Score invoked at the Score extension point.
+// The returned score is either 0 or 1 and is only meant to break ties between otherwise equal nodes.
+// Currently works as follows:
+// If a node does not publish its allocatable resources (cpu and memory both), the node score is not affected.
+// If a pod does not specify its cpu and memory limits both, the node score is not affected.
+// If one or both of cpu and memory limits of the pod are satisfied, the node is assigned a score of 1.
+// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have
+// same scores assigned by one of least and most requested priority functions.
+func (rl *ResourceLimits) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
+	nodeInfo, err := rl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+	if err != nil || nodeInfo.Node() == nil {
+		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
+	}
+	allocatableResources := nodeInfo.AllocatableResource()
+	podLimits, err := getPodResource(state)
+	if err != nil {
+		return 0, framework.NewStatus(framework.Error, err.Error())
+	}
+
+	cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
+	memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
+
+	score := int64(0)
+	if cpuScore == 1 || memScore == 1 {
+		score = 1
+	}
+	return score, nil
+}
+
+// ScoreExtensions of the Score plugin.
+func (rl *ResourceLimits) ScoreExtensions() framework.ScoreExtensions {
+	return nil
+}
+
+// NewResourceLimits initializes a new plugin and returns it.
+func NewResourceLimits(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
+	return &ResourceLimits{handle: h}, nil
+}
+
+// getResourceLimits computes resource limits for input pod.
+// The reason to create this new function is to be consistent with other
+// priority functions because most or perhaps all priority functions work
+// with schedulernodeinfo.Resource.
+func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
+	result := &schedulernodeinfo.Resource{}
+	for _, container := range pod.Spec.Containers {
+		result.Add(container.Resources.Limits)
+	}
+
+	// take max_resource(sum_pod, any_init_container)
+	for _, container := range pod.Spec.InitContainers {
+		result.SetMaxResource(container.Resources.Limits)
+	}
+
+	return result
+}
+
+// computeScore returns 1 if limit value is less than or equal to allocatable
+// value, otherwise it returns 0.
+func computeScore(limit, allocatable int64) int64 {
+	if limit != 0 && allocatable != 0 && limit <= allocatable {
+		return 1
+	}
+	return 0
+}
diff --git a/pkg/scheduler/algorithm/priorities/resource_limits_test.go b/pkg/scheduler/framework/plugins/noderesources/resource_limits_test.go
similarity index 86%
rename from pkg/scheduler/algorithm/priorities/resource_limits_test.go
rename to pkg/scheduler/framework/plugins/noderesources/resource_limits_test.go
index 22273bae529..778fa3b192b 100644
--- a/pkg/scheduler/algorithm/priorities/resource_limits_test.go
+++ b/pkg/scheduler/framework/plugins/noderesources/resource_limits_test.go
@@ -14,9 +14,10 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package priorities
+package noderesources

 import (
+	"context"
 	"reflect"
 	"testing"

@@ -26,7 +27,7 @@ import (
 	nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
 )

-func TestResourceLimitsPriority(t *testing.T) {
+func TestResourceLimits(t *testing.T) {
 	noResources := v1.PodSpec{
 		Containers: []v1.Container{},
 	}
@@ -139,23 +140,20 @@ func TestResourceLimitsPriority(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(nil, test.nodes))
-			metadata := &priorityMetadata{
-				podLimits: getResourceLimits(test.pod),
+			fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+			p := &ResourceLimits{handle: fh}
+			state := framework.NewCycleState()
+			status := p.PostFilter(context.Background(), state, test.pod, test.nodes, nil)
+			if !status.IsSuccess() {
+				t.Errorf("unexpected error: %v", status)
 			}
-
-			for _, hasMeta := range []bool{true, false} {
-				meta := metadata
-				if !hasMeta {
-					meta = nil
-				}
-
-				list, err := runMapReducePriority(ResourceLimitsPriorityMap, nil, meta, test.pod, snapshot, test.nodes)
-
+			for i := range test.nodes {
+				hostResult, err := p.Score(context.Background(), state, test.pod, test.nodes[i].Name)
 				if err != nil {
 					t.Errorf("unexpected error: %v", err)
 				}
-				if !reflect.DeepEqual(test.expectedList, list) {
-					t.Errorf("hasMeta %#v expected %#v, got %#v", hasMeta, test.expectedList, list)
+				if !reflect.DeepEqual(test.expectedList[i].Score, hostResult) {
+					t.Errorf("expected %#v, got %#v", test.expectedList[i].Score, hostResult)
 				}
 			}
 		})
diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go
index dbd172108a7..8a4ac570042 100644
--- a/pkg/scheduler/framework/plugins/registry.go
+++ b/pkg/scheduler/framework/plugins/registry.go
@@ -70,6 +70,7 @@ func NewInTreeRegistry(args *RegistryArgs) framework.Registry {
 		noderesources.MostAllocatedName:            noderesources.NewMostAllocated,
 		noderesources.LeastAllocatedName:           noderesources.NewLeastAllocated,
 		noderesources.RequestedToCapacityRatioName: noderesources.NewRequestedToCapacityRatio,
+		noderesources.ResourceLimitsName:           noderesources.NewResourceLimits,
 		volumebinding.Name: func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
 			return volumebinding.NewFromVolumeBinder(args.VolumeBinder), nil
 		},
@@ -305,6 +306,12 @@ func NewConfigProducerRegistry() *ConfigProducerRegistry {
 			pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
 			return
 		})
+	registry.RegisterPriority(priorities.ResourceLimitsPriority,
+		func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+			plugins.PostFilter = appendToPluginSet(plugins.PostFilter, noderesources.ResourceLimitsName, nil)
+			plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
+			return
+		})
 	return registry
 }
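
Reviewer note: the relocated logic keeps the old ResourceLimitsPriority semantics. PostFilter pre-computes the pod's resource limits into CycleState, and Score hands a tie-breaking 1 to any node whose published allocatable cpu or memory can hold at least one declared limit. Below is a minimal standalone sketch of that rule; computeScore is copied from the new plugin file, while the main wrapper and the sample numbers are purely illustrative and not part of the patch.

package main

import "fmt"

// computeScore mirrors the plugin's rule: it returns 1 only when the pod
// declares a limit and the node publishes an allocatable value that can hold it.
func computeScore(limit, allocatable int64) int64 {
	if limit != 0 && allocatable != 0 && limit <= allocatable {
		return 1
	}
	return 0
}

func main() {
	// Illustrative pod limits and node allocatable values.
	podMilliCPU, podMemory := int64(200), int64(2000)
	nodeMilliCPU, nodeMemory := int64(4000), int64(0) // this node publishes no allocatable memory

	cpuScore := computeScore(podMilliCPU, nodeMilliCPU) // 1: the cpu limit fits
	memScore := computeScore(podMemory, nodeMemory)     // 0: no allocatable memory published

	// As in the plugin's Score, one satisfied dimension is enough for the
	// tie-breaking score of 1.
	score := int64(0)
	if cpuScore == 1 || memScore == 1 {
		score = 1
	}
	fmt.Println(score) // prints 1
}

As the compatibility test above shows, a v1 Policy that lists ResourceLimitsPriority (with the alpha ResourceLimitsPriorityFunction feature gate enabled on the scheduler) now resolves to the NodeResourceLimits plugin at the PostFilter and Score extension points.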