From c3f111f74a88b7f96165c8d75d862464862102d5 Mon Sep 17 00:00:00 2001 From: Silvery Fu Date: Wed, 11 Jul 2018 23:58:02 -0700 Subject: [PATCH] Add image states to scheduler cache --- pkg/scheduler/cache/cache.go | 89 +++++++++++++++++++++++++++ pkg/scheduler/cache/cache_test.go | 32 +++++----- pkg/scheduler/cache/node_info.go | 30 +++------ pkg/scheduler/cache/node_info_test.go | 57 ++--------------- 4 files changed, 121 insertions(+), 87 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index be4a808c1f9..a7fd4bdfd0a 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -23,6 +23,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" @@ -59,6 +60,8 @@ type schedulerCache struct { podStates map[string]*podState nodes map[string]*NodeInfo pdbs map[string]*policy.PodDisruptionBudget + // A map from image name to its imageState. + imageStates map[string]*imageState } type podState struct { @@ -69,6 +72,29 @@ type podState struct { bindingFinished bool } +type imageState struct { + // Size of the image + size int64 + // A set of node names for nodes having this image present + nodes sets.String +} + +// ImageStateSummary provides summarized information about the state of an image. +type ImageStateSummary struct { + // Size of the image + Size int64 + // Used to track how many nodes have this image + NumNodes int +} + +// createImageStateSummary returns a summarizing snapshot of the given image's state. 
+func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary { + return &ImageStateSummary{ + Size: state.size, + NumNodes: len(state.nodes), + } +} + func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache { return &schedulerCache{ ttl: ttl, @@ -79,6 +105,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul assumedPods: make(map[string]bool), podStates: make(map[string]*podState), pdbs: make(map[string]*policy.PodDisruptionBudget), + imageStates: make(map[string]*imageState), } } @@ -113,6 +140,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot { func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error { cache.mu.Lock() defer cache.mu.Unlock() + for name, info := range cache.nodes { if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil { // Transient scheduler info is reset here. @@ -394,7 +422,11 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error { if !ok { n = NewNodeInfo() cache.nodes[node.Name] = n + } else { + cache.removeNodeImageStates(n.node) } + + cache.addNodeImageStates(node, n) return n.SetNode(node) } @@ -406,7 +438,11 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { if !ok { n = NewNodeInfo() cache.nodes[newNode.Name] = n + } else { + cache.removeNodeImageStates(n.node) } + + cache.addNodeImageStates(newNode, n) return n.SetNode(newNode) } @@ -425,9 +461,62 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error { if len(n.pods) == 0 && n.node == nil { delete(cache.nodes, node.Name) } + + cache.removeNodeImageStates(node) return nil } +// addNodeImageStates adds states of the images on given node to the given nodeInfo and update the imageStates in +// scheduler cache. This function assumes the lock to scheduler cache has been acquired. 
+func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) { + newSum := make(map[string]*ImageStateSummary) + + for _, image := range node.Status.Images { + for _, name := range image.Names { + // update the entry in imageStates + state, ok := cache.imageStates[name] + if !ok { + state = &imageState{ + size: image.SizeBytes, + nodes: sets.NewString(node.Name), + } + cache.imageStates[name] = state + } else { + state.nodes.Insert(node.Name) + } + // create the imageStateSummary for this image + if _, ok := newSum[name]; !ok { + newSum[name] = cache.createImageStateSummary(state) + } + } + } + nodeInfo.imageStates = newSum +} + +// removeNodeImageStates removes the given node record from image entries having the node +// in imageStates cache. After the removal, if any image becomes free, i.e., the image +// is no longer available on any node, the image entry will be removed from imageStates. +func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) { + if node == nil { + return + } + + for _, image := range node.Status.Images { + for _, name := range image.Names { + state, ok := cache.imageStates[name] + if ok { + state.nodes.Delete(node.Name) + if len(state.nodes) == 0 { + // Remove the unused image to make sure the length of + // imageStates represents the total number of different + // images on all nodes + delete(cache.imageStates, name) + } + } + } + } +} + func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error { cache.mu.Lock() defer cache.mu.Unlock() diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 658acf91064..d9d49ca2740 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -108,7 +108,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: 
make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[1], testPods[2]}, @@ -125,7 +125,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1], testPods[2]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { // test non-zero request pods: []*v1.Pod{testPods[3]}, @@ -142,7 +142,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[3]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[4]}, @@ -160,7 +160,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[4], testPods[5]}, @@ -178,7 +178,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[4], testPods[5]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, { pods: []*v1.Pod{testPods[6]}, @@ -195,7 +195,7 @@ func TestAssumePodScheduled(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[6]}, usedPorts: newHostPortInfoBuilder().build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, } @@ -275,7 +275,7 @@ func TestExpirePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageSizes: 
map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} @@ -328,7 +328,7 @@ func TestAddPodWillConfirm(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} @@ -425,7 +425,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{updatedPod.DeepCopy()}, usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, }} @@ -481,7 +481,7 @@ func TestAddPodAfterExpiration(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} @@ -537,7 +537,7 @@ func TestUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, { requestedResource: &Resource{ MilliCPU: 100, @@ -551,7 +551,7 @@ func TestUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }}, }} @@ -669,7 +669,7 @@ func TestExpireAddUpdatePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[1]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, { requestedResource: &Resource{ MilliCPU: 100, @@ -683,7 +683,7 @@ func TestExpireAddUpdatePod(t *testing.T) { 
allocatableResource: &Resource{}, pods: []*v1.Pod{testPods[0]}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }}, }} @@ -761,7 +761,7 @@ func TestEphemeralStorageResource(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{podE}, usedPorts: schedutil.HostPortInfo{}, - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }, } @@ -808,7 +808,7 @@ func TestRemovePod(t *testing.T) { allocatableResource: &Resource{}, pods: []*v1.Pod{basePod}, usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(), - imageSizes: map[string]int64{}, + imageStates: make(map[string]*ImageStateSummary), }, }} diff --git a/pkg/scheduler/cache/node_info.go b/pkg/scheduler/cache/node_info.go index 1479e81c554..31be774578e 100644 --- a/pkg/scheduler/cache/node_info.go +++ b/pkg/scheduler/cache/node_info.go @@ -58,9 +58,10 @@ type NodeInfo struct { taints []v1.Taint taintsErr error - // This is a map from image name to image size, also for checking image existence on the node - // Cache it here to avoid rebuilding the map during scheduling, e.g., in image_locality.go - imageSizes map[string]int64 + // imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for + // checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image + // state information. + imageStates map[string]*ImageStateSummary // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of // scheduling cycle. 
@@ -261,7 +262,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { TransientInfo: newTransientSchedulerInfo(), generation: nextGeneration(), usedPorts: make(util.HostPortInfo), - imageSizes: make(map[string]int64), + imageStates: make(map[string]*ImageStateSummary), } for _, pod := range pods { ni.AddPod(pod) @@ -293,12 +294,12 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo { return n.usedPorts } -// ImageSizes returns the image size information on this node. -func (n *NodeInfo) ImageSizes() map[string]int64 { +// ImageStates returns the state information of all images. +func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary { if n == nil { return nil } - return n.imageSizes + return n.imageStates } // PodsWithAffinity return all pods with (anti)affinity constraints on this node. @@ -392,7 +393,7 @@ func (n *NodeInfo) Clone() *NodeInfo { diskPressureCondition: n.diskPressureCondition, pidPressureCondition: n.pidPressureCondition, usedPorts: make(util.HostPortInfo), - imageSizes: n.imageSizes, + imageStates: make(map[string]*ImageStateSummary), generation: n.generation, } if len(n.pods) > 0 { @@ -547,17 +548,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) { } } -func (n *NodeInfo) updateImageSizes() { - node := n.Node() - imageSizes := make(map[string]int64) - for _, image := range node.Status.Images { - for _, name := range image.Names { - imageSizes[name] = image.SizeBytes - } - } - n.imageSizes = imageSizes -} - // SetNode sets the overall node information. 
func (n *NodeInfo) SetNode(node *v1.Node) error { n.node = node @@ -579,7 +569,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error { } } n.TransientInfo = newTransientSchedulerInfo() - n.updateImageSizes() n.generation = nextGeneration() return nil } @@ -596,6 +585,7 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error { n.memoryPressureCondition = v1.ConditionUnknown n.diskPressureCondition = v1.ConditionUnknown n.pidPressureCondition = v1.ConditionUnknown + n.imageStates = make(map[string]*ImageStateSummary) n.generation = nextGeneration() return nil } diff --git a/pkg/scheduler/cache/node_info_test.go b/pkg/scheduler/cache/node_info_test.go index 4c20f57b1ef..cf3eae51d96 100644 --- a/pkg/scheduler/cache/node_info_test.go +++ b/pkg/scheduler/cache/node_info_test.go @@ -26,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/scheduler/util" - "k8s.io/kubernetes/pkg/util/parsers" ) func TestNewResource(t *testing.T) { @@ -241,46 +240,6 @@ func TestSetMaxResource(t *testing.T) { } } -func TestImageSizes(t *testing.T) { - ni := fakeNodeInfo() - ni.node = &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - Status: v1.NodeStatus{ - Images: []v1.ContainerImage{ - { - Names: []string{ - "gcr.io/10:" + parsers.DefaultImageTag, - "gcr.io/10:v1", - }, - SizeBytes: int64(10 * 1024 * 1024), - }, - { - Names: []string{ - "gcr.io/50:" + parsers.DefaultImageTag, - "gcr.io/50:v1", - }, - SizeBytes: int64(50 * 1024 * 1024), - }, - }, - }, - } - - ni.updateImageSizes() - expected := map[string]int64{ - "gcr.io/10:" + parsers.DefaultImageTag: 10 * 1024 * 1024, - "gcr.io/10:v1": 10 * 1024 * 1024, - "gcr.io/50:" + parsers.DefaultImageTag: 50 * 1024 * 1024, - "gcr.io/50:v1": 50 * 1024 * 1024, - } - - imageSizes := ni.ImageSizes() - if !reflect.DeepEqual(expected, imageSizes) { - t.Errorf("expected: %#v, got: %#v", expected, imageSizes) - } -} - func TestNewNodeInfo(t *testing.T) { nodeName := 
"test-node" pods := []*v1.Pod{ @@ -312,7 +271,7 @@ func TestNewNodeInfo(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -401,9 +360,7 @@ func TestNodeInfoClone(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{ - "gcr.io/10": 10 * 1024 * 1024, - }, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -473,9 +430,7 @@ func TestNodeInfoClone(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{ - "gcr.io/10": 10 * 1024 * 1024, - }, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -633,7 +588,7 @@ func TestNodeInfoAddPod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -752,7 +707,7 @@ func TestNodeInfoRemovePod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -868,7 +823,7 @@ func TestNodeInfoRemovePod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - imageSizes: map[string]int64{}, + imageStates: map[string]*ImageStateSummary{}, pods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{