From 32c18a34798460415ae5785002bad3733fec65a6 Mon Sep 17 00:00:00 2001 From: Yibo Zhuang Date: Tue, 3 May 2022 18:00:41 -0700 Subject: [PATCH] Adding StorageInfoLister to SharedLister This change creates a StorageInfoLister interface and have it under scheduler SharedLister. The initial StorageInfoLister interface has a IsPVCUsedByPods which returns true/false depending on whether the PVC (keyed by namespace/name) has at least one scheduled pod using it. In snapshot real implementation, add a usedPVCSet key by PVC namespace/name which contains all PVCs that have at least one scheduled pod using it. During snapshot update, populate this set based on whether the PVCRefCounts map for node(s) have been updated since last snapshot. Signed-off-by: Yibo Zhuang --- pkg/scheduler/framework/listers.go | 16 +- pkg/scheduler/framework/types.go | 12 +- pkg/scheduler/internal/cache/cache.go | 28 ++- pkg/scheduler/internal/cache/cache_test.go | 177 ++++++++++++++---- pkg/scheduler/internal/cache/snapshot.go | 35 +++- pkg/scheduler/internal/cache/snapshot_test.go | 51 ++++- 6 files changed, 270 insertions(+), 49 deletions(-) diff --git a/pkg/scheduler/framework/listers.go b/pkg/scheduler/framework/listers.go index eaacaacdb6a..4701bc522d7 100644 --- a/pkg/scheduler/framework/listers.go +++ b/pkg/scheduler/framework/listers.go @@ -18,17 +18,25 @@ package framework // NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name. type NodeInfoLister interface { - // Returns the list of NodeInfos. + // List returns the list of NodeInfos. List() ([]*NodeInfo, error) - // Returns the list of NodeInfos of nodes with pods with affinity terms. + // HavePodsWithAffinityList returns the list of NodeInfos of nodes with pods with affinity terms. HavePodsWithAffinityList() ([]*NodeInfo, error) - // Returns the list of NodeInfos of nodes with pods with required anti-affinity terms. + // HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes with pods with required anti-affinity terms. HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error) - // Returns the NodeInfo of the given node name. + // Get returns the NodeInfo of the given node name. Get(nodeName string) (*NodeInfo, error) } +// StorageInfoLister interface represents anything that handles storage-related operations and resources. +type StorageInfoLister interface { + // IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods, + // keyed in the format "namespace/name". + IsPVCUsedByPods(key string) bool +} + // SharedLister groups scheduler-specific listers. type SharedLister interface { NodeInfos() NodeInfoLister + StorageInfos() StorageInfoLister } diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 0c9dd74c380..bde3974439d 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -549,7 +549,7 @@ func (n *NodeInfo) Clone() *NodeInfo { Allocatable: n.Allocatable.Clone(), UsedPorts: make(HostPortInfo), ImageStates: n.ImageStates, - PVCRefCounts: n.PVCRefCounts, + PVCRefCounts: make(map[string]int), Generation: n.Generation, } if len(n.Pods) > 0 { @@ -571,6 +571,9 @@ func (n *NodeInfo) Clone() *NodeInfo { if len(n.PodsWithRequiredAntiAffinity) > 0 { clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...) 
} + for key, value := range n.PVCRefCounts { + clone.PVCRefCounts[key] = value + } return clone } @@ -770,7 +773,7 @@ func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) { continue } - key := pod.Namespace + "/" + v.PersistentVolumeClaim.ClaimName + key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName) if add { n.PVCRefCounts[key] += 1 } else { @@ -838,6 +841,11 @@ func GetPodKey(pod *v1.Pod) (string, error) { return uid, nil } +// GetNamespacedName returns the string format of a namespaced resource name. +func GetNamespacedName(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + // DefaultBindAllHostIP defines the default ip address used to bind to all host. const DefaultBindAllHostIP = "0.0.0.0" diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 73fb6c367b5..1cd69a9704d 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -191,7 +191,7 @@ func (cache *cacheImpl) Dump() *Dump { // UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at // beginning of every scheduling cycle. // The snapshot only includes Nodes that are not deleted at the time this function is called. -// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. +// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. // This function tracks generation number of NodeInfo and updates only the // entries of an existing snapshot that have changed after the snapshot was taken. func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { @@ -212,6 +212,9 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { // status from having pods with required anti-affinity to NOT having pods with required // anti-affinity or the other way around. updateNodesHavePodsWithRequiredAntiAffinity := false + // usedPVCSet must be re-created whenever the head node generation is greater than + // last snapshot generation. + updateUsedPVCSet := false // Start from the head of the NodeInfo doubly linked list and update snapshot // of NodeInfos updated after the last snapshot. @@ -237,6 +240,18 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) { updateNodesHavePodsWithRequiredAntiAffinity = true } + if !updateUsedPVCSet { + if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) { + updateUsedPVCSet = true + } else { + for pvcKey := range clone.PVCRefCounts { + if _, found := existing.PVCRefCounts[pvcKey]; !found { + updateUsedPVCSet = true + break + } + } + } + } // We need to preserve the original pointer of the NodeInfo struct since it // is used in the NodeInfoList, which we may not update. 
*existing = *clone @@ -255,7 +270,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { updateAllLists = true } - if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity { + if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet { cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists) } @@ -278,6 +293,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) { snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + snapshot.usedPVCSet = sets.NewString() if updateAll { // Take a snapshot of the nodes order in the tree snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) @@ -294,6 +310,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 { snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo) } + for key := range nodeInfo.PVCRefCounts { + snapshot.usedPVCSet.Insert(key) + } } else { klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName)) } @@ -306,6 +325,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 { snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo) } + for key := range nodeInfo.PVCRefCounts { + snapshot.usedPVCSet.Insert(key) + } } } } @@ -366,7 +388,7 @@ func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error { return cache.finishBinding(pod, time.Now()) } -// finishBinding exists to make tests determinitistic by injecting now as an argument +// finishBinding exists to make tests deterministic by injecting now as an argument func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error { key, err := framework.GetPodKey(pod) if err != nil { diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index 1c80bd31ffd..b992fec983a 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -24,10 +24,14 @@ import ( "testing" "time" + st "k8s.io/kubernetes/pkg/scheduler/testing" + + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/framework" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -1169,7 +1173,7 @@ func TestNodeOperators(t *testing.T) { func TestSchedulerCache_UpdateSnapshot(t *testing.T) { // Create a few nodes to be used in tests. 
- nodes := []*v1.Node{} + var nodes []*v1.Node for i := 0; i < 10; i++ { node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -1185,7 +1189,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { nodes = append(nodes, node) } // Create a few nodes as updated versions of the above nodes - updatedNodes := []*v1.Node{} + var updatedNodes []*v1.Node for _, n := range nodes { updatedNode := n.DeepCopy() updatedNode.Status.Allocatable = v1.ResourceList{ @@ -1196,7 +1200,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } // Create a few pods for tests. - pods := []*v1.Pod{} + var pods []*v1.Pod for i := 0; i < 20; i++ { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1212,7 +1216,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } // Create a few pods as updated versions of the above pods. - updatedPods := []*v1.Pod{} + var updatedPods []*v1.Pod for _, p := range pods { updatedPod := p.DeepCopy() priority := int32(1000) @@ -1221,24 +1225,21 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } // Add a couple of pods with affinity, on the first and seconds nodes. - podsWithAffinity := []*v1.Pod{} + var podsWithAffinity []*v1.Pod for i := 0; i < 2; i++ { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-pod%v", i), - Namespace: "test-ns", - UID: types.UID(fmt.Sprintf("test-puid%v", i)), - }, - Spec: v1.PodSpec{ - NodeName: fmt.Sprintf("test-node%v", i), - Affinity: &v1.Affinity{ - PodAffinity: &v1.PodAffinity{}, - }, - }, - } + pod := st.MakePod().Name(fmt.Sprintf("p-affinity-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-affinity-%v", i)). + PodAffinityExists("foo", "", st.PodAffinityWithRequiredReq).Node(fmt.Sprintf("test-node%v", i)).Obj() podsWithAffinity = append(podsWithAffinity, pod) } + // Add a few of pods with PVC + var podsWithPVC []*v1.Pod + for i := 0; i < 8; i++ { + pod := st.MakePod().Name(fmt.Sprintf("p-pvc-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-pvc-%v", i)). 
+ PVC(fmt.Sprintf("test-pvc%v", i%4)).Node(fmt.Sprintf("test-node%v", i%2)).Obj() + podsWithPVC = append(podsWithPVC, pod) + } + var cache *cacheImpl var snapshot *Snapshot type operation = func(t *testing.T) @@ -1274,6 +1275,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + addPodWithPVC := func(i int) operation { + return func(t *testing.T) { + if err := cache.AddPod(podsWithPVC[i]); err != nil { + t.Error(err) + } + } + } removePod := func(i int) operation { return func(t *testing.T) { if err := cache.RemovePod(pods[i]); err != nil { @@ -1288,6 +1296,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + removePodWithPVC := func(i int) operation { + return func(t *testing.T) { + if err := cache.RemovePod(podsWithPVC[i]); err != nil { + t.Error(err) + } + } + } updatePod := func(i int) operation { return func(t *testing.T) { if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil { @@ -1309,30 +1324,35 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { operations []operation expected []*v1.Node expectedHavePodsWithAffinity int + expectedUsedPVCSet sets.String }{ { - name: "Empty cache", - operations: []operation{}, - expected: []*v1.Node{}, + name: "Empty cache", + operations: []operation{}, + expected: []*v1.Node{}, + expectedUsedPVCSet: sets.NewString(), }, { - name: "Single node", - operations: []operation{addNode(1)}, - expected: []*v1.Node{nodes[1]}, + name: "Single node", + operations: []operation{addNode(1)}, + expected: []*v1.Node{nodes[1]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add node, remove it, add it again", operations: []operation{ addNode(1), updateSnapshot(), removeNode(1), addNode(1), }, - expected: []*v1.Node{nodes[1]}, + expected: []*v1.Node{nodes[1]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add node and remove it in the same cycle, add it again", operations: []operation{ addNode(1), updateSnapshot(), addNode(2), removeNode(1), }, - expected: []*v1.Node{nodes[2]}, + expected: []*v1.Node{nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and snapshot in the middle", @@ -1340,21 +1360,24 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2), updateSnapshot(), addNode(3), }, - expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]}, + expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and snapshot in the end", operations: []operation{ addNode(0), addNode(2), addNode(5), addNode(6), }, - expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]}, + expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Update some nodes", operations: []operation{ addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1), }, - expected: []*v1.Node{nodes[1], nodes[5], nodes[0]}, + expected: []*v1.Node{nodes[1], nodes[5], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and remove all of them", @@ -1362,7 +1385,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(), removeNode(0), removeNode(2), removeNode(5), removeNode(6), }, - expected: []*v1.Node{}, + expected: []*v1.Node{}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and remove some of them", @@ -1370,7 +1394,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), 
addNode(2), addNode(5), addNode(6), updateSnapshot(), removeNode(0), removeNode(6), }, - expected: []*v1.Node{nodes[5], nodes[2]}, + expected: []*v1.Node{nodes[5], nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, remove all of them, and add more", @@ -1379,7 +1404,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { removeNode(2), removeNode(5), removeNode(6), updateSnapshot(), addNode(7), addNode(9), }, - expected: []*v1.Node{nodes[9], nodes[7]}, + expected: []*v1.Node{nodes[9], nodes[7]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Update nodes in particular order", @@ -1387,7 +1413,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(8), updateNode(2), updateNode(8), updateSnapshot(), addNode(1), }, - expected: []*v1.Node{nodes[1], nodes[8], nodes[2]}, + expected: []*v1.Node{nodes[1], nodes[8], nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add some nodes and some pods", @@ -1395,21 +1422,24 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), addNode(2), addNode(8), updateSnapshot(), addPod(8), addPod(2), }, - expected: []*v1.Node{nodes[2], nodes[8], nodes[0]}, + expected: []*v1.Node{nodes[2], nodes[8], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Updating a pod moves its node to the head", operations: []operation{ addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0), }, - expected: []*v1.Node{nodes[0], nodes[4], nodes[2]}, + expected: []*v1.Node{nodes[0], nodes[4], nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add pod before its node", operations: []operation{ addNode(0), addPod(1), updatePod(1), addNode(1), }, - expected: []*v1.Node{nodes[1], nodes[0]}, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Remove node before its pods", @@ -1418,7 +1448,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { removeNode(1), updateSnapshot(), updatePod(1), updatePod(11), removePod(1), removePod(11), }, - expected: []*v1.Node{nodes[0]}, + expected: []*v1.Node{nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add Pods with affinity", @@ -1427,6 +1458,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { }, expected: []*v1.Node{nodes[1], nodes[0]}, expectedHavePodsWithAffinity: 1, + expectedUsedPVCSet: sets.NewString(), + }, + { + name: "Add Pods with PVC", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"), }, { name: "Add multiple nodes with pods with affinity", @@ -1435,6 +1475,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { }, expected: []*v1.Node{nodes[1], nodes[0]}, expectedHavePodsWithAffinity: 2, + expectedUsedPVCSet: sets.NewString(), + }, + { + name: "Add multiple nodes with pods with PVC", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1), addPodWithPVC(1), updateSnapshot(), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0", "test-ns/test-pvc1"), }, { name: "Add then Remove pods with affinity", @@ -1443,6 +1492,43 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { }, expected: []*v1.Node{nodes[0], nodes[1]}, expectedHavePodsWithAffinity: 0, + expectedUsedPVCSet: sets.NewString(), + }, + { + name: "Add then Remove pod with PVC", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), 
addPodWithPVC(2), updateSnapshot(), + }, + expected: []*v1.Node{nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2"), + }, + { + name: "Add then Remove pod with PVC and add same pod again", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), addPodWithPVC(0), updateSnapshot(), + }, + expected: []*v1.Node{nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"), + }, + { + name: "Add and Remove multiple pods with PVC with same ref count length different content", + operations: []operation{ + addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), updateSnapshot(), + removePodWithPVC(0), removePodWithPVC(1), addPodWithPVC(2), addPodWithPVC(3), updateSnapshot(), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2", "test-ns/test-pvc3"), + }, + { + name: "Add and Remove multiple pods with PVC", + operations: []operation{ + addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), addPodWithPVC(2), updateSnapshot(), + removePodWithPVC(0), removePodWithPVC(1), updateSnapshot(), addPodWithPVC(0), updateSnapshot(), + addPodWithPVC(3), addPodWithPVC(4), addPodWithPVC(5), updateSnapshot(), + removePodWithPVC(0), removePodWithPVC(3), removePodWithPVC(4), updateSnapshot(), + }, + expected: []*v1.Node{nodes[0], nodes[1]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc1", "test-ns/test-pvc2"), }, } @@ -1476,6 +1562,11 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList)) } + // Compare content of the used PVC set + if diff := cmp.Diff(test.expectedUsedPVCSet, snapshot.usedPVCSet); diff != "" { + t.Errorf("Unexpected usedPVCSet (-want +got):\n%s", diff) + } + // Always update the snapshot at the end of operations and compare it. if err := cache.UpdateSnapshot(snapshot); err != nil { t.Error(err) @@ -1509,6 +1600,7 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot * expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + expectedUsedPVCSet := sets.NewString() nodesList, err := cache.nodeTree.list() if err != nil { t.Fatal(err) @@ -1519,6 +1611,9 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot * if len(n.PodsWithAffinity) > 0 { expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n) } + for key := range n.PVCRefCounts { + expectedUsedPVCSet.Insert(key) + } } else { return fmt.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen", nodeName) } @@ -1538,12 +1633,18 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot * } } + for key := range expectedUsedPVCSet { + if !snapshot.usedPVCSet.Has(key) { + return fmt.Errorf("expected PVC %s to exist in UsedPVCSet but it is not found", key) + } + } + return nil } func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) { // Create a few nodes to be used in tests. 
- nodes := []*v1.Node{} + var nodes []*v1.Node i := 0 // List of number of nodes per zone, zone 0 -> 2, zone 1 -> 6 for zone, nb := range []int{2, 6} { diff --git a/pkg/scheduler/internal/cache/snapshot.go b/pkg/scheduler/internal/cache/snapshot.go index bf312fcdf9c..78b67322a41 100644 --- a/pkg/scheduler/internal/cache/snapshot.go +++ b/pkg/scheduler/internal/cache/snapshot.go @@ -36,7 +36,10 @@ type Snapshot struct { // havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring // required anti-affinity terms. havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo - generation int64 + // usedPVCSet contains a set of PVC names that have one or more scheduled pods using them, + // keyed in the format "namespace/name". + usedPVCSet sets.String + generation int64 } var _ framework.SharedLister = &Snapshot{} @@ -45,6 +48,7 @@ var _ framework.SharedLister = &Snapshot{} func NewEmptySnapshot() *Snapshot { return &Snapshot{ nodeInfoMap: make(map[string]*framework.NodeInfo), + usedPVCSet: sets.NewString(), } } @@ -69,6 +73,7 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot { s.nodeInfoList = nodeInfoList s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList + s.usedPVCSet = createUsedPVCSet(pods) return s } @@ -98,6 +103,25 @@ func createNodeInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*framework.N return nodeNameToInfo } +func createUsedPVCSet(pods []*v1.Pod) sets.String { + usedPVCSet := sets.NewString() + for _, pod := range pods { + if pod.Spec.NodeName == "" { + continue + } + + for _, v := range pod.Spec.Volumes { + if v.PersistentVolumeClaim == nil { + continue + } + + key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName) + usedPVCSet.Insert(key) + } + } + return usedPVCSet +} + // getNodeImageStates returns the given node's image states based on the given imageExistence map. func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*framework.ImageStateSummary { imageStates := make(map[string]*framework.ImageStateSummary) @@ -135,6 +159,11 @@ func (s *Snapshot) NodeInfos() framework.NodeInfoLister { return s } +// StorageInfos returns a StorageInfoLister. +func (s *Snapshot) StorageInfos() framework.StorageInfoLister { + return s +} + // NumNodes returns the number of nodes in the snapshot. 
func (s *Snapshot) NumNodes() int { return len(s.nodeInfoList) @@ -163,3 +192,7 @@ func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) { } return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName) } + +func (s *Snapshot) IsPVCUsedByPods(key string) bool { + return s.usedPVCSet.Has(key) +} diff --git a/pkg/scheduler/internal/cache/snapshot_test.go b/pkg/scheduler/internal/cache/snapshot_test.go index 94c5d5a2ea5..6675c728a3a 100644 --- a/pkg/scheduler/internal/cache/snapshot_test.go +++ b/pkg/scheduler/internal/cache/snapshot_test.go @@ -21,10 +21,12 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/framework" + st "k8s.io/kubernetes/pkg/scheduler/testing" ) const mb int64 = 1024 * 1024 @@ -180,3 +182,50 @@ func TestCreateImageExistenceMap(t *testing.T) { }) } } + +func TestCreateUsedPVCSet(t *testing.T) { + tests := []struct { + name string + pods []*v1.Pod + expected sets.String + }{ + { + name: "empty pods list", + pods: []*v1.Pod{}, + expected: sets.NewString(), + }, + { + name: "pods not scheduled", + pods: []*v1.Pod{ + st.MakePod().Name("foo").Namespace("foo").Obj(), + st.MakePod().Name("bar").Namespace("bar").Obj(), + }, + expected: sets.NewString(), + }, + { + name: "scheduled pods that do not use any PVC", + pods: []*v1.Pod{ + st.MakePod().Name("foo").Namespace("foo").Node("node-1").Obj(), + st.MakePod().Name("bar").Namespace("bar").Node("node-2").Obj(), + }, + expected: sets.NewString(), + }, + { + name: "scheduled pods that use PVC", + pods: []*v1.Pod{ + st.MakePod().Name("foo").Namespace("foo").Node("node-1").PVC("pvc1").Obj(), + st.MakePod().Name("bar").Namespace("bar").Node("node-2").PVC("pvc2").Obj(), + }, + expected: sets.NewString("foo/pvc1", "bar/pvc2"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + usedPVCs := createUsedPVCSet(test.pods) + if diff := cmp.Diff(test.expected, usedPVCs); diff != "" { + t.Errorf("Unexpected usedPVCs (-want +got):\n%s", diff) + } + }) + } +}
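
A note on how the new lister is expected to be reached from plugin code (not part of the patch itself): a plugin gets at the scheduling snapshot through the framework handle's SnapshotSharedLister(), and builds the lookup key with the GetNamespacedName helper added above. The plugin type, its handle field, and the method below are hypothetical and only sketch the call path under that assumption.

package example

import (
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// examplePlugin is a hypothetical plugin used only to illustrate how the new
// StorageInfoLister is reached; the type and its handle field are assumptions,
// not part of this patch.
type examplePlugin struct {
	handle framework.Handle
}

// isClaimInUse reports whether any scheduled pod in the current scheduling
// snapshot references the given PVC. The key format is "namespace/name",
// matching what createUsedPVCSet and updatePVCRefCounts produce.
func (pl *examplePlugin) isClaimInUse(namespace, claimName string) bool {
	key := framework.GetNamespacedName(namespace, claimName)
	return pl.handle.SnapshotSharedLister().StorageInfos().IsPVCUsedByPods(key)
}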
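
For the end-to-end behavior the tests above exercise, here is a minimal sketch. The function name and its placement are assumptions (it would have to live inside the scheduler tree, since internal/cache cannot be imported from elsewhere); it only shows that building a Snapshot records PVCs of pods already assigned to a node, while a pending pod's claim is not reported as in use.

package cache_test

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

// ExamplePVCUsage is a hypothetical example, not part of this patch.
func ExamplePVCUsage() {
	// A scheduled pod mounting a PVC, and a pending pod (no NodeName) mounting another.
	scheduled := st.MakePod().Name("p1").Namespace("ns1").Node("node-1").PVC("claim-a").Obj()
	pending := st.MakePod().Name("p2").Namespace("ns1").PVC("claim-b").Obj()

	snapshot := internalcache.NewSnapshot([]*v1.Pod{scheduled, pending}, nil)

	fmt.Println(snapshot.IsPVCUsedByPods("ns1/claim-a")) // the pod using it is assigned to a node
	fmt.Println(snapshot.IsPVCUsedByPods("ns1/claim-b")) // the pod using it is not scheduled yet
	// Output:
	// true
	// false
}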