Merge pull request #109715 from yibozhuang/volumerestrictions-refactor

Remove parallel node processing in PreFilter stage in volumerestrictions scheduler plugin
Commit 39021f66ef by Kubernetes Prow Robot, 2022-05-04 06:49:51 -07:00, committed by GitHub.
7 changed files with 276 additions and 83 deletions

View File

@ -18,17 +18,25 @@ package framework
// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
type NodeInfoLister interface {
// Returns the list of NodeInfos.
// List returns the list of NodeInfos.
List() ([]*NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with affinity terms.
// HavePodsWithAffinityList returns the list of NodeInfos of nodes with pods with affinity terms.
HavePodsWithAffinityList() ([]*NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
// HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
// Returns the NodeInfo of the given node name.
// Get returns the NodeInfo of the given node name.
Get(nodeName string) (*NodeInfo, error)
}
// StorageInfoLister interface represents anything that handles storage-related operations and resources.
type StorageInfoLister interface {
// IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods,
// keyed in the format "namespace/name".
IsPVCUsedByPods(key string) bool
}
// SharedLister groups scheduler-specific listers.
type SharedLister interface {
NodeInfos() NodeInfoLister
StorageInfos() StorageInfoLister
}
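
For illustration, a hedged sketch of how a caller combines the two listers; pvcInUse is a hypothetical helper, not part of this PR, and the "namespace/name" key format comes from the IsPVCUsedByPods comment above:

// pvcInUse reports whether the named claim in the given namespace is already used
// by one or more scheduled pods, according to the shared scheduler snapshot.
func pvcInUse(lister SharedLister, namespace, claimName string) bool {
	key := namespace + "/" + claimName // the "namespace/name" key expected by IsPVCUsedByPods
	return lister.StorageInfos().IsPVCUsedByPods(key)
}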

View File

@ -18,7 +18,6 @@ package volumerestrictions
import (
"context"
"sync/atomic"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -27,16 +26,14 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)
// VolumeRestrictions is a plugin that checks volume restrictions.
type VolumeRestrictions struct {
parallelizer parallelize.Parallelizer
pvcLister corelisters.PersistentVolumeClaimLister
nodeInfoLister framework.SharedLister
sharedLister framework.SharedLister
enableReadWriteOncePod bool
}
@ -132,12 +129,6 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
// TODO(#103132): Mark pod as Unschedulable and add preemption logic.
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status {
nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, "error while getting node info")
}
var pvcKeys []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
@ -155,29 +146,11 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.C
continue
}
key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
pvcKeys = append(pvcKeys, key)
}
subCtx, cancel := context.WithCancel(ctx)
var conflicts uint32
processNode := func(i int) {
nodeInfo := nodeInfos[i]
for _, key := range pvcKeys {
refCount := nodeInfo.PVCRefCounts[key]
if refCount > 0 {
atomic.AddUint32(&conflicts, 1)
cancel()
}
}
}
pl.parallelizer.Until(subCtx, len(nodeInfos), processNode)
// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
if conflicts > 0 {
key := framework.GetNamespacedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
}
}
return nil
}
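
Assembled from the added lines and the surrounding context, the check now reduces to a single pass over the pod's volumes. A hedged sketch follows; the PVC lookup and access-mode filter are reconstructed from the file's imports (pvcLister, apierrors, v1helper) and may differ in detail from the actual source:

// For each volume backed by a ReadWriteOncePod PVC, ask the snapshot whether any
// scheduled pod already uses that claim; if so, fail PreFilter as unresolvable.
for _, volume := range pod.Spec.Volumes {
	if volume.PersistentVolumeClaim == nil {
		continue
	}
	pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim "+volume.PersistentVolumeClaim.ClaimName+" not found")
		}
		return framework.AsStatus(err)
	}
	if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
		continue
	}
	key := framework.GetNamespacedName(pod.Namespace, volume.PersistentVolumeClaim.ClaimName)
	if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
	}
}
return nil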
@ -232,12 +205,11 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent {
func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
nodeInfoLister := handle.SnapshotSharedLister()
sharedLister := handle.SnapshotSharedLister()
return &VolumeRestrictions{
parallelizer: handle.Parallelizer(),
pvcLister: pvcLister,
nodeInfoLister: nodeInfoLister,
sharedLister: sharedLister,
enableReadWriteOncePod: fts.EnableReadWriteOncePod,
}, nil
}

View File

@ -549,7 +549,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
Allocatable: n.Allocatable.Clone(),
UsedPorts: make(HostPortInfo),
ImageStates: n.ImageStates,
PVCRefCounts: n.PVCRefCounts,
PVCRefCounts: make(map[string]int),
Generation: n.Generation,
}
if len(n.Pods) > 0 {
@ -571,6 +571,9 @@ func (n *NodeInfo) Clone() *NodeInfo {
if len(n.PodsWithRequiredAntiAffinity) > 0 {
clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
}
for key, value := range n.PVCRefCounts {
clone.PVCRefCounts[key] = value
}
return clone
}
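
The switch from sharing n.PVCRefCounts to allocating and filling a fresh map matters because Go maps are reference-like; a small illustrative sketch (variable names are arbitrary):

// With the old `PVCRefCounts: n.PVCRefCounts` line, clone and original shared one map,
// so later ref-count updates through the cache silently mutated the snapshot's copy.
original := map[string]int{"ns/pvc": 1}
shared := original         // old behavior: both names point at the same map
copied := map[string]int{} // new behavior: allocate a fresh map, then copy entries
for k, v := range original {
	copied[k] = v
}
original["ns/pvc"]++
fmt.Println(shared["ns/pvc"], copied["ns/pvc"]) // 2 1: only the alias observes the mutation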
@ -770,7 +773,7 @@ func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) {
continue
}
key := pod.Namespace + "/" + v.PersistentVolumeClaim.ClaimName
key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
if add {
n.PVCRefCounts[key] += 1
} else {
@ -838,6 +841,11 @@ func GetPodKey(pod *v1.Pod) (string, error) {
return uid, nil
}
// GetNamespacedName returns the string format of a namespaced resource name.
func GetNamespacedName(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
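
The helper centralizes the "namespace/name" key format that was previously built inline (see the updatePVCRefCounts change above); for example:

key := GetNamespacedName("kube-system", "data-pvc") // "kube-system/data-pvc" (example values)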
// DefaultBindAllHostIP defines the default ip address used to bind to all host.
const DefaultBindAllHostIP = "0.0.0.0"

View File

@ -191,7 +191,7 @@ func (cache *cacheImpl) Dump() *Dump {
// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
@ -212,6 +212,9 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false
// usedPVCSet must be re-created whenever the head node generation is greater than
// last snapshot generation.
updateUsedPVCSet := false
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
@ -237,6 +240,18 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
if !updateUsedPVCSet {
if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) {
updateUsedPVCSet = true
} else {
for pvcKey := range clone.PVCRefCounts {
if _, found := existing.PVCRefCounts[pvcKey]; !found {
updateUsedPVCSet = true
break
}
}
}
}
// We need to preserve the original pointer of the NodeInfo struct since it
// is used in the NodeInfoList, which we may not update.
*existing = *clone
@ -255,7 +270,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
updateAllLists = true
}
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}
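
The added comparison only has to detect a change in the key set of PVCRefCounts, not in the counts, since usedPVCSet records presence. The same logic, extracted as a hypothetical standalone helper for readability:

// pvcKeySetChanged mirrors the check above: the key sets differ if their sizes
// differ or if the new map holds a key the old one lacks.
func pvcKeySetChanged(existing, clone map[string]int) bool {
	if len(existing) != len(clone) {
		return true
	}
	for key := range clone {
		if _, found := existing[key]; !found {
			return true
		}
	}
	return false
}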
@ -278,6 +293,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.usedPVCSet = sets.NewString()
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
@ -294,6 +310,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
} else {
klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName))
}
@ -306,6 +325,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
}
}
}
@ -366,7 +388,7 @@ func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests determinitistic by injecting now as an argument
// finishBinding exists to make tests deterministic by injecting now as an argument
func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := framework.GetPodKey(pod)
if err != nil {

View File

@ -24,10 +24,14 @@ import (
"testing"
"time"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@ -1169,7 +1173,7 @@ func TestNodeOperators(t *testing.T) {
func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
// Create a few nodes to be used in tests.
nodes := []*v1.Node{}
var nodes []*v1.Node
for i := 0; i < 10; i++ {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@ -1185,7 +1189,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
nodes = append(nodes, node)
}
// Create a few nodes as updated versions of the above nodes
updatedNodes := []*v1.Node{}
var updatedNodes []*v1.Node
for _, n := range nodes {
updatedNode := n.DeepCopy()
updatedNode.Status.Allocatable = v1.ResourceList{
@ -1196,7 +1200,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
// Create a few pods for tests.
pods := []*v1.Pod{}
var pods []*v1.Pod
for i := 0; i < 20; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -1212,7 +1216,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
// Create a few pods as updated versions of the above pods.
updatedPods := []*v1.Pod{}
var updatedPods []*v1.Pod
for _, p := range pods {
updatedPod := p.DeepCopy()
priority := int32(1000)
@ -1221,24 +1225,21 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
// Add a couple of pods with affinity, on the first and second nodes.
podsWithAffinity := []*v1.Pod{}
var podsWithAffinity []*v1.Pod
for i := 0; i < 2; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod%v", i),
Namespace: "test-ns",
UID: types.UID(fmt.Sprintf("test-puid%v", i)),
},
Spec: v1.PodSpec{
NodeName: fmt.Sprintf("test-node%v", i),
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{},
},
},
}
pod := st.MakePod().Name(fmt.Sprintf("p-affinity-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-affinity-%v", i)).
PodAffinityExists("foo", "", st.PodAffinityWithRequiredReq).Node(fmt.Sprintf("test-node%v", i)).Obj()
podsWithAffinity = append(podsWithAffinity, pod)
}
// Add a few pods with PVC
var podsWithPVC []*v1.Pod
for i := 0; i < 8; i++ {
pod := st.MakePod().Name(fmt.Sprintf("p-pvc-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-pvc-%v", i)).
PVC(fmt.Sprintf("test-pvc%v", i%4)).Node(fmt.Sprintf("test-node%v", i%2)).Obj()
podsWithPVC = append(podsWithPVC, pod)
}
var cache *cacheImpl
var snapshot *Snapshot
type operation = func(t *testing.T)
@ -1274,6 +1275,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
addPodWithPVC := func(i int) operation {
return func(t *testing.T) {
if err := cache.AddPod(podsWithPVC[i]); err != nil {
t.Error(err)
}
}
}
removePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.RemovePod(pods[i]); err != nil {
@ -1288,6 +1296,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
removePodWithPVC := func(i int) operation {
return func(t *testing.T) {
if err := cache.RemovePod(podsWithPVC[i]); err != nil {
t.Error(err)
}
}
}
updatePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
@ -1309,16 +1324,19 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
operations []operation
expected []*v1.Node
expectedHavePodsWithAffinity int
expectedUsedPVCSet sets.String
}{
{
name: "Empty cache",
operations: []operation{},
expected: []*v1.Node{},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Single node",
operations: []operation{addNode(1)},
expected: []*v1.Node{nodes[1]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add node, remove it, add it again",
@ -1326,6 +1344,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(1), updateSnapshot(), removeNode(1), addNode(1),
},
expected: []*v1.Node{nodes[1]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add node and remove it in the same cycle, add it again",
@ -1333,6 +1352,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(1), updateSnapshot(), addNode(2), removeNode(1),
},
expected: []*v1.Node{nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and snapshot in the middle",
@ -1341,6 +1361,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
updateSnapshot(), addNode(3),
},
expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and snapshot in the end",
@ -1348,6 +1369,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addNode(2), addNode(5), addNode(6),
},
expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Update some nodes",
@ -1355,6 +1377,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
},
expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and remove all of them",
@ -1363,6 +1386,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
removeNode(0), removeNode(2), removeNode(5), removeNode(6),
},
expected: []*v1.Node{},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and remove some of them",
@ -1371,6 +1395,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
removeNode(0), removeNode(6),
},
expected: []*v1.Node{nodes[5], nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, remove all of them, and add more",
@ -1380,6 +1405,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(7), addNode(9),
},
expected: []*v1.Node{nodes[9], nodes[7]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Update nodes in particular order",
@ -1388,6 +1414,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add some nodes and some pods",
@ -1396,6 +1423,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addPod(8), addPod(2),
},
expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Updating a pod moves its node to the head",
@ -1403,6 +1431,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
},
expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add pod before its node",
@ -1410,6 +1439,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addPod(1), updatePod(1), addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Remove node before its pods",
@ -1419,6 +1449,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
updatePod(1), updatePod(11), removePod(1), removePod(11),
},
expected: []*v1.Node{nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add Pods with affinity",
@ -1427,6 +1458,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedHavePodsWithAffinity: 1,
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add Pods with PVC",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"),
},
{
name: "Add multiple nodes with pods with affinity",
@ -1435,6 +1475,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedHavePodsWithAffinity: 2,
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add multiple nodes with pods with PVC",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1), addPodWithPVC(1), updateSnapshot(),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0", "test-ns/test-pvc1"),
},
{
name: "Add then Remove pods with affinity",
@ -1443,6 +1492,43 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
},
expected: []*v1.Node{nodes[0], nodes[1]},
expectedHavePodsWithAffinity: 0,
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add then Remove pod with PVC",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), addPodWithPVC(2), updateSnapshot(),
},
expected: []*v1.Node{nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2"),
},
{
name: "Add then Remove pod with PVC and add same pod again",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), addPodWithPVC(0), updateSnapshot(),
},
expected: []*v1.Node{nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"),
},
{
name: "Add and Remove multiple pods with PVC with same ref count length different content",
operations: []operation{
addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), updateSnapshot(),
removePodWithPVC(0), removePodWithPVC(1), addPodWithPVC(2), addPodWithPVC(3), updateSnapshot(),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2", "test-ns/test-pvc3"),
},
{
name: "Add and Remove multiple pods with PVC",
operations: []operation{
addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), addPodWithPVC(2), updateSnapshot(),
removePodWithPVC(0), removePodWithPVC(1), updateSnapshot(), addPodWithPVC(0), updateSnapshot(),
addPodWithPVC(3), addPodWithPVC(4), addPodWithPVC(5), updateSnapshot(),
removePodWithPVC(0), removePodWithPVC(3), removePodWithPVC(4), updateSnapshot(),
},
expected: []*v1.Node{nodes[0], nodes[1]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc1", "test-ns/test-pvc2"),
},
}
@ -1476,6 +1562,11 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList))
}
// Compare content of the used PVC set
if diff := cmp.Diff(test.expectedUsedPVCSet, snapshot.usedPVCSet); diff != "" {
t.Errorf("Unexpected usedPVCSet (-want +got):\n%s", diff)
}
// Always update the snapshot at the end of operations and compare it.
if err := cache.UpdateSnapshot(snapshot); err != nil {
t.Error(err)
@ -1509,6 +1600,7 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *
expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
expectedUsedPVCSet := sets.NewString()
nodesList, err := cache.nodeTree.list()
if err != nil {
t.Fatal(err)
@ -1519,6 +1611,9 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *
if len(n.PodsWithAffinity) > 0 {
expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
}
for key := range n.PVCRefCounts {
expectedUsedPVCSet.Insert(key)
}
} else {
return fmt.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
}
@ -1538,12 +1633,18 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *
}
}
for key := range expectedUsedPVCSet {
if !snapshot.usedPVCSet.Has(key) {
return fmt.Errorf("expected PVC %s to exist in UsedPVCSet but it is not found", key)
}
}
return nil
}
func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
// Create a few nodes to be used in tests.
nodes := []*v1.Node{}
var nodes []*v1.Node
i := 0
// List of number of nodes per zone, zone 0 -> 2, zone 1 -> 6
for zone, nb := range []int{2, 6} {

View File

@ -36,6 +36,9 @@ type Snapshot struct {
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
// required anti-affinity terms.
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
// usedPVCSet contains a set of PVC names that have one or more scheduled pods using them,
// keyed in the format "namespace/name".
usedPVCSet sets.String
generation int64
}
@ -45,6 +48,7 @@ var _ framework.SharedLister = &Snapshot{}
func NewEmptySnapshot() *Snapshot {
return &Snapshot{
nodeInfoMap: make(map[string]*framework.NodeInfo),
usedPVCSet: sets.NewString(),
}
}
@ -69,6 +73,7 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
s.nodeInfoList = nodeInfoList
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
s.usedPVCSet = createUsedPVCSet(pods)
return s
}
@ -98,6 +103,25 @@ func createNodeInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*framework.N
return nodeNameToInfo
}
func createUsedPVCSet(pods []*v1.Pod) sets.String {
usedPVCSet := sets.NewString()
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
for _, v := range pod.Spec.Volumes {
if v.PersistentVolumeClaim == nil {
continue
}
key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
usedPVCSet.Insert(key)
}
}
return usedPVCSet
}
// getNodeImageStates returns the given node's image states based on the given imageExistence map.
func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*framework.ImageStateSummary {
imageStates := make(map[string]*framework.ImageStateSummary)
@ -135,6 +159,11 @@ func (s *Snapshot) NodeInfos() framework.NodeInfoLister {
return s
}
// StorageInfos returns a StorageInfoLister.
func (s *Snapshot) StorageInfos() framework.StorageInfoLister {
return s
}
// NumNodes returns the number of nodes in the snapshot.
func (s *Snapshot) NumNodes() int {
return len(s.nodeInfoList)
@ -163,3 +192,7 @@ func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
}
return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
}
func (s *Snapshot) IsPVCUsedByPods(key string) bool {
return s.usedPVCSet.Has(key)
}
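
To tie the pieces together, a hedged sketch in the style of this package's tests, using the st.MakePod builder that appears in the test diff below (names such as "ns" and "claim-1" are arbitrary):

// Build a snapshot from one node and one scheduled pod that mounts a PVC, then
// query PVC usage through the SharedLister surface the plugin now relies on.
pod := st.MakePod().Name("p").Namespace("ns").Node("node-1").PVC("claim-1").Obj()
node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
snapshot := NewSnapshot([]*v1.Pod{pod}, []*v1.Node{node})

fmt.Println(snapshot.StorageInfos().IsPVCUsedByPods("ns/claim-1")) // expected: true
fmt.Println(snapshot.StorageInfos().IsPVCUsedByPods("ns/other"))   // expected: false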

View File

@ -21,10 +21,12 @@ import (
"reflect"
"testing"
"k8s.io/api/core/v1"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
const mb int64 = 1024 * 1024
@ -180,3 +182,50 @@ func TestCreateImageExistenceMap(t *testing.T) {
})
}
}
func TestCreateUsedPVCSet(t *testing.T) {
tests := []struct {
name string
pods []*v1.Pod
expected sets.String
}{
{
name: "empty pods list",
pods: []*v1.Pod{},
expected: sets.NewString(),
},
{
name: "pods not scheduled",
pods: []*v1.Pod{
st.MakePod().Name("foo").Namespace("foo").Obj(),
st.MakePod().Name("bar").Namespace("bar").Obj(),
},
expected: sets.NewString(),
},
{
name: "scheduled pods that do not use any PVC",
pods: []*v1.Pod{
st.MakePod().Name("foo").Namespace("foo").Node("node-1").Obj(),
st.MakePod().Name("bar").Namespace("bar").Node("node-2").Obj(),
},
expected: sets.NewString(),
},
{
name: "scheduled pods that use PVC",
pods: []*v1.Pod{
st.MakePod().Name("foo").Namespace("foo").Node("node-1").PVC("pvc1").Obj(),
st.MakePod().Name("bar").Namespace("bar").Node("node-2").PVC("pvc2").Obj(),
},
expected: sets.NewString("foo/pvc1", "bar/pvc2"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
usedPVCs := createUsedPVCSet(test.pods)
if diff := cmp.Diff(test.expected, usedPVCs); diff != "" {
t.Errorf("Unexpected usedPVCs (-want +got):\n%s", diff)
}
})
}
}