Merge pull request #86580 from zouyee/rlp

Move ResourceLimitsPriority to its Score plugin
Kubernetes Prow Robot authored 2019-12-25 07:39:38 -08:00; committed by GitHub
commit bd1195c28e
11 changed files with 218 additions and 141 deletions


@@ -18,7 +18,6 @@ go_library(
"reduce.go",
"requested_to_capacity_ratio.go",
"resource_allocation.go",
- "resource_limits.go",
"selector_spreading.go",
"test_util.go",
"types.go",
@@ -54,7 +53,6 @@ go_test(
"metadata_test.go",
"most_requested_test.go",
"requested_to_capacity_ratio_test.go",
- "resource_limits_test.go",
"selector_spreading_test.go",
"spreading_perf_test.go",
"types_test.go",


@@ -56,7 +56,6 @@ func NewMetadataFactory(
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
- podLimits *schedulernodeinfo.Resource
podSelector labels.Selector
podFirstServiceSelector labels.Selector
podTopologySpreadMap *podTopologySpreadMap
@@ -84,7 +83,6 @@ func (pmf *MetadataFactory) PriorityMetadata(
return nil
}
return &priorityMetadata{
- podLimits: getResourceLimits(pod),
podSelector: getSelector(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
podTopologySpreadMap: tpSpreadMap,


@@ -35,16 +35,6 @@ func TestPriorityMetadata(t *testing.T) {
nonZeroReqs.MilliCPU = priorityutil.DefaultMilliCPURequest
nonZeroReqs.Memory = priorityutil.DefaultMemoryRequest
- specifiedReqs := &schedulernodeinfo.Resource{}
- specifiedReqs.MilliCPU = 200
- specifiedReqs.Memory = 2000
- nonPodLimits := &schedulernodeinfo.Resource{}
- specifiedPodLimits := &schedulernodeinfo.Resource{}
- specifiedPodLimits.MilliCPU = 200
- specifiedPodLimits.Memory = 2000
tolerations := []v1.Toleration{{
Key: "foo",
Operator: v1.TolerationOpEqual,
@@ -138,7 +128,6 @@ func TestPriorityMetadata(t *testing.T) {
{
pod: podWithTolerationsAndAffinity,
expected: &priorityMetadata{
- podLimits: nonPodLimits,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with default requests",
@@ -146,7 +135,6 @@ func TestPriorityMetadata(t *testing.T) {
{
pod: podWithTolerationsAndRequests,
expected: &priorityMetadata{
- podLimits: nonPodLimits,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with tolerations and requests",
@@ -154,7 +142,6 @@ func TestPriorityMetadata(t *testing.T) {
{
pod: podWithAffinityAndRequests,
expected: &priorityMetadata{
- podLimits: specifiedPodLimits,
podSelector: labels.NewSelector(),
},
name: "Produce a priorityMetadata with affinity and requests",


@@ -1,105 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"fmt"
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/klog"
)
// ResourceLimitsPriorityMap is a priority function that increases score of input node by 1 if the node satisfies
// input pod's resource limits. In detail, this priority function works as follows: If a node does not publish its
// allocatable resources (cpu and memory both), the node score is not affected. If a pod does not specify
// its cpu and memory limits both, the node score is not affected. If one or both of cpu and memory limits
// of the pod are satisfied, the node is assigned a score of 1.
// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have
// same scores assigned by one of least and most requested priority functions.
func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
allocatableResources := nodeInfo.AllocatableResource()
// compute pod limits
var podLimits *schedulernodeinfo.Resource
if priorityMeta, ok := meta.(*priorityMetadata); ok && priorityMeta != nil {
// We were able to parse metadata, use podLimits from there.
podLimits = priorityMeta.podLimits
} else {
// We couldn't parse metadata - fallback to computing it.
podLimits = getResourceLimits(pod)
}
cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
score := int64(0)
if cpuScore == 1 || memScore == 1 {
score = 1
}
if klog.V(10) {
// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
klog.Infof(
"%v -> %v: Resource Limits Priority, allocatable %d millicores %d memory bytes, pod limits %d millicores %d memory bytes, score %d",
pod.Name, node.Name,
allocatableResources.MilliCPU, allocatableResources.Memory,
podLimits.MilliCPU, podLimits.Memory,
score,
)
}
return framework.NodeScore{
Name: node.Name,
Score: score,
}, nil
}
// computeScore returns 1 if limit value is less than or equal to allocatable
// value, otherwise it returns 0.
func computeScore(limit, allocatable int64) int64 {
if limit != 0 && allocatable != 0 && limit <= allocatable {
return 1
}
return 0
}
// getResourceLimits computes resource limits for input pod.
// The reason to create this new function is to be consistent with other
// priority functions because most or perhaps all priority functions work
// with schedulernodeinfo.Resource.
func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
result := &schedulernodeinfo.Resource{}
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Limits)
}
// take max_resource(sum_pod, any_init_container)
for _, container := range pod.Spec.InitContainers {
result.SetMaxResource(container.Resources.Limits)
}
return result
}
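The scoring rule documented above carries over unchanged to the new plugin: a node earns a score of 1 when its allocatable cpu or memory can hold the pod's corresponding limit, and 0 otherwise, purely as a tie-breaker. A small standalone sketch of that rule (illustrative only, not part of this commit; plain int64 millicores and bytes stand in for schedulernodeinfo.Resource):

package main

import "fmt"

// computeScore mirrors the rule above: 1 if a non-zero limit fits into a
// non-zero allocatable value, 0 otherwise.
func computeScore(limit, allocatable int64) int64 {
	if limit != 0 && allocatable != 0 && limit <= allocatable {
		return 1
	}
	return 0
}

func main() {
	cases := []struct {
		name               string
		limitCPU, limitMem int64 // pod limits (millicores, bytes)
		allocCPU, allocMem int64 // node allocatable (millicores, bytes)
	}{
		{"both limits fit on the node", 200, 2000, 4000, 10000},
		{"only the cpu limit is set and fits", 200, 0, 4000, 10000},
		{"pod declares no limits", 0, 0, 4000, 10000},
		{"node publishes no allocatable", 200, 2000, 0, 0},
	}
	for _, c := range cases {
		score := int64(0)
		// A node scores 1 if either the cpu or the memory limit is satisfied.
		if computeScore(c.limitCPU, c.allocCPU) == 1 || computeScore(c.limitMem, c.allocMem) == 1 {
			score = 1
		}
		fmt.Printf("%-36s score=%d\n", c.name, score)
	}
}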


@@ -81,9 +81,9 @@ func ApplyFeatureGates() (restore func()) {
// Prioritizes nodes that satisfy pod's resource limits
if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
klog.Infof("Registering resourcelimits priority function")
- scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, priorities.ResourceLimitsPriorityMap, nil, 1)
+ scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, nil, nil, 1)
// Register the priority function to specific provider too.
- scheduler.InsertPriorityKeyToAlgorithmProviderMap(scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, priorities.ResourceLimitsPriorityMap, nil, 1))
+ scheduler.InsertPriorityKeyToAlgorithmProviderMap(scheduler.RegisterPriorityMapReduceFunction(priorities.ResourceLimitsPriority, nil, nil, 1))
}
restore = func() {


@@ -1292,6 +1292,32 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
},
},
+ {
+ name: "enable alpha feature ResourceLimitsPriorityFunction",
+ JSON: `{
+ "kind": "Policy",
+ "apiVersion": "v1",
+ "predicates": [],
+ "priorities": [
+ {"name": "ResourceLimitsPriority", "weight": 2}
+ ]
+ }`,
+ featureGates: map[featuregate.Feature]bool{
+ features.ResourceLimitsPriorityFunction: true,
+ },
+ wantPlugins: map[string][]config.Plugin{
+ "PostFilterPlugin": {
+ {Name: "NodeResourceLimits"},
+ },
+ "FilterPlugin": {
+ {Name: "NodeUnschedulable"},
+ {Name: "TaintToleration"},
+ },
+ "ScorePlugin": {
+ {Name: "NodeResourceLimits", Weight: 2},
+ },
+ },
+ },
}
registeredPredicates := sets.NewString(scheduler.ListRegisteredFitPredicates()...)
seenPredicates := sets.NewString()
@@ -1329,6 +1355,7 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
"RequestedToCapacityRatio": "RequestedToCapacityRatioPriority",
"NodeLabel": "TestLabelPreference",
"ServiceAffinity": "TestServiceAntiAffinity",
+ "ResourceLimitsPriority": "NodeResourceLimits",
}
for _, tc := range testcases {


@@ -8,6 +8,7 @@ go_library(
"least_allocated.go",
"most_allocated.go",
"requested_to_capacity_ratio.go",
+ "resource_limits.go",
"test_util.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources",
@@ -49,6 +50,7 @@ go_test(
"least_allocated_test.go",
"most_allocated_test.go",
"requested_to_capacity_ratio_test.go",
+ "resource_limits_test.go",
],
embed = [":go_default_library"],
deps = [


@@ -45,8 +45,8 @@ func (ma *MostAllocated) Name() string {
// Score invoked at the Score extension point.
func (ma *MostAllocated) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := ma.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
- if err != nil {
- return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+ if err != nil || nodeInfo.Node() == nil {
+ return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
// MostRequestedPriorityMap does not use priority metadata, hence we pass nil here


@@ -0,0 +1,165 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package noderesources
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// ResourceLimits is a score plugin that increases score of input node by 1 if the node satisfies
// input pod's resource limits
type ResourceLimits struct {
handle framework.FrameworkHandle
}
var _ = framework.PostFilterPlugin(&ResourceLimits{})
var _ = framework.ScorePlugin(&ResourceLimits{})
const (
// ResourceLimitsName is the name of the plugin used in the plugin registry and configurations.
ResourceLimitsName = "NodeResourceLimits"
// postFilterStateKey is the key in CycleState to NodeResourceLimits pre-computed data.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
postFilterStateKey = "PostFilter" + ResourceLimitsName
)
// postFilterState computed at PostFilter and used at Score.
type postFilterState struct {
podResourceRequest *schedulernodeinfo.Resource
}
// Clone the postFilter state.
func (s *postFilterState) Clone() framework.StateData {
return s
}
// Name returns name of the plugin. It is used in logs, etc.
func (rl *ResourceLimits) Name() string {
return ResourceLimitsName
}
// PostFilter builds and writes cycle state used by Score and NormalizeScore.
func (rl *ResourceLimits) PostFilter(
pCtx context.Context,
cycleState *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
_ framework.NodeToStatusMap,
) *framework.Status {
if len(nodes) == 0 {
// No nodes to score.
return nil
}
if rl.handle.SnapshotSharedLister() == nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("empty shared lister"))
}
s := &postFilterState{
podResourceRequest: getResourceLimits(pod),
}
cycleState.Write(postFilterStateKey, s)
return nil
}
func getPodResource(cycleState *framework.CycleState) (*nodeinfo.Resource, error) {
c, err := cycleState.Read(postFilterStateKey)
if err != nil {
klog.V(5).Infof("Error reading %q from cycleState: %v", postFilterStateKey, err)
return nil, nil
}
s, ok := c.(*postFilterState)
if !ok {
return nil, fmt.Errorf("%+v convert to ResourceLimits.postFilterState error", c)
}
return s.podResourceRequest, nil
}
// Score invoked at the Score extension point.
// The "score" returned in this function is the matching number of pods on the `nodeName`.
// Currently works as follows:
// If a node does not publish its allocatable resources (cpu and memory both), the node score is not affected.
// If a pod does not specify its cpu and memory limits both, the node score is not affected.
// If one or both of cpu and memory limits of the pod are satisfied, the node is assigned a score of 1.
// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have
// same scores assigned by one of least and most requested priority functions.
func (rl *ResourceLimits) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := rl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
allocatableResources := nodeInfo.AllocatableResource()
podLimits, err := getPodResource(state)
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}
cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
score := int64(0)
if cpuScore == 1 || memScore == 1 {
score = 1
}
return score, nil
}
// ScoreExtensions of the Score plugin.
func (rl *ResourceLimits) ScoreExtensions() framework.ScoreExtensions {
return nil
}
// NewResourceLimits initializes a new plugin and returns it.
func NewResourceLimits(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &ResourceLimits{handle: h}, nil
}
// getResourceLimits computes resource limits for input pod.
// The reason to create this new function is to be consistent with other
// priority functions because most or perhaps all priority functions work
// with schedulernodeinfo.Resource.
func getResourceLimits(pod *v1.Pod) *schedulernodeinfo.Resource {
result := &schedulernodeinfo.Resource{}
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Limits)
}
// take max_resource(sum_pod, any_init_container)
for _, container := range pod.Spec.InitContainers {
result.SetMaxResource(container.Resources.Limits)
}
return result
}
// computeScore returns 1 if limit value is less than or equal to allocatable
// value, otherwise it returns 0.
func computeScore(limit, allocatable int64) int64 {
if limit != 0 && allocatable != 0 && limit <= allocatable {
return 1
}
return 0
}
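The getResourceLimits helper above applies the usual pod resource accounting: limits of regular containers are summed, and each init container's limits are then folded in with a max, because init containers run one at a time before the regular containers start. A standalone sketch of that aggregation (illustrative only, not part of this commit; the simplified podLimits type stands in for schedulernodeinfo.Resource):

package main

import "fmt"

// podLimits is a simplified stand-in for schedulernodeinfo.Resource, used only
// to illustrate the aggregation in getResourceLimits.
type podLimits struct {
	milliCPU int64
	memory   int64
}

// add sums a regular container's limits into the running total.
func (r *podLimits) add(milliCPU, memory int64) {
	r.milliCPU += milliCPU
	r.memory += memory
}

// setMax folds in an init container's limits: init containers run sequentially
// before the regular containers, so only the maximum matters.
func (r *podLimits) setMax(milliCPU, memory int64) {
	if milliCPU > r.milliCPU {
		r.milliCPU = milliCPU
	}
	if memory > r.memory {
		r.memory = memory
	}
}

func main() {
	result := &podLimits{}
	// Two regular containers: limits are summed (200m cpu, 2000 bytes total).
	result.add(100, 1000)
	result.add(100, 1000)
	// One init container with a large cpu limit: take max(sum_pod, init).
	result.setMax(500, 500)
	// cpu ends up at 500m (the init container dominates), memory stays at 2000.
	fmt.Printf("aggregated limits: %dm cpu, %d bytes of memory\n", result.milliCPU, result.memory)
}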


@@ -14,9 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package priorities
+package noderesources
import (
+ "context"
"reflect"
"testing"
@@ -26,7 +27,7 @@ import (
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
)
-func TestResourceLimitsPriority(t *testing.T) {
+func TestResourceLimits(t *testing.T) {
noResources := v1.PodSpec{
Containers: []v1.Container{},
}
@@ -139,23 +140,20 @@ func TestResourceLimitsPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(nil, test.nodes))
- metadata := &priorityMetadata{
- podLimits: getResourceLimits(test.pod),
+ fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
+ p := &ResourceLimits{handle: fh}
+ state := framework.NewCycleState()
+ status := p.PostFilter(context.Background(), state, test.pod, test.nodes, nil)
+ if !status.IsSuccess() {
+ t.Errorf("unexpected error: %v", status)
+ }
- for _, hasMeta := range []bool{true, false} {
- meta := metadata
- if !hasMeta {
- meta = nil
- }
- list, err := runMapReducePriority(ResourceLimitsPriorityMap, nil, meta, test.pod, snapshot, test.nodes)
+ for i := range test.nodes {
+ hostResult, err := p.Score(context.Background(), state, test.pod, test.nodes[i].Name)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
- if !reflect.DeepEqual(test.expectedList, list) {
- t.Errorf("hasMeta %#v expected %#v, got %#v", hasMeta, test.expectedList, list)
+ if !reflect.DeepEqual(test.expectedList[i].Score, hostResult) {
+ t.Errorf("expected %#v, got %#v", test.expectedList[i].Score, hostResult)
}
}
})
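The rewritten test drives the plugin the same way the framework does: PostFilter computes the pod's limits once and writes them to the shared CycleState, and each per-node Score call reads them back instead of recomputing them. A framework-free sketch of that handoff pattern (toy types only; the real framework.CycleState and StateData interfaces are not used here):

package main

import (
	"errors"
	"fmt"
)

// limitsState is a toy stand-in for the plugin's postFilterState.
type limitsState struct {
	milliCPU int64
	memory   int64
}

// cycleState is a toy stand-in for framework.CycleState: scratch space shared
// by all extension points within one scheduling cycle of one pod.
type cycleState map[string]*limitsState

// stateKey mirrors the role of postFilterStateKey (the name here is illustrative).
const stateKey = "PostFilterNodeResourceLimits"

// postFilter runs once per pod: compute the pod-wide value and cache it.
func postFilter(state cycleState, limitMilliCPU, limitMemory int64) {
	state[stateKey] = &limitsState{milliCPU: limitMilliCPU, memory: limitMemory}
}

// score runs once per candidate node: read the cached value back instead of
// recomputing it for every node.
func score(state cycleState, allocMilliCPU, allocMemory int64) (int64, error) {
	s, ok := state[stateKey]
	if !ok {
		return 0, errors.New("reading " + stateKey + " from cycle state")
	}
	if (s.milliCPU != 0 && allocMilliCPU != 0 && s.milliCPU <= allocMilliCPU) ||
		(s.memory != 0 && allocMemory != 0 && s.memory <= allocMemory) {
		return 1, nil
	}
	return 0, nil
}

func main() {
	state := cycleState{}
	postFilter(state, 200, 2000) // pod limits, computed once per cycle
	for _, node := range []struct{ allocCPU, allocMem int64 }{{4000, 10000}, {100, 100}} {
		s, err := score(state, node.allocCPU, node.allocMem)
		fmt.Println(s, err)
	}
}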


@@ -70,6 +70,7 @@ func NewInTreeRegistry(args *RegistryArgs) framework.Registry {
noderesources.MostAllocatedName: noderesources.NewMostAllocated,
noderesources.LeastAllocatedName: noderesources.NewLeastAllocated,
noderesources.RequestedToCapacityRatioName: noderesources.NewRequestedToCapacityRatio,
+ noderesources.ResourceLimitsName: noderesources.NewResourceLimits,
volumebinding.Name: func(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return volumebinding.NewFromVolumeBinder(args.VolumeBinder), nil
},
@@ -305,6 +306,12 @@ func NewConfigProducerRegistry() *ConfigProducerRegistry {
pluginConfig = append(pluginConfig, makePluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
return
})
+ registry.RegisterPriority(priorities.ResourceLimitsPriority,
+ func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
+ plugins.PostFilter = appendToPluginSet(plugins.PostFilter, noderesources.ResourceLimitsName, nil)
+ plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
+ return
+ })
return registry
}
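Beyond the in-tree map literal and config producer shown above, the same factory can also be registered explicitly. A minimal sketch, assuming the framework.Registry type and its Register method from pkg/scheduler/framework/v1alpha1 as of this commit (out-of-tree wiring such as scheduler command options is not shown):

package main

import (
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

func main() {
	// Registry maps plugin names to factories; Register rejects duplicate names.
	registry := framework.Registry{}
	// NewResourceLimits matches the framework's plugin-factory signature, so it
	// can be registered directly under the plugin's canonical name.
	if err := registry.Register(noderesources.ResourceLimitsName, noderesources.NewResourceLimits); err != nil {
		klog.Fatalf("registering %q: %v", noderesources.ResourceLimitsName, err)
	}
	klog.Infof("registered %q (PostFilter + Score plugin)", noderesources.ResourceLimitsName)
}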