Merge pull request #86548 from draveness/feature/image-locality-score-plugin

feat: implement image locality as score plugin
This commit is contained in:
Kubernetes Prow Robot 2019-12-23 08:29:33 -08:00 committed by GitHub
commit 23864bc09c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 99 additions and 377 deletions

View File

@ -11,7 +11,6 @@ go_library(
srcs = [
"balanced_resource_allocation.go",
"even_pods_spread.go",
"image_locality.go",
"least_requested.go",
"metadata.go",
"most_requested.go",
@ -37,7 +36,6 @@ go_library(
"//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -55,7 +53,6 @@ go_test(
srcs = [
"balanced_resource_allocation_test.go",
"even_pods_spread_test.go",
"image_locality_test.go",
"least_requested_test.go",
"metadata_test.go",
"most_requested_test.go",
@ -78,7 +75,6 @@ go_test(
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -1,109 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"fmt"
"strings"
v1 "k8s.io/api/core/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/parsers"
)
// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
const (
mb int64 = 1024 * 1024
minThreshold int64 = 23 * mb
maxThreshold int64 = 1000 * mb
)
// ImageLocalityPriorityMap is a priority function that favors nodes that already have requested pod container's images.
// It will detect whether the requested images are present on a node, and then calculate a score ranging from 0 to 10
// based on the total size of those images.
// - If none of the images are present, this node will be given the lowest priority.
// - If some of the images are present on a node, the larger their sizes' sum, the higher the node's priority.
func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
node := nodeInfo.Node()
if node == nil {
return framework.NodeScore{}, fmt.Errorf("node not found")
}
var score int
if priorityMeta, ok := meta.(*priorityMetadata); ok {
score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
} else {
// if we are not able to parse priority meta data, skip this priority
score = 0
}
return framework.NodeScore{
Name: node.Name,
Score: int64(score),
}, nil
}
// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64) int {
if sumScores < minThreshold {
sumScores = minThreshold
} else if sumScores > maxThreshold {
sumScores = maxThreshold
}
return int(int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}
// sumImageScores returns the sum of image scores of all the containers that are already on the node.
// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
// the final score. Note that the init containers are not considered for it's rare for users to deploy huge init containers.
func sumImageScores(nodeInfo *schedulernodeinfo.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
var sum int64
imageStates := nodeInfo.ImageStates()
for _, container := range containers {
if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
sum += scaledImageScore(state, totalNumNodes)
}
}
return sum
}
// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how much nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *schedulernodeinfo.ImageStateSummary, totalNumNodes int) int64 {
spread := float64(imageState.NumNodes) / float64(totalNumNodes)
return int64(float64(imageState.Size) * spread)
}
// normalizedImageName returns the CRI compliant name for a given image.
// TODO: cover the corner cases of missed matches, e.g,
// 1. Using Docker as runtime and docker.io/library/test:tag in pod spec, but only test:tag will present in node status
// 2. Using the implicit registry, i.e., test:tag or library/test:tag in pod spec but only docker.io/library/test:tag
// in node status; note that if users consistently use one registry format, this should not happen.
func normalizedImageName(name string) string {
if strings.LastIndex(name, ":") <= strings.LastIndex(name, "/") {
name = name + ":" + parsers.DefaultImageTag
}
return name
}

View File

@ -1,233 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"crypto/sha256"
"encoding/hex"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/util/parsers"
)
func TestImageLocalityPriority(t *testing.T) {
test40250 := v1.PodSpec{
Containers: []v1.Container{
{
Image: "gcr.io/40",
},
{
Image: "gcr.io/250",
},
},
}
test40300 := v1.PodSpec{
Containers: []v1.Container{
{
Image: "gcr.io/40",
},
{
Image: "gcr.io/300",
},
},
}
testMinMax := v1.PodSpec{
Containers: []v1.Container{
{
Image: "gcr.io/10",
},
{
Image: "gcr.io/2000",
},
},
}
node403002000 := v1.NodeStatus{
Images: []v1.ContainerImage{
{
Names: []string{
"gcr.io/40:" + parsers.DefaultImageTag,
"gcr.io/40:v1",
"gcr.io/40:v1",
},
SizeBytes: int64(40 * mb),
},
{
Names: []string{
"gcr.io/300:" + parsers.DefaultImageTag,
"gcr.io/300:v1",
},
SizeBytes: int64(300 * mb),
},
{
Names: []string{
"gcr.io/2000:" + parsers.DefaultImageTag,
},
SizeBytes: int64(2000 * mb),
},
},
}
node25010 := v1.NodeStatus{
Images: []v1.ContainerImage{
{
Names: []string{
"gcr.io/250:" + parsers.DefaultImageTag,
},
SizeBytes: int64(250 * mb),
},
{
Names: []string{
"gcr.io/10:" + parsers.DefaultImageTag,
"gcr.io/10:v1",
},
SizeBytes: int64(10 * mb),
},
},
}
nodeWithNoImages := v1.NodeStatus{}
tests := []struct {
pod *v1.Pod
pods []*v1.Pod
nodes []*v1.Node
expectedList framework.NodeScoreList
name string
}{
{
// Pod: gcr.io/40 gcr.io/250
// Node1
// Image: gcr.io/40:latest 40MB
// Score: 0 (40M/2 < 23M, min-threshold)
// Node2
// Image: gcr.io/250:latest 250MB
// Score: 100 * (250M/2 - 23M)/(1000M - 23M) = 100
pod: &v1.Pod{Spec: test40250},
nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 0}, {Name: "machine2", Score: 10}},
name: "two images spread on two nodes, prefer the larger image one",
},
{
// Pod: gcr.io/40 gcr.io/300
// Node1
// Image: gcr.io/40:latest 40MB, gcr.io/300:latest 300MB
// Score: 100 * ((40M + 300M)/2 - 23M)/(1000M - 23M) = 15
// Node2
// Image: not present
// Score: 0
pod: &v1.Pod{Spec: test40300},
nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 15}, {Name: "machine2", Score: 0}},
name: "two images on one node, prefer this node",
},
{
// Pod: gcr.io/2000 gcr.io/10
// Node1
// Image: gcr.io/2000:latest 2000MB
// Score: 100 (2000M/2 >= 1000M, max-threshold)
// Node2
// Image: gcr.io/10:latest 10MB
// Score: 0 (10M/2 < 23M, min-threshold)
pod: &v1.Pod{Spec: testMinMax},
nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
expectedList: []framework.NodeScore{{Name: "machine1", Score: framework.MaxNodeScore}, {Name: "machine2", Score: 0}},
name: "if exceed limit, use limit",
},
{
// Pod: gcr.io/2000 gcr.io/10
// Node1
// Image: gcr.io/2000:latest 2000MB
// Score: 100 * (2000M/3 - 23M)/(1000M - 23M) = 65
// Node2
// Image: gcr.io/10:latest 10MB
// Score: 0 (10M/2 < 23M, min-threshold)
// Node3
// Image:
// Score: 0
pod: &v1.Pod{Spec: testMinMax},
nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010), makeImageNode("machine3", nodeWithNoImages)},
expectedList: []framework.NodeScore{{Name: "machine1", Score: 65}, {Name: "machine2", Score: 0}, {Name: "machine3", Score: 0}},
name: "if exceed limit, use limit (with node which has no images present)",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
list, err := runMapReducePriority(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)}, test.pod, snapshot, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
sortNodeScoreList(test.expectedList)
sortNodeScoreList(list)
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("expected %#v, got %#v", test.expectedList, list)
}
})
}
}
func TestNormalizedImageName(t *testing.T) {
for _, testCase := range []struct {
Name string
Input string
Output string
}{
{Name: "add :latest postfix 1", Input: "root", Output: "root:latest"},
{Name: "add :latest postfix 2", Input: "gcr.io:5000/root", Output: "gcr.io:5000/root:latest"},
{Name: "keep it as is 1", Input: "root:tag", Output: "root:tag"},
{Name: "keep it as is 2", Input: "root@" + getImageFakeDigest("root"), Output: "root@" + getImageFakeDigest("root")},
} {
t.Run(testCase.Name, func(t *testing.T) {
image := normalizedImageName(testCase.Input)
if image != testCase.Output {
t.Errorf("expected image reference: %q, got %q", testCase.Output, image)
}
})
}
}
func makeImageNode(node string, status v1.NodeStatus) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: node},
Status: status,
}
}
func getImageFakeDigest(fakeContent string) string {
hash := sha256.Sum256([]byte(fakeContent))
return "sha256:" + hex.EncodeToString(hash[:])
}

View File

@ -89,5 +89,5 @@ func init() {
scheduler.RegisterPriorityMapReduceFunction(priorities.TaintTolerationPriority, priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1)
// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
scheduler.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, priorities.ImageLocalityPriorityMap, nil, 1)
scheduler.RegisterPriorityMapReduceFunction(priorities.ImageLocalityPriority, nil, nil, 1)
}

View File

@ -6,9 +6,9 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
],
@ -19,15 +19,11 @@ go_test(
srcs = ["image_locality_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm/priorities:go_default_library",
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo/snapshot:go_default_library",
"//pkg/util/parsers:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@ -19,12 +19,21 @@ package imagelocality
import (
"context"
"fmt"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/parsers"
)
// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
const (
mb int64 = 1024 * 1024
minThreshold int64 = 23 * mb
maxThreshold int64 = 1000 * mb
)
// ImageLocality is a score plugin that favors nodes that already have requested pod container's images.
@ -49,9 +58,15 @@ func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState,
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
}
meta := migration.PriorityMetadata(state)
s, err := priorities.ImageLocalityPriorityMap(pod, meta, nodeInfo)
return s.Score, migration.ErrorToFrameworkStatus(err)
nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}
totalNumNodes := len(nodeInfos)
score := calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, totalNumNodes))
return score, nil
}
// ScoreExtensions of the Score plugin.
@ -63,3 +78,52 @@ func (pl *ImageLocality) ScoreExtensions() framework.ScoreExtensions {
func New(_ *runtime.Unknown, h framework.FrameworkHandle) (framework.Plugin, error) {
return &ImageLocality{handle: h}, nil
}
// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64) int64 {
if sumScores < minThreshold {
sumScores = minThreshold
} else if sumScores > maxThreshold {
sumScores = maxThreshold
}
return int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}
// sumImageScores returns the sum of image scores of all the containers that are already on the node.
// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
// the final score. Note that the init containers are not considered for it's rare for users to deploy huge init containers.
func sumImageScores(nodeInfo *schedulernodeinfo.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
var sum int64
imageStates := nodeInfo.ImageStates()
for _, container := range containers {
if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
sum += scaledImageScore(state, totalNumNodes)
}
}
return sum
}
// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how much nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *schedulernodeinfo.ImageStateSummary, totalNumNodes int) int64 {
spread := float64(imageState.NumNodes) / float64(totalNumNodes)
return int64(float64(imageState.Size) * spread)
}
// normalizedImageName returns the CRI compliant name for a given image.
// TODO: cover the corner cases of missed matches, e.g,
// 1. Using Docker as runtime and docker.io/library/test:tag in pod spec, but only test:tag will present in node status
// 2. Using the implicit registry, i.e., test:tag or library/test:tag in pod spec but only docker.io/library/test:tag
// in node status; note that if users consistently use one registry format, this should not happen.
func normalizedImageName(name string) string {
if strings.LastIndex(name, ":") <= strings.LastIndex(name, "/") {
name = name + ":" + parsers.DefaultImageTag
}
return name
}

View File

@ -18,22 +18,18 @@ package imagelocality
import (
"context"
"crypto/sha256"
"encoding/hex"
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
nodeinfosnapshot "k8s.io/kubernetes/pkg/scheduler/nodeinfo/snapshot"
"k8s.io/kubernetes/pkg/util/parsers"
)
var mb int64 = 1024 * 1024
func TestImageLocalityPriority(t *testing.T) {
test40250 := v1.PodSpec{
Containers: []v1.Container{
@ -190,22 +186,9 @@ func TestImageLocalityPriority(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
metaDataProducer := priorities.NewMetadataFactory(
informerFactory.Core().V1().Services().Lister(),
informerFactory.Core().V1().ReplicationControllers().Lister(),
informerFactory.Apps().V1().ReplicaSets().Lister(),
informerFactory.Apps().V1().StatefulSets().Lister(),
1,
)
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(nil, test.nodes))
meta := metaDataProducer(test.pod, test.nodes, snapshot)
state := framework.NewCycleState()
state.Write(migration.PrioritiesStateKey, &migration.PrioritiesStateData{Reference: meta})
fh, _ := framework.NewFramework(nil, nil, nil, framework.WithSnapshotSharedLister(snapshot))
@ -227,9 +210,34 @@ func TestImageLocalityPriority(t *testing.T) {
}
}
func TestNormalizedImageName(t *testing.T) {
for _, testCase := range []struct {
Name string
Input string
Output string
}{
{Name: "add :latest postfix 1", Input: "root", Output: "root:latest"},
{Name: "add :latest postfix 2", Input: "gcr.io:5000/root", Output: "gcr.io:5000/root:latest"},
{Name: "keep it as is 1", Input: "root:tag", Output: "root:tag"},
{Name: "keep it as is 2", Input: "root@" + getImageFakeDigest("root"), Output: "root@" + getImageFakeDigest("root")},
} {
t.Run(testCase.Name, func(t *testing.T) {
image := normalizedImageName(testCase.Input)
if image != testCase.Output {
t.Errorf("expected image reference: %q, got %q", testCase.Output, image)
}
})
}
}
func makeImageNode(node string, status v1.NodeStatus) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: node},
Status: status,
}
}
func getImageFakeDigest(fakeContent string) string {
hash := sha256.Sum256([]byte(fakeContent))
return "sha256:" + hex.EncodeToString(hash[:])
}