diff --git a/pkg/scheduler/framework/listers.go b/pkg/scheduler/framework/listers.go index eaacaacdb6a..4701bc522d7 100644 --- a/pkg/scheduler/framework/listers.go +++ b/pkg/scheduler/framework/listers.go @@ -18,17 +18,25 @@ package framework // NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name. type NodeInfoLister interface { - // Returns the list of NodeInfos. + // List returns the list of NodeInfos. List() ([]*NodeInfo, error) - // Returns the list of NodeInfos of nodes with pods with affinity terms. + // HavePodsWithAffinityList returns the list of NodeInfos of nodes with pods with affinity terms. HavePodsWithAffinityList() ([]*NodeInfo, error) - // Returns the list of NodeInfos of nodes with pods with required anti-affinity terms. + // HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes with pods with required anti-affinity terms. HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error) - // Returns the NodeInfo of the given node name. + // Get returns the NodeInfo of the given node name. Get(nodeName string) (*NodeInfo, error) } +// StorageInfoLister interface represents anything that handles storage-related operations and resources. +type StorageInfoLister interface { + // IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods, + // keyed in the format "namespace/name". + IsPVCUsedByPods(key string) bool +} + // SharedLister groups scheduler-specific listers. type SharedLister interface { NodeInfos() NodeInfoLister + StorageInfos() StorageInfoLister } diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index 8b683b19d48..6e7e8ed9794 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -18,7 +18,6 @@ package volumerestrictions import ( "context" - "sync/atomic" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -27,16 +26,14 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" ) // VolumeRestrictions is a plugin that checks volume restrictions. type VolumeRestrictions struct { - parallelizer parallelize.Parallelizer pvcLister corelisters.PersistentVolumeClaimLister - nodeInfoLister framework.SharedLister + sharedLister framework.SharedLister enableReadWriteOncePod bool } @@ -132,12 +129,6 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo // use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable. // TODO(#103132): Mark pod as Unschedulable and add preemption logic. 
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status { - nodeInfos, err := pl.nodeInfoLister.NodeInfos().List() - if err != nil { - return framework.NewStatus(framework.Error, "error while getting node info") - } - - var pvcKeys []string for _, volume := range pod.Spec.Volumes { if volume.PersistentVolumeClaim == nil { continue @@ -155,29 +146,11 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.C continue } - key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName - pvcKeys = append(pvcKeys, key) - } - - subCtx, cancel := context.WithCancel(ctx) - var conflicts uint32 - - processNode := func(i int) { - nodeInfo := nodeInfos[i] - for _, key := range pvcKeys { - refCount := nodeInfo.PVCRefCounts[key] - if refCount > 0 { - atomic.AddUint32(&conflicts, 1) - cancel() - } + key := framework.GetNamespacedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName) + if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict) } } - pl.parallelizer.Until(subCtx, len(nodeInfos), processNode) - - // Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet. - if conflicts > 0 { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict) - } return nil } @@ -232,12 +205,11 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent { func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() - nodeInfoLister := handle.SnapshotSharedLister() + sharedLister := handle.SnapshotSharedLister() return &VolumeRestrictions{ - parallelizer: handle.Parallelizer(), pvcLister: pvcLister, - nodeInfoLister: nodeInfoLister, + sharedLister: sharedLister, enableReadWriteOncePod: fts.EnableReadWriteOncePod, }, nil } diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 0c9dd74c380..bde3974439d 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -549,7 +549,7 @@ func (n *NodeInfo) Clone() *NodeInfo { Allocatable: n.Allocatable.Clone(), UsedPorts: make(HostPortInfo), ImageStates: n.ImageStates, - PVCRefCounts: n.PVCRefCounts, + PVCRefCounts: make(map[string]int), Generation: n.Generation, } if len(n.Pods) > 0 { @@ -571,6 +571,9 @@ func (n *NodeInfo) Clone() *NodeInfo { if len(n.PodsWithRequiredAntiAffinity) > 0 { clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...) } + for key, value := range n.PVCRefCounts { + clone.PVCRefCounts[key] = value + } return clone } @@ -770,7 +773,7 @@ func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) { continue } - key := pod.Namespace + "/" + v.PersistentVolumeClaim.ClaimName + key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName) if add { n.PVCRefCounts[key] += 1 } else { @@ -838,6 +841,11 @@ func GetPodKey(pod *v1.Pod) (string, error) { return uid, nil } +// GetNamespacedName returns the string format of a namespaced resource name. +func GetNamespacedName(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + // DefaultBindAllHostIP defines the default ip address used to bind to all host. 
const DefaultBindAllHostIP = "0.0.0.0" diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 73fb6c367b5..1cd69a9704d 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -191,7 +191,7 @@ func (cache *cacheImpl) Dump() *Dump { // UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at // beginning of every scheduling cycle. // The snapshot only includes Nodes that are not deleted at the time this function is called. -// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. +// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot. // This function tracks generation number of NodeInfo and updates only the // entries of an existing snapshot that have changed after the snapshot was taken. func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { @@ -212,6 +212,9 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { // status from having pods with required anti-affinity to NOT having pods with required // anti-affinity or the other way around. updateNodesHavePodsWithRequiredAntiAffinity := false + // usedPVCSet must be re-created whenever the head node generation is greater than + // last snapshot generation. + updateUsedPVCSet := false // Start from the head of the NodeInfo doubly linked list and update snapshot // of NodeInfos updated after the last snapshot. @@ -237,6 +240,18 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) { updateNodesHavePodsWithRequiredAntiAffinity = true } + if !updateUsedPVCSet { + if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) { + updateUsedPVCSet = true + } else { + for pvcKey := range clone.PVCRefCounts { + if _, found := existing.PVCRefCounts[pvcKey]; !found { + updateUsedPVCSet = true + break + } + } + } + } // We need to preserve the original pointer of the NodeInfo struct since it // is used in the NodeInfoList, which we may not update. 
*existing = *clone @@ -255,7 +270,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { updateAllLists = true } - if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity { + if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet { cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists) } @@ -278,6 +293,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error { func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) { snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + snapshot.usedPVCSet = sets.NewString() if updateAll { // Take a snapshot of the nodes order in the tree snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) @@ -294,6 +310,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 { snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo) } + for key := range nodeInfo.PVCRefCounts { + snapshot.usedPVCSet.Insert(key) + } } else { klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName)) } @@ -306,6 +325,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 { snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo) } + for key := range nodeInfo.PVCRefCounts { + snapshot.usedPVCSet.Insert(key) + } } } } @@ -366,7 +388,7 @@ func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error { return cache.finishBinding(pod, time.Now()) } -// finishBinding exists to make tests determinitistic by injecting now as an argument +// finishBinding exists to make tests deterministic by injecting now as an argument func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error { key, err := framework.GetPodKey(pod) if err != nil { diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index 1c80bd31ffd..b992fec983a 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -24,10 +24,14 @@ import ( "testing" "time" + st "k8s.io/kubernetes/pkg/scheduler/testing" + + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/framework" schedutil "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -1169,7 +1173,7 @@ func TestNodeOperators(t *testing.T) { func TestSchedulerCache_UpdateSnapshot(t *testing.T) { // Create a few nodes to be used in tests. 
- nodes := []*v1.Node{} + var nodes []*v1.Node for i := 0; i < 10; i++ { node := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -1185,7 +1189,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { nodes = append(nodes, node) } // Create a few nodes as updated versions of the above nodes - updatedNodes := []*v1.Node{} + var updatedNodes []*v1.Node for _, n := range nodes { updatedNode := n.DeepCopy() updatedNode.Status.Allocatable = v1.ResourceList{ @@ -1196,7 +1200,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } // Create a few pods for tests. - pods := []*v1.Pod{} + var pods []*v1.Pod for i := 0; i < 20; i++ { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1212,7 +1216,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } // Create a few pods as updated versions of the above pods. - updatedPods := []*v1.Pod{} + var updatedPods []*v1.Pod for _, p := range pods { updatedPod := p.DeepCopy() priority := int32(1000) @@ -1221,24 +1225,21 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } // Add a couple of pods with affinity, on the first and seconds nodes. - podsWithAffinity := []*v1.Pod{} + var podsWithAffinity []*v1.Pod for i := 0; i < 2; i++ { - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-pod%v", i), - Namespace: "test-ns", - UID: types.UID(fmt.Sprintf("test-puid%v", i)), - }, - Spec: v1.PodSpec{ - NodeName: fmt.Sprintf("test-node%v", i), - Affinity: &v1.Affinity{ - PodAffinity: &v1.PodAffinity{}, - }, - }, - } + pod := st.MakePod().Name(fmt.Sprintf("p-affinity-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-affinity-%v", i)). + PodAffinityExists("foo", "", st.PodAffinityWithRequiredReq).Node(fmt.Sprintf("test-node%v", i)).Obj() podsWithAffinity = append(podsWithAffinity, pod) } + // Add a few of pods with PVC + var podsWithPVC []*v1.Pod + for i := 0; i < 8; i++ { + pod := st.MakePod().Name(fmt.Sprintf("p-pvc-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-pvc-%v", i)). 
+ PVC(fmt.Sprintf("test-pvc%v", i%4)).Node(fmt.Sprintf("test-node%v", i%2)).Obj() + podsWithPVC = append(podsWithPVC, pod) + } + var cache *cacheImpl var snapshot *Snapshot type operation = func(t *testing.T) @@ -1274,6 +1275,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + addPodWithPVC := func(i int) operation { + return func(t *testing.T) { + if err := cache.AddPod(podsWithPVC[i]); err != nil { + t.Error(err) + } + } + } removePod := func(i int) operation { return func(t *testing.T) { if err := cache.RemovePod(pods[i]); err != nil { @@ -1288,6 +1296,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { } } } + removePodWithPVC := func(i int) operation { + return func(t *testing.T) { + if err := cache.RemovePod(podsWithPVC[i]); err != nil { + t.Error(err) + } + } + } updatePod := func(i int) operation { return func(t *testing.T) { if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil { @@ -1309,30 +1324,35 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { operations []operation expected []*v1.Node expectedHavePodsWithAffinity int + expectedUsedPVCSet sets.String }{ { - name: "Empty cache", - operations: []operation{}, - expected: []*v1.Node{}, + name: "Empty cache", + operations: []operation{}, + expected: []*v1.Node{}, + expectedUsedPVCSet: sets.NewString(), }, { - name: "Single node", - operations: []operation{addNode(1)}, - expected: []*v1.Node{nodes[1]}, + name: "Single node", + operations: []operation{addNode(1)}, + expected: []*v1.Node{nodes[1]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add node, remove it, add it again", operations: []operation{ addNode(1), updateSnapshot(), removeNode(1), addNode(1), }, - expected: []*v1.Node{nodes[1]}, + expected: []*v1.Node{nodes[1]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add node and remove it in the same cycle, add it again", operations: []operation{ addNode(1), updateSnapshot(), addNode(2), removeNode(1), }, - expected: []*v1.Node{nodes[2]}, + expected: []*v1.Node{nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and snapshot in the middle", @@ -1340,21 +1360,24 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2), updateSnapshot(), addNode(3), }, - expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]}, + expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and snapshot in the end", operations: []operation{ addNode(0), addNode(2), addNode(5), addNode(6), }, - expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]}, + expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Update some nodes", operations: []operation{ addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1), }, - expected: []*v1.Node{nodes[1], nodes[5], nodes[0]}, + expected: []*v1.Node{nodes[1], nodes[5], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and remove all of them", @@ -1362,7 +1385,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(), removeNode(0), removeNode(2), removeNode(5), removeNode(6), }, - expected: []*v1.Node{}, + expected: []*v1.Node{}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, and remove some of them", @@ -1370,7 +1394,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), 
addNode(2), addNode(5), addNode(6), updateSnapshot(), removeNode(0), removeNode(6), }, - expected: []*v1.Node{nodes[5], nodes[2]}, + expected: []*v1.Node{nodes[5], nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add a few nodes, remove all of them, and add more", @@ -1379,7 +1404,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { removeNode(2), removeNode(5), removeNode(6), updateSnapshot(), addNode(7), addNode(9), }, - expected: []*v1.Node{nodes[9], nodes[7]}, + expected: []*v1.Node{nodes[9], nodes[7]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Update nodes in particular order", @@ -1387,7 +1413,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(8), updateNode(2), updateNode(8), updateSnapshot(), addNode(1), }, - expected: []*v1.Node{nodes[1], nodes[8], nodes[2]}, + expected: []*v1.Node{nodes[1], nodes[8], nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add some nodes and some pods", @@ -1395,21 +1422,24 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { addNode(0), addNode(2), addNode(8), updateSnapshot(), addPod(8), addPod(2), }, - expected: []*v1.Node{nodes[2], nodes[8], nodes[0]}, + expected: []*v1.Node{nodes[2], nodes[8], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Updating a pod moves its node to the head", operations: []operation{ addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0), }, - expected: []*v1.Node{nodes[0], nodes[4], nodes[2]}, + expected: []*v1.Node{nodes[0], nodes[4], nodes[2]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add pod before its node", operations: []operation{ addNode(0), addPod(1), updatePod(1), addNode(1), }, - expected: []*v1.Node{nodes[1], nodes[0]}, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Remove node before its pods", @@ -1418,7 +1448,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { removeNode(1), updateSnapshot(), updatePod(1), updatePod(11), removePod(1), removePod(11), }, - expected: []*v1.Node{nodes[0]}, + expected: []*v1.Node{nodes[0]}, + expectedUsedPVCSet: sets.NewString(), }, { name: "Add Pods with affinity", @@ -1427,6 +1458,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { }, expected: []*v1.Node{nodes[1], nodes[0]}, expectedHavePodsWithAffinity: 1, + expectedUsedPVCSet: sets.NewString(), + }, + { + name: "Add Pods with PVC", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"), }, { name: "Add multiple nodes with pods with affinity", @@ -1435,6 +1475,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { }, expected: []*v1.Node{nodes[1], nodes[0]}, expectedHavePodsWithAffinity: 2, + expectedUsedPVCSet: sets.NewString(), + }, + { + name: "Add multiple nodes with pods with PVC", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1), addPodWithPVC(1), updateSnapshot(), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0", "test-ns/test-pvc1"), }, { name: "Add then Remove pods with affinity", @@ -1443,6 +1492,43 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { }, expected: []*v1.Node{nodes[0], nodes[1]}, expectedHavePodsWithAffinity: 0, + expectedUsedPVCSet: sets.NewString(), + }, + { + name: "Add then Remove pod with PVC", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), 
addPodWithPVC(2), updateSnapshot(), + }, + expected: []*v1.Node{nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2"), + }, + { + name: "Add then Remove pod with PVC and add same pod again", + operations: []operation{ + addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), addPodWithPVC(0), updateSnapshot(), + }, + expected: []*v1.Node{nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"), + }, + { + name: "Add and Remove multiple pods with PVC with same ref count length different content", + operations: []operation{ + addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), updateSnapshot(), + removePodWithPVC(0), removePodWithPVC(1), addPodWithPVC(2), addPodWithPVC(3), updateSnapshot(), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2", "test-ns/test-pvc3"), + }, + { + name: "Add and Remove multiple pods with PVC", + operations: []operation{ + addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), addPodWithPVC(2), updateSnapshot(), + removePodWithPVC(0), removePodWithPVC(1), updateSnapshot(), addPodWithPVC(0), updateSnapshot(), + addPodWithPVC(3), addPodWithPVC(4), addPodWithPVC(5), updateSnapshot(), + removePodWithPVC(0), removePodWithPVC(3), removePodWithPVC(4), updateSnapshot(), + }, + expected: []*v1.Node{nodes[0], nodes[1]}, + expectedUsedPVCSet: sets.NewString("test-ns/test-pvc1", "test-ns/test-pvc2"), }, } @@ -1476,6 +1562,11 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) { t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList)) } + // Compare content of the used PVC set + if diff := cmp.Diff(test.expectedUsedPVCSet, snapshot.usedPVCSet); diff != "" { + t.Errorf("Unexpected usedPVCSet (-want +got):\n%s", diff) + } + // Always update the snapshot at the end of operations and compare it. if err := cache.UpdateSnapshot(snapshot); err != nil { t.Error(err) @@ -1509,6 +1600,7 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot * expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes) + expectedUsedPVCSet := sets.NewString() nodesList, err := cache.nodeTree.list() if err != nil { t.Fatal(err) @@ -1519,6 +1611,9 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot * if len(n.PodsWithAffinity) > 0 { expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n) } + for key := range n.PVCRefCounts { + expectedUsedPVCSet.Insert(key) + } } else { return fmt.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen", nodeName) } @@ -1538,12 +1633,18 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot * } } + for key := range expectedUsedPVCSet { + if !snapshot.usedPVCSet.Has(key) { + return fmt.Errorf("expected PVC %s to exist in UsedPVCSet but it is not found", key) + } + } + return nil } func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) { // Create a few nodes to be used in tests. 
- nodes := []*v1.Node{} + var nodes []*v1.Node i := 0 // List of number of nodes per zone, zone 0 -> 2, zone 1 -> 6 for zone, nb := range []int{2, 6} { diff --git a/pkg/scheduler/internal/cache/snapshot.go b/pkg/scheduler/internal/cache/snapshot.go index bf312fcdf9c..78b67322a41 100644 --- a/pkg/scheduler/internal/cache/snapshot.go +++ b/pkg/scheduler/internal/cache/snapshot.go @@ -36,7 +36,10 @@ type Snapshot struct { // havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring // required anti-affinity terms. havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo - generation int64 + // usedPVCSet contains a set of PVC names that have one or more scheduled pods using them, + // keyed in the format "namespace/name". + usedPVCSet sets.String + generation int64 } var _ framework.SharedLister = &Snapshot{} @@ -45,6 +48,7 @@ var _ framework.SharedLister = &Snapshot{} func NewEmptySnapshot() *Snapshot { return &Snapshot{ nodeInfoMap: make(map[string]*framework.NodeInfo), + usedPVCSet: sets.NewString(), } } @@ -69,6 +73,7 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot { s.nodeInfoList = nodeInfoList s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList + s.usedPVCSet = createUsedPVCSet(pods) return s } @@ -98,6 +103,25 @@ func createNodeInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*framework.N return nodeNameToInfo } +func createUsedPVCSet(pods []*v1.Pod) sets.String { + usedPVCSet := sets.NewString() + for _, pod := range pods { + if pod.Spec.NodeName == "" { + continue + } + + for _, v := range pod.Spec.Volumes { + if v.PersistentVolumeClaim == nil { + continue + } + + key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName) + usedPVCSet.Insert(key) + } + } + return usedPVCSet +} + // getNodeImageStates returns the given node's image states based on the given imageExistence map. func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*framework.ImageStateSummary { imageStates := make(map[string]*framework.ImageStateSummary) @@ -135,6 +159,11 @@ func (s *Snapshot) NodeInfos() framework.NodeInfoLister { return s } +// StorageInfos returns a StorageInfoLister. +func (s *Snapshot) StorageInfos() framework.StorageInfoLister { + return s +} + // NumNodes returns the number of nodes in the snapshot. 
func (s *Snapshot) NumNodes() int { return len(s.nodeInfoList) @@ -163,3 +192,7 @@ func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) { } return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName) } + +func (s *Snapshot) IsPVCUsedByPods(key string) bool { + return s.usedPVCSet.Has(key) +} diff --git a/pkg/scheduler/internal/cache/snapshot_test.go b/pkg/scheduler/internal/cache/snapshot_test.go index 94c5d5a2ea5..6675c728a3a 100644 --- a/pkg/scheduler/internal/cache/snapshot_test.go +++ b/pkg/scheduler/internal/cache/snapshot_test.go @@ -21,10 +21,12 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/scheduler/framework" + st "k8s.io/kubernetes/pkg/scheduler/testing" ) const mb int64 = 1024 * 1024 @@ -180,3 +182,50 @@ func TestCreateImageExistenceMap(t *testing.T) { }) } } + +func TestCreateUsedPVCSet(t *testing.T) { + tests := []struct { + name string + pods []*v1.Pod + expected sets.String + }{ + { + name: "empty pods list", + pods: []*v1.Pod{}, + expected: sets.NewString(), + }, + { + name: "pods not scheduled", + pods: []*v1.Pod{ + st.MakePod().Name("foo").Namespace("foo").Obj(), + st.MakePod().Name("bar").Namespace("bar").Obj(), + }, + expected: sets.NewString(), + }, + { + name: "scheduled pods that do not use any PVC", + pods: []*v1.Pod{ + st.MakePod().Name("foo").Namespace("foo").Node("node-1").Obj(), + st.MakePod().Name("bar").Namespace("bar").Node("node-2").Obj(), + }, + expected: sets.NewString(), + }, + { + name: "scheduled pods that use PVC", + pods: []*v1.Pod{ + st.MakePod().Name("foo").Namespace("foo").Node("node-1").PVC("pvc1").Obj(), + st.MakePod().Name("bar").Namespace("bar").Node("node-2").PVC("pvc2").Obj(), + }, + expected: sets.NewString("foo/pvc1", "bar/pvc2"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + usedPVCs := createUsedPVCSet(test.pods) + if diff := cmp.Diff(test.expected, usedPVCs); diff != "" { + t.Errorf("Unexpected usedPVCs (-want +got):\n%s", diff) + } + }) + } +}
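
Reviewer note: below is a minimal, illustrative sketch of how a plugin consumes the new StorageInfoLister exposed through the cycle snapshot, mirroring the VolumeRestrictions change above. The helper name `podUsesConflictingPVC` and the standalone wiring are hypothetical; the `SharedLister.StorageInfos()`, `IsPVCUsedByPods`, and `GetNamespacedName` calls are the ones introduced in this diff. It intentionally omits the ReadWriteOncePod access-mode check that the real plugin still performs via the PVC lister.

```go
package main

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// podUsesConflictingPVC reports whether any PVC referenced by the pod is
// already used by a scheduled pod, according to the snapshot taken at the
// beginning of the scheduling cycle. Illustrative only; not part of this PR.
func podUsesConflictingPVC(lister framework.SharedLister, pod *v1.Pod) bool {
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		// Keys follow the "namespace/name" convention used by usedPVCSet.
		key := framework.GetNamespacedName(pod.Namespace, vol.PersistentVolumeClaim.ClaimName)
		if lister.StorageInfos().IsPVCUsedByPods(key) {
			return true
		}
	}
	return false
}
```

The design point of the change: a PreFilter call now does an O(1) set lookup against `usedPVCSet`, which is rebuilt only when `UpdateSnapshot` detects that a node's `PVCRefCounts` changed, instead of scanning every NodeInfo's `PVCRefCounts` map with the Parallelizer on every pod.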