diff --git a/pkg/scheduler/algorithm/priorities/image_locality.go b/pkg/scheduler/algorithm/priorities/image_locality.go
index 00820f2b4e6..041e52d4fce 100644
--- a/pkg/scheduler/algorithm/priorities/image_locality.go
+++ b/pkg/scheduler/algorithm/priorities/image_locality.go
@@ -26,11 +26,12 @@ import (
 	"k8s.io/kubernetes/pkg/util/parsers"
 )
 
-// This is a reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
+// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
+// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
 const (
-	mb         int64 = 1024 * 1024
-	minImgSize int64 = 23 * mb
-	maxImgSize int64 = 1000 * mb
+	mb           int64 = 1024 * 1024
+	minThreshold int64 = 23 * mb
+	maxThreshold int64 = 1000 * mb
 )
 
 // ImageLocalityPriorityMap is a priority function that favors nodes that already have requested pod container's images.
@@ -44,44 +45,55 @@ func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *scheduler
 		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 	}
 
-	sumSize := totalImageSize(nodeInfo, pod.Spec.Containers)
+	var score int
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
+	} else {
+		// if we are not able to parse priority metadata, skip this priority
+		score = 0
+	}
 
 	return schedulerapi.HostPriority{
 		Host:  node.Name,
-		Score: calculateScoreFromSize(sumSize),
+		Score: score,
 	}, nil
 }
 
-// calculateScoreFromSize calculates the priority of a node. sumSize is sum size of requested images on this node.
-// 1. Split image size range into 10 buckets.
-// 2. Decide the priority of a given sumSize based on which bucket it belongs to.
-func calculateScoreFromSize(sumSize int64) int {
-	switch {
-	case sumSize == 0 || sumSize < minImgSize:
-		// 0 means none of the images required by this pod are present on this
-		// node or the total size of the images present are too small to be taken into further consideration.
-		return 0
-
-	case sumSize >= maxImgSize:
-		// If existing images' total size is larger than max, just make it highest priority.
-		return schedulerapi.MaxPriority
+// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
+// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
+func calculatePriority(sumScores int64) int {
+	if sumScores < minThreshold {
+		sumScores = minThreshold
+	} else if sumScores > maxThreshold {
+		sumScores = maxThreshold
 	}
 
-	return int((int64(schedulerapi.MaxPriority) * (sumSize - minImgSize) / (maxImgSize - minImgSize)) + 1)
+	return int(int64(schedulerapi.MaxPriority) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
 }
 
-// totalImageSize returns the total image size of all the containers that are already on the node.
-func totalImageSize(nodeInfo *schedulercache.NodeInfo, containers []v1.Container) int64 {
-	var total int64
+// sumImageScores returns the sum of image scores of all the containers that are already on the node.
+// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
+// the final score. Note that init containers are not considered, since it is rare for users to deploy huge init containers.
+func sumImageScores(nodeInfo *schedulercache.NodeInfo, containers []v1.Container, totalNumNodes int) int64 { + var sum int64 + imageStates := nodeInfo.ImageStates() - imageSizes := nodeInfo.ImageSizes() for _, container := range containers { - if size, ok := imageSizes[normalizedImageName(container.Image)]; ok { - total += size + if state, ok := imageStates[normalizedImageName(container.Image)]; ok { + sum += scaledImageScore(state, totalNumNodes) } } - return total + return sum +} + +// scaledImageScore returns an adaptively scaled score for the given state of an image. +// The size of the image is used as the base score, scaled by a factor which considers how much nodes the image has "spread" to. +// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or +// a few nodes due to image locality. +func scaledImageScore(imageState *schedulercache.ImageStateSummary, totalNumNodes int) int64 { + spread := float64(imageState.NumNodes) / float64(totalNumNodes) + return int64(float64(imageState.Size) * spread) } // normalizedImageName returns the CRI compliant name for a given image. diff --git a/pkg/scheduler/algorithm/priorities/image_locality_test.go b/pkg/scheduler/algorithm/priorities/image_locality_test.go index a8f76a03d32..58dc02a2335 100644 --- a/pkg/scheduler/algorithm/priorities/image_locality_test.go +++ b/pkg/scheduler/algorithm/priorities/image_locality_test.go @@ -42,13 +42,13 @@ func TestImageLocalityPriority(t *testing.T) { }, } - test40140 := v1.PodSpec{ + test40300 := v1.PodSpec{ Containers: []v1.Container{ { Image: "gcr.io/40", }, { - Image: "gcr.io/140", + Image: "gcr.io/300", }, }, } @@ -64,7 +64,7 @@ func TestImageLocalityPriority(t *testing.T) { }, } - node401402000 := v1.NodeStatus{ + node403002000 := v1.NodeStatus{ Images: []v1.ContainerImage{ { Names: []string{ @@ -76,10 +76,10 @@ func TestImageLocalityPriority(t *testing.T) { }, { Names: []string{ - "gcr.io/140:" + parsers.DefaultImageTag, - "gcr.io/140:v1", + "gcr.io/300:" + parsers.DefaultImageTag, + "gcr.io/300:v1", }, - SizeBytes: int64(140 * mb), + SizeBytes: int64(300 * mb), }, { Names: []string{ @@ -120,29 +120,29 @@ func TestImageLocalityPriority(t *testing.T) { // Node1 // Image: gcr.io/40:latest 40MB - // Score: (40M-23M)/97.7M + 1 = 1 + // Score: 0 (40M/2 < 23M, min-threshold) // Node2 // Image: gcr.io/250:latest 250MB - // Score: (250M-23M)/97.7M + 1 = 3 + // Score: 10 * (250M/2 - 23M)/(1000M - 23M) = 1 pod: &v1.Pod{Spec: test40250}, - nodes: []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)}, - expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 3}}, + nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 1}}, name: "two images spread on two nodes, prefer the larger image one", }, { - // Pod: gcr.io/40 gcr.io/140 + // Pod: gcr.io/40 gcr.io/300 // Node1 - // Image: gcr.io/40:latest 40MB, gcr.io/140:latest 140MB - // Score: (40M+140M-23M)/97.7M + 1 = 2 + // Image: gcr.io/40:latest 40MB, gcr.io/300:latest 300MB + // Score: 10 * ((40M + 300M)/2 - 23M)/(1000M - 23M) = 1 // Node2 // Image: not present // Score: 0 - pod: &v1.Pod{Spec: test40140}, - nodes: []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)}, - expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 2}, {Host: 
"machine2", Score: 0}}, + pod: &v1.Pod{Spec: test40300}, + nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 0}}, name: "two images on one node, prefer this node", }, { @@ -150,13 +150,13 @@ func TestImageLocalityPriority(t *testing.T) { // Node1 // Image: gcr.io/2000:latest 2000MB - // Score: 2000 > max score = 10 + // Score: 10 (2000M/2 >= 1000M, max-threshold) // Node2 // Image: gcr.io/10:latest 10MB - // Score: 10 < min score = 0 + // Score: 0 (10M/2 < 23M, min-threshold) pod: &v1.Pod{Spec: testMinMax}, - nodes: []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)}, + nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: schedulerapi.MaxPriority}, {Host: "machine2", Score: 0}}, name: "if exceed limit, use limit", }, @@ -165,7 +165,7 @@ func TestImageLocalityPriority(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go index 3cb0f2f442a..fe824b91c38 100644 --- a/pkg/scheduler/algorithm/priorities/metadata.go +++ b/pkg/scheduler/algorithm/priorities/metadata.go @@ -52,6 +52,7 @@ type priorityMetadata struct { podSelectors []labels.Selector controllerRef *metav1.OwnerReference podFirstServiceSelector labels.Selector + totalNumNodes int } // PriorityMetadata is a PriorityMetadataProducer. Node info can be nil. 
@@ -67,6 +68,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister), controllerRef: priorityutil.GetControllerRef(pod), podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister), + totalNumNodes: len(nodeNameToInfo), } } diff --git a/pkg/scheduler/cache/BUILD b/pkg/scheduler/cache/BUILD index 249437c56a1..69efd93eb6f 100644 --- a/pkg/scheduler/cache/BUILD +++ b/pkg/scheduler/cache/BUILD @@ -19,6 +19,7 @@ go_library( "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/github.com/golang/glog:go_default_library", @@ -30,13 +31,13 @@ go_test( srcs = [ "cache_test.go", "node_info_test.go", + "util_test.go", ], embed = [":go_default_library"], deps = [ "//pkg/features:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/util:go_default_library", - "//pkg/util/parsers:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", @@ -44,6 +45,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", ], diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index be4a808c1f9..a7fd4bdfd0a 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" @@ -59,6 +60,8 @@ type schedulerCache struct { podStates map[string]*podState nodes map[string]*NodeInfo pdbs map[string]*policy.PodDisruptionBudget + // A map from image name to its imageState. + imageStates map[string]*imageState } type podState struct { @@ -69,6 +72,29 @@ type podState struct { bindingFinished bool } +type imageState struct { + // Size of the image + size int64 + // A set of node names for nodes having this image present + nodes sets.String +} + +// ImageStateSummary provides summarized information about the state of an image. +type ImageStateSummary struct { + // Size of the image + Size int64 + // Used to track how many nodes have this image + NumNodes int +} + +// createImageStateSummary returns a summarizing snapshot of the given image's state. 
+func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary { + return &ImageStateSummary{ + Size: state.size, + NumNodes: len(state.nodes), + } +} + func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache { return &schedulerCache{ ttl: ttl, @@ -79,6 +105,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul assumedPods: make(map[string]bool), podStates: make(map[string]*podState), pdbs: make(map[string]*policy.PodDisruptionBudget), + imageStates: make(map[string]*imageState), } } @@ -113,6 +140,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot { func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error { cache.mu.Lock() defer cache.mu.Unlock() + for name, info := range cache.nodes { if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil { // Transient scheduler info is reset here. @@ -394,7 +422,11 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error { if !ok { n = NewNodeInfo() cache.nodes[node.Name] = n + } else { + cache.removeNodeImageStates(n.node) } + + cache.addNodeImageStates(node, n) return n.SetNode(node) } @@ -406,7 +438,11 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { if !ok { n = NewNodeInfo() cache.nodes[newNode.Name] = n + } else { + cache.removeNodeImageStates(n.node) } + + cache.addNodeImageStates(newNode, n) return n.SetNode(newNode) } @@ -425,9 +461,62 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { if len(n.pods) == 0 && n.node == nil { delete(cache.nodes, node.Name) } + + cache.removeNodeImageStates(node) return nil } +// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in +// scheduler cache. This function assumes the lock to scheduler cache has been acquired. +func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) { + newSum := make(map[string]*ImageStateSummary) + + for _, image := range node.Status.Images { + for _, name := range image.Names { + // update the entry in imageStates + state, ok := cache.imageStates[name] + if !ok { + state = &imageState{ + size: image.SizeBytes, + nodes: sets.NewString(node.Name), + } + cache.imageStates[name] = state + } else { + state.nodes.Insert(node.Name) + } + // create the imageStateSummary for this image + if _, ok := newSum[name]; !ok { + newSum[name] = cache.createImageStateSummary(state) + } + } + } + nodeInfo.imageStates = newSum +} + +// removeNodeImageStates removes the given node record from image entries having the node +// in imageStates cache. After the removal, if any image becomes free, i.e., the image +// is no longer available on any node, the image entry will be removed from imageStates. 
+func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) { + if node == nil { + return + } + + for _, image := range node.Status.Images { + for _, name := range image.Names { + state, ok := cache.imageStates[name] + if ok { + state.nodes.Delete(node.Name) + } + if len(state.nodes) == 0 { + // Remove the unused image to make sure the length of + // imageStates represents the total number of different + // images on all nodes + delete(cache.imageStates, name) + } + } + } +} + func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error { cache.mu.Lock() defer cache.mu.Unlock() diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 065a3fa8062..0da8ee9bf0e 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -108,7 +108,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[1], testPods[2]}, @@ -125,7 +125,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1], testPods[2]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { // test non-zero request pods: []*v1.Pod{testPods[3]}, @@ -142,7 +142,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[3]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[4]}, @@ -160,7 +160,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[4], testPods[5]}, @@ -178,7 +178,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4], testPods[5]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[6]}, @@ -195,7 +195,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[6]}, usedPorts: newHostPortInfoBuilder().build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, } @@ -275,7 +275,7 @@ func TestExpirePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} @@ -328,7 +328,7 @@ func TestAddPodWillConfirm(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} @@ -425,7 +425,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { allocatableResource: &Resource{}, pods: 
[]*v1.Pod{updatedPod.DeepCopy()}, usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, }} @@ -481,7 +481,7 @@ func TestAddPodAfterExpiration(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} @@ -537,7 +537,7 @@ func TestUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, { requestedResource: &Resource{ MilliCPU: 100, @@ -551,7 +551,7 @@ func TestUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }}, }} @@ -669,7 +669,7 @@ func TestExpireAddUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, { requestedResource: &Resource{ MilliCPU: 100, @@ -683,7 +683,7 @@ func TestExpireAddUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }}, }} @@ -761,7 +761,7 @@ func TestEphemeralStorageResource(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{podE}, usedPorts: schedutil.HostPortInfo{}, - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, } @@ -808,7 +808,7 @@ func TestRemovePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} diff --git a/pkg/scheduler/cache/node_info.go b/pkg/scheduler/cache/node_info.go index 1479e81c554..31be774578e 100644 --- a/pkg/scheduler/cache/node_info.go +++ b/pkg/scheduler/cache/node_info.go @@ -58,9 +58,10 @@ type NodeInfo struct { taints []v1.Taint taintsErr error - // This is a map from image name to image size, also for checking image existence on the node - // Cache it here to avoid rebuilding the map during scheduling, e.g., in image_locality.go - imageSizes map[string]int64 + // imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for + // checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image + // state information. + imageStates map[string]*ImageStateSummary // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of // scheduling cycle. 
@@ -261,7 +262,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { TransientInfo: newTransientSchedulerInfo(), generation: nextGeneration(), usedPorts: make(util.HostPortInfo), - imageSizes: make(map[string]int64), + imageStates: make(map[string]*ImageStateSummary), } for _, pod := range pods { ni.AddPod(pod) @@ -293,12 +294,12 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo { return n.usedPorts } -// ImageSizes returns the image size information on this node. -func (n *NodeInfo) ImageSizes() map[string]int64 { +// ImageStates returns the state information of all images. +func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary { if n == nil { return nil } - return n.imageSizes + return n.imageStates } // PodsWithAffinity return all pods with (anti)affinity constraints on this node. @@ -392,7 +393,7 @@ func (n *NodeInfo) Clone() *NodeInfo { diskPressureCondition: n.diskPressureCondition, pidPressureCondition: n.pidPressureCondition, usedPorts: make(util.HostPortInfo), - imageSizes: n.imageSizes, + imageStates: make(map[string]*ImageStateSummary), generation: n.generation, } if len(n.pods) > 0 { @@ -547,17 +548,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) { } } -func (n *NodeInfo) updateImageSizes() { - node := n.Node() - imageSizes := make(map[string]int64) - for _, image := range node.Status.Images { - for _, name := range image.Names { - imageSizes[name] = image.SizeBytes - } - } - n.imageSizes = imageSizes -} - // SetNode sets the overall node information. func (n *NodeInfo) SetNode(node *v1.Node) error { n.node = node @@ -579,7 +569,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error { } } n.TransientInfo = newTransientSchedulerInfo() - n.updateImageSizes() n.generation = nextGeneration() return nil } @@ -596,6 +585,7 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error { n.memoryPressureCondition = v1.ConditionUnknown n.diskPressureCondition = v1.ConditionUnknown n.pidPressureCondition = v1.ConditionUnknown + n.imageStates = make(map[string]*ImageStateSummary) n.generation = nextGeneration() return nil } diff --git a/pkg/scheduler/cache/node_info_test.go b/pkg/scheduler/cache/node_info_test.go index 4c20f57b1ef..cf3eae51d96 100644 --- a/pkg/scheduler/cache/node_info_test.go +++ b/pkg/scheduler/cache/node_info_test.go @@ -26,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/util/parsers" ) func TestNewResource(t *testing.T) { @@ -241,46 +240,6 @@ func TestSetMaxResource(t *testing.T) { } } -func TestImageSizes(t *testing.T) { - ni := fakeNodeInfo() - ni.node = &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Status: v1.NodeStatus{ - Images: []v1.ContainerImage{ - { - Names: []string{ - "gcr.io/10:" + parsers.DefaultImageTag, - "gcr.io/10:v1", - }, - SizeBytes: int64(10 * 1024 * 1024), - }, - { - Names: []string{ - "gcr.io/50:" + parsers.DefaultImageTag, - "gcr.io/50:v1", - }, - SizeBytes: int64(50 * 1024 * 1024), - }, - }, - }, - } - - ni.updateImageSizes() - expected := map[string]int64{ - "gcr.io/10:" + parsers.DefaultImageTag: 10 * 1024 * 1024, - "gcr.io/10:v1": 10 * 1024 * 1024, - "gcr.io/50:" + parsers.DefaultImageTag: 50 * 1024 * 1024, - "gcr.io/50:v1": 50 * 1024 * 1024, - } - - imageSizes := ni.ImageSizes() - if !reflect.DeepEqual(expected, imageSizes) { - t.Errorf("expected: %#v, got: %#v", expected, imageSizes) - } -} - func TestNewNodeInfo(t *testing.T) { nodeName := "test-node" pods := []*v1.Pod{ @@ -312,7 +271,7 
@@ func TestNewNodeInfo(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -401,9 +360,7 @@ func TestNodeInfoClone(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{ - "gcr.io/10": 10 * 1024 * 1024, - }, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -473,9 +430,7 @@ func TestNodeInfoClone(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{ - "gcr.io/10": 10 * 1024 * 1024, - }, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -633,7 +588,7 @@ func TestNodeInfoAddPod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -752,7 +707,7 @@ func TestNodeInfoRemovePod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -868,7 +823,7 @@ func TestNodeInfoRemovePod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go index c3037dcc3d3..5a252b6402e 100644 --- a/pkg/scheduler/cache/util.go +++ b/pkg/scheduler/cache/util.go @@ -16,7 +16,10 @@ limitations under the License. package cache -import "k8s.io/api/core/v1" +import ( + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" +) // CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names // and the values are the aggregated information for that node. @@ -29,11 +32,47 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeI } nodeNameToInfo[nodeName].AddPod(pod) } + imageExistenceMap := createImageExistenceMap(nodes) + for _, node := range nodes { if _, ok := nodeNameToInfo[node.Name]; !ok { nodeNameToInfo[node.Name] = NewNodeInfo() } - nodeNameToInfo[node.Name].SetNode(node) + nodeInfo := nodeNameToInfo[node.Name] + nodeInfo.SetNode(node) + nodeInfo.imageStates = getNodeImageStates(node, imageExistenceMap) } return nodeNameToInfo } + +// getNodeImageStates returns the given node's image states based on the given imageExistence map. +func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*ImageStateSummary { + imageStates := make(map[string]*ImageStateSummary) + + for _, image := range node.Status.Images { + for _, name := range image.Names { + imageStates[name] = &ImageStateSummary{ + Size: image.SizeBytes, + NumNodes: len(imageExistenceMap[name]), + } + } + } + return imageStates +} + +// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names. 
+func createImageExistenceMap(nodes []*v1.Node) map[string]sets.String { + imageExistenceMap := make(map[string]sets.String) + for _, node := range nodes { + for _, image := range node.Status.Images { + for _, name := range image.Names { + if _, ok := imageExistenceMap[name]; !ok { + imageExistenceMap[name] = sets.NewString(node.Name) + } else { + imageExistenceMap[name].Insert(node.Name) + } + } + } + } + return imageExistenceMap +} diff --git a/pkg/scheduler/cache/util_test.go b/pkg/scheduler/cache/util_test.go new file mode 100644 index 00000000000..7b0b5f111c8 --- /dev/null +++ b/pkg/scheduler/cache/util_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" +) + +const mb int64 = 1024 * 1024 + +func TestGetNodeImageStates(t *testing.T) { + tests := []struct { + node *v1.Node + imageExistenceMap map[string]sets.String + expected map[string]*ImageStateSummary + }{ + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-0"}, + Status: v1.NodeStatus{ + Images: []v1.ContainerImage{ + { + Names: []string{ + "gcr.io/10:v1", + }, + SizeBytes: int64(10 * mb), + }, + { + Names: []string{ + "gcr.io/200:v1", + }, + SizeBytes: int64(200 * mb), + }, + }, + }, + }, + imageExistenceMap: map[string]sets.String{ + "gcr.io/10:v1": sets.NewString("node-0", "node-1"), + "gcr.io/200:v1": sets.NewString("node-0"), + }, + expected: map[string]*ImageStateSummary{ + "gcr.io/10:v1": { + Size: int64(10 * mb), + NumNodes: 2, + }, + "gcr.io/200:v1": { + Size: int64(200 * mb), + NumNodes: 1, + }, + }, + }, + } + + for _, test := range tests { + imageStates := getNodeImageStates(test.node, test.imageExistenceMap) + if !reflect.DeepEqual(test.expected, imageStates) { + t.Errorf("expected: %#v, got: %#v", test.expected, imageStates) + } + } +} + +func TestCreateImageExistenceMap(t *testing.T) { + tests := []struct { + nodes []*v1.Node + expected map[string]sets.String + }{ + { + nodes: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-0"}, + Status: v1.NodeStatus{ + Images: []v1.ContainerImage{ + { + Names: []string{ + "gcr.io/10:v1", + }, + SizeBytes: int64(10 * mb), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Status: v1.NodeStatus{ + Images: []v1.ContainerImage{ + { + Names: []string{ + "gcr.io/10:v1", + }, + SizeBytes: int64(10 * mb), + }, + { + Names: []string{ + "gcr.io/200:v1", + }, + SizeBytes: int64(200 * mb), + }, + }, + }, + }, + }, + expected: map[string]sets.String{ + "gcr.io/10:v1": sets.NewString("node-0", "node-1"), + "gcr.io/200:v1": sets.NewString("node-1"), + }, + }, + } + + for _, test := range tests { + imageMap := createImageExistenceMap(test.nodes) + if !reflect.DeepEqual(test.expected, imageMap) { + t.Errorf("expected: %#v, got: %#v", test.expected, imageMap) + } + } +}
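
Reviewer note: below is a minimal, self-contained sketch of the new scoring arithmetic, useful for sanity-checking the updated test comments. The names imageSpec, scaledScore and prioritize are illustrative stand-ins for ImageStateSummary, scaledImageScore and calculatePriority; they are not part of this patch. With gcr.io/40 (40MB) and gcr.io/300 (300MB) each known on 1 of 2 nodes, the summed scaled score is (40M+300M)/2 = 170M, which maps to priority 1 on the 0..10 scale.

package main

import "fmt"

const (
	mb           int64 = 1024 * 1024
	minThreshold int64 = 23 * mb   // lower bound of the score range
	maxThreshold int64 = 1000 * mb // upper bound of the score range
	maxPriority  int64 = 10        // stands in for schedulerapi.MaxPriority
)

// imageSpec is an illustrative stand-in for the cached ImageStateSummary:
// the image size plus the number of nodes that already have the image.
type imageSpec struct {
	size     int64
	numNodes int
}

// scaledScore mirrors scaledImageScore: the image size is damped by the
// fraction of nodes the image has spread to.
func scaledScore(img imageSpec, totalNumNodes int) int64 {
	spread := float64(img.numNodes) / float64(totalNumNodes)
	return int64(float64(img.size) * spread)
}

// prioritize mirrors calculatePriority: clamp the summed score into
// [minThreshold, maxThreshold] and project it onto [0, maxPriority].
func prioritize(sumScores int64) int {
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	return int(maxPriority * (sumScores - minThreshold) / (maxThreshold - minThreshold))
}

func main() {
	totalNumNodes := 2
	// Node1 in the test: gcr.io/40 (40MB) and gcr.io/300 (300MB), each known on 1 of 2 nodes.
	var sum int64
	for _, img := range []imageSpec{{40 * mb, 1}, {300 * mb, 1}} {
		sum += scaledScore(img, totalNumNodes)
	}
	fmt.Println(prioritize(sum)) // 10 * (170M - 23M) / (1000M - 23M) = 1
}

The spread ratio is what separates this from the old size-only bucketing: an image concentrated on a few nodes is discounted, so image locality stops pulling every replica onto those nodes, which is the "node heating problem" called out in the scaledImageScore comment.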
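
On the cache side, the change keeps one shared imageState per image name (its size plus the set of node names that have it); AddNode, UpdateNode and RemoveNode maintain that set and drop the entry once no node has the image, so the number of entries tracks the number of distinct images across all nodes. A rough sketch of that bookkeeping, assuming a plain map[string]bool in place of sets.String and hypothetical names (imageEntry, registry, addNode, removeNode):

package main

import "fmt"

// imageEntry is an illustrative stand-in for the cache's imageState:
// the image size plus the set of nodes on which the image is present.
type imageEntry struct {
	size  int64
	nodes map[string]bool
}

// registry stands in for schedulerCache.imageStates.
type registry map[string]*imageEntry

// addNode records that every image in images is present on nodeName,
// mirroring addNodeImageStates.
func (r registry) addNode(nodeName string, images map[string]int64) {
	for name, size := range images {
		entry, ok := r[name]
		if !ok {
			entry = &imageEntry{size: size, nodes: map[string]bool{}}
			r[name] = entry
		}
		entry.nodes[nodeName] = true
	}
}

// removeNode drops nodeName from every image it was recorded on and deletes
// entries that no longer have any node, mirroring removeNodeImageStates.
// The cleanup runs only when the entry exists, so unknown images are a no-op.
func (r registry) removeNode(nodeName string, images map[string]int64) {
	for name := range images {
		if entry, ok := r[name]; ok {
			delete(entry.nodes, nodeName)
			if len(entry.nodes) == 0 {
				delete(r, name) // keep len(r) equal to the number of distinct images on all nodes
			}
		}
	}
}

func main() {
	const mb int64 = 1024 * 1024
	r := registry{}
	imgs := map[string]int64{"gcr.io/10:v1": 10 * mb}

	r.addNode("node-0", imgs)
	r.addNode("node-1", imgs)
	r.removeNode("node-0", imgs)
	fmt.Println(len(r["gcr.io/10:v1"].nodes)) // 1: node-1 still has the image
	r.removeNode("node-1", imgs)
	fmt.Println(len(r)) // 0: last node gone, entry removed
}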
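
CreateNodeNameToInfoMap takes the other path: rather than maintaining state incrementally, it rebuilds the summaries from a node list in two passes, where createImageExistenceMap first records which nodes each image name appears on and getNodeImageStates then pairs each node's local image sizes with those global node counts. A sketch of that flow under the same simplifications (imageSummary, buildExistenceMap and summarize are illustrative, plain maps instead of sets.String):

package main

import "fmt"

// imageSummary is an illustrative stand-in for ImageStateSummary.
type imageSummary struct {
	size     int64
	numNodes int
}

// buildExistenceMap mirrors createImageExistenceMap: image name -> set of node names.
func buildExistenceMap(nodeImages map[string]map[string]int64) map[string]map[string]bool {
	existence := make(map[string]map[string]bool)
	for nodeName, images := range nodeImages {
		for imageName := range images {
			if existence[imageName] == nil {
				existence[imageName] = make(map[string]bool)
			}
			existence[imageName][nodeName] = true
		}
	}
	return existence
}

// summarize mirrors getNodeImageStates: pair one node's image sizes with the
// global node counts taken from the existence map.
func summarize(images map[string]int64, existence map[string]map[string]bool) map[string]imageSummary {
	states := make(map[string]imageSummary)
	for name, size := range images {
		states[name] = imageSummary{size: size, numNodes: len(existence[name])}
	}
	return states
}

func main() {
	const mb int64 = 1024 * 1024
	nodeImages := map[string]map[string]int64{
		"node-0": {"gcr.io/10:v1": 10 * mb},
		"node-1": {"gcr.io/10:v1": 10 * mb, "gcr.io/200:v1": 200 * mb},
	}
	existence := buildExistenceMap(nodeImages)
	// gcr.io/10:v1 is known on 2 nodes, gcr.io/200:v1 on 1 node.
	fmt.Println(summarize(nodeImages["node-1"], existence))
}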