Adding StorageInfoLister to SharedLister

This change creates a StorageInfoLister interface
and adds it to the scheduler SharedLister.

The initial StorageInfoLister interface has an
IsPVCUsedByPods method, which returns true/false
depending on whether the PVC (keyed by
namespace/name) has at least one scheduled pod
using it.

In the real snapshot implementation, add a
usedPVCSet, keyed by PVC namespace/name, which
contains all PVCs that have at least one scheduled
pod using them. During snapshot update, repopulate
this set whenever the PVCRefCounts map of any node
has changed since the last snapshot.
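As a rough sketch of that update rule (assumed helper names; the diff below is authoritative): a node's contribution to the set changes exactly when its PVCRefCounts map gains or loses a key, and the snapshot-level set is the union of all nodes' keys.

```go
package example

import "k8s.io/apimachinery/pkg/util/sets"

// pvcKeysChanged reports whether a node's PVCRefCounts map gained or lost a
// PVC key between the last snapshot (existing) and the current cache (clone).
// Only key membership matters here, not the reference counts themselves.
func pvcKeysChanged(existing, clone map[string]int) bool {
	if len(existing) != len(clone) {
		return true
	}
	for key := range clone {
		if _, found := existing[key]; !found {
			return true
		}
	}
	return false
}

// buildUsedPVCSet unions the PVC keys ("namespace/name") of all nodes into the
// snapshot-level set of PVCs in use by scheduled pods.
func buildUsedPVCSet(pvcRefCountsByNode []map[string]int) sets.String {
	used := sets.NewString()
	for _, refCounts := range pvcRefCountsByNode {
		for key := range refCounts {
			used.Insert(key)
		}
	}
	return used
}
```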

Signed-off-by: Yibo Zhuang <yibzhuang@gmail.com>
Yibo Zhuang 2022-05-03 18:00:41 -07:00
parent 537941765f
commit 32c18a3479
6 changed files with 270 additions and 49 deletions

@@ -18,17 +18,25 @@ package framework
// NodeInfoLister interface represents anything that can list/get NodeInfo objects from node name.
type NodeInfoLister interface {
// Returns the list of NodeInfos.
// List returns the list of NodeInfos.
List() ([]*NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with affinity terms.
// HavePodsWithAffinityList returns the list of NodeInfos of nodes with pods with affinity terms.
HavePodsWithAffinityList() ([]*NodeInfo, error)
// Returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
// HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes with pods with required anti-affinity terms.
HavePodsWithRequiredAntiAffinityList() ([]*NodeInfo, error)
// Returns the NodeInfo of the given node name.
// Get returns the NodeInfo of the given node name.
Get(nodeName string) (*NodeInfo, error)
}
// StorageInfoLister interface represents anything that handles storage-related operations and resources.
type StorageInfoLister interface {
// IsPVCUsedByPods returns true/false on whether the PVC is used by one or more scheduled pods,
// keyed in the format "namespace/name".
IsPVCUsedByPods(key string) bool
}
// SharedLister groups scheduler-specific listers.
type SharedLister interface {
NodeInfos() NodeInfoLister
StorageInfos() StorageInfoLister
}

@@ -549,7 +549,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
Allocatable: n.Allocatable.Clone(),
UsedPorts: make(HostPortInfo),
ImageStates: n.ImageStates,
PVCRefCounts: n.PVCRefCounts,
PVCRefCounts: make(map[string]int),
Generation: n.Generation,
}
if len(n.Pods) > 0 {
@@ -571,6 +571,9 @@ func (n *NodeInfo) Clone() *NodeInfo {
if len(n.PodsWithRequiredAntiAffinity) > 0 {
clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
}
for key, value := range n.PVCRefCounts {
clone.PVCRefCounts[key] = value
}
return clone
}
@@ -770,7 +773,7 @@ func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) {
continue
}
key := pod.Namespace + "/" + v.PersistentVolumeClaim.ClaimName
key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
if add {
n.PVCRefCounts[key] += 1
} else {
@@ -838,6 +841,11 @@ func GetPodKey(pod *v1.Pod) (string, error) {
return uid, nil
}
// GetNamespacedName returns the string format of a namespaced resource name.
func GetNamespacedName(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}
// DefaultBindAllHostIP defines the default ip address used to bind to all host.
const DefaultBindAllHostIP = "0.0.0.0"

@@ -191,7 +191,7 @@ func (cache *cacheImpl) Dump() *Dump {
// UpdateSnapshot takes a snapshot of cached NodeInfo map. This is called at
// beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// nodeInfo.Node() is guaranteed to be not nil for all the nodes in the snapshot.
// This function tracks generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
@@ -212,6 +212,9 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
// status from having pods with required anti-affinity to NOT having pods with required
// anti-affinity or the other way around.
updateNodesHavePodsWithRequiredAntiAffinity := false
// usedPVCSet must be re-created whenever the head node generation is greater than
// last snapshot generation.
updateUsedPVCSet := false
// Start from the head of the NodeInfo doubly linked list and update snapshot
// of NodeInfos updated after the last snapshot.
@@ -237,6 +240,18 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
if !updateUsedPVCSet {
if len(existing.PVCRefCounts) != len(clone.PVCRefCounts) {
updateUsedPVCSet = true
} else {
for pvcKey := range clone.PVCRefCounts {
if _, found := existing.PVCRefCounts[pvcKey]; !found {
updateUsedPVCSet = true
break
}
}
}
}
// We need to preserve the original pointer of the NodeInfo struct since it
// is used in the NodeInfoList, which we may not update.
*existing = *clone
@@ -255,7 +270,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
updateAllLists = true
}
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity || updateUsedPVCSet {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}
@@ -278,6 +293,7 @@ func (cache *cacheImpl) UpdateSnapshot(nodeSnapshot *Snapshot) error {
func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.usedPVCSet = sets.NewString()
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
@@ -294,6 +310,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
} else {
klog.ErrorS(nil, "Node exists in nodeTree but not in NodeInfoMap, this should not happen", "node", klog.KRef("", nodeName))
}
@@ -306,6 +325,9 @@ func (cache *cacheImpl) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
for key := range nodeInfo.PVCRefCounts {
snapshot.usedPVCSet.Insert(key)
}
}
}
}
@@ -366,7 +388,7 @@ func (cache *cacheImpl) FinishBinding(pod *v1.Pod) error {
return cache.finishBinding(pod, time.Now())
}
// finishBinding exists to make tests determinitistic by injecting now as an argument
// finishBinding exists to make tests deterministic by injecting now as an argument
func (cache *cacheImpl) finishBinding(pod *v1.Pod, now time.Time) error {
key, err := framework.GetPodKey(pod)
if err != nil {

@@ -24,10 +24,14 @@ import (
"testing"
"time"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
@@ -1169,7 +1173,7 @@ func TestNodeOperators(t *testing.T) {
func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
// Create a few nodes to be used in tests.
nodes := []*v1.Node{}
var nodes []*v1.Node
for i := 0; i < 10; i++ {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -1185,7 +1189,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
nodes = append(nodes, node)
}
// Create a few nodes as updated versions of the above nodes
updatedNodes := []*v1.Node{}
var updatedNodes []*v1.Node
for _, n := range nodes {
updatedNode := n.DeepCopy()
updatedNode.Status.Allocatable = v1.ResourceList{
@@ -1196,7 +1200,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
// Create a few pods for tests.
pods := []*v1.Pod{}
var pods []*v1.Pod
for i := 0; i < 20; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -1212,7 +1216,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
// Create a few pods as updated versions of the above pods.
updatedPods := []*v1.Pod{}
var updatedPods []*v1.Pod
for _, p := range pods {
updatedPod := p.DeepCopy()
priority := int32(1000)
@@ -1221,24 +1225,21 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
// Add a couple of pods with affinity, on the first and second nodes.
podsWithAffinity := []*v1.Pod{}
var podsWithAffinity []*v1.Pod
for i := 0; i < 2; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod%v", i),
Namespace: "test-ns",
UID: types.UID(fmt.Sprintf("test-puid%v", i)),
},
Spec: v1.PodSpec{
NodeName: fmt.Sprintf("test-node%v", i),
Affinity: &v1.Affinity{
PodAffinity: &v1.PodAffinity{},
},
},
}
pod := st.MakePod().Name(fmt.Sprintf("p-affinity-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-affinity-%v", i)).
PodAffinityExists("foo", "", st.PodAffinityWithRequiredReq).Node(fmt.Sprintf("test-node%v", i)).Obj()
podsWithAffinity = append(podsWithAffinity, pod)
}
// Add a few pods with PVCs
var podsWithPVC []*v1.Pod
for i := 0; i < 8; i++ {
pod := st.MakePod().Name(fmt.Sprintf("p-pvc-%v", i)).Namespace("test-ns").UID(fmt.Sprintf("puid-pvc-%v", i)).
PVC(fmt.Sprintf("test-pvc%v", i%4)).Node(fmt.Sprintf("test-node%v", i%2)).Obj()
podsWithPVC = append(podsWithPVC, pod)
}
var cache *cacheImpl
var snapshot *Snapshot
type operation = func(t *testing.T)
@@ -1274,6 +1275,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
addPodWithPVC := func(i int) operation {
return func(t *testing.T) {
if err := cache.AddPod(podsWithPVC[i]); err != nil {
t.Error(err)
}
}
}
removePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.RemovePod(pods[i]); err != nil {
@@ -1288,6 +1296,13 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
}
}
}
removePodWithPVC := func(i int) operation {
return func(t *testing.T) {
if err := cache.RemovePod(podsWithPVC[i]); err != nil {
t.Error(err)
}
}
}
updatePod := func(i int) operation {
return func(t *testing.T) {
if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
@@ -1309,30 +1324,35 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
operations []operation
expected []*v1.Node
expectedHavePodsWithAffinity int
expectedUsedPVCSet sets.String
}{
{
name: "Empty cache",
operations: []operation{},
expected: []*v1.Node{},
name: "Empty cache",
operations: []operation{},
expected: []*v1.Node{},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Single node",
operations: []operation{addNode(1)},
expected: []*v1.Node{nodes[1]},
name: "Single node",
operations: []operation{addNode(1)},
expected: []*v1.Node{nodes[1]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add node, remove it, add it again",
operations: []operation{
addNode(1), updateSnapshot(), removeNode(1), addNode(1),
},
expected: []*v1.Node{nodes[1]},
expected: []*v1.Node{nodes[1]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add node and remove it in the same cycle, add it again",
operations: []operation{
addNode(1), updateSnapshot(), addNode(2), removeNode(1),
},
expected: []*v1.Node{nodes[2]},
expected: []*v1.Node{nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and snapshot in the middle",
@@ -1340,21 +1360,24 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2),
updateSnapshot(), addNode(3),
},
expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and snapshot in the end",
operations: []operation{
addNode(0), addNode(2), addNode(5), addNode(6),
},
expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Update some nodes",
operations: []operation{
addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
},
expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and remove all of them",
@@ -1362,7 +1385,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
removeNode(0), removeNode(2), removeNode(5), removeNode(6),
},
expected: []*v1.Node{},
expected: []*v1.Node{},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, and remove some of them",
@@ -1370,7 +1394,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
removeNode(0), removeNode(6),
},
expected: []*v1.Node{nodes[5], nodes[2]},
expected: []*v1.Node{nodes[5], nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add a few nodes, remove all of them, and add more",
@@ -1379,7 +1404,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
removeNode(2), removeNode(5), removeNode(6), updateSnapshot(),
addNode(7), addNode(9),
},
expected: []*v1.Node{nodes[9], nodes[7]},
expected: []*v1.Node{nodes[9], nodes[7]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Update nodes in particular order",
@@ -1387,7 +1413,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(8), updateNode(2), updateNode(8), updateSnapshot(),
addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add some nodes and some pods",
@@ -1395,21 +1422,24 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
addNode(0), addNode(2), addNode(8), updateSnapshot(),
addPod(8), addPod(2),
},
expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Updating a pod moves its node to the head",
operations: []operation{
addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
},
expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add pod before its node",
operations: []operation{
addNode(0), addPod(1), updatePod(1), addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Remove node before its pods",
@@ -1418,7 +1448,8 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
removeNode(1), updateSnapshot(),
updatePod(1), updatePod(11), removePod(1), removePod(11),
},
expected: []*v1.Node{nodes[0]},
expected: []*v1.Node{nodes[0]},
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add Pods with affinity",
@@ -1427,6 +1458,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedHavePodsWithAffinity: 1,
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add Pods with PVC",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"),
},
{
name: "Add multiple nodes with pods with affinity",
@@ -1435,6 +1475,15 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedHavePodsWithAffinity: 2,
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add multiple nodes with pods with PVC",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), addNode(1), addPodWithPVC(1), updateSnapshot(),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0", "test-ns/test-pvc1"),
},
{
name: "Add then Remove pods with affinity",
@@ -1443,6 +1492,43 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
},
expected: []*v1.Node{nodes[0], nodes[1]},
expectedHavePodsWithAffinity: 0,
expectedUsedPVCSet: sets.NewString(),
},
{
name: "Add then Remove pod with PVC",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), addPodWithPVC(2), updateSnapshot(),
},
expected: []*v1.Node{nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2"),
},
{
name: "Add then Remove pod with PVC and add same pod again",
operations: []operation{
addNode(0), addPodWithPVC(0), updateSnapshot(), removePodWithPVC(0), addPodWithPVC(0), updateSnapshot(),
},
expected: []*v1.Node{nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc0"),
},
{
name: "Add and Remove multiple pods with PVC with same ref count length different content",
operations: []operation{
addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), updateSnapshot(),
removePodWithPVC(0), removePodWithPVC(1), addPodWithPVC(2), addPodWithPVC(3), updateSnapshot(),
},
expected: []*v1.Node{nodes[1], nodes[0]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc2", "test-ns/test-pvc3"),
},
{
name: "Add and Remove multiple pods with PVC",
operations: []operation{
addNode(0), addNode(1), addPodWithPVC(0), addPodWithPVC(1), addPodWithPVC(2), updateSnapshot(),
removePodWithPVC(0), removePodWithPVC(1), updateSnapshot(), addPodWithPVC(0), updateSnapshot(),
addPodWithPVC(3), addPodWithPVC(4), addPodWithPVC(5), updateSnapshot(),
removePodWithPVC(0), removePodWithPVC(3), removePodWithPVC(4), updateSnapshot(),
},
expected: []*v1.Node{nodes[0], nodes[1]},
expectedUsedPVCSet: sets.NewString("test-ns/test-pvc1", "test-ns/test-pvc2"),
},
}
@@ -1476,6 +1562,11 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList))
}
// Compare content of the used PVC set
if diff := cmp.Diff(test.expectedUsedPVCSet, snapshot.usedPVCSet); diff != "" {
t.Errorf("Unexpected usedPVCSet (-want +got):\n%s", diff)
}
// Always update the snapshot at the end of operations and compare it.
if err := cache.UpdateSnapshot(snapshot); err != nil {
t.Error(err)
@@ -1509,6 +1600,7 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *
expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
expectedUsedPVCSet := sets.NewString()
nodesList, err := cache.nodeTree.list()
if err != nil {
t.Fatal(err)
@@ -1519,6 +1611,9 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *
if len(n.PodsWithAffinity) > 0 {
expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
}
for key := range n.PVCRefCounts {
expectedUsedPVCSet.Insert(key)
}
} else {
return fmt.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
}
@@ -1538,12 +1633,18 @@ func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *cacheImpl, snapshot *
}
}
for key := range expectedUsedPVCSet {
if !snapshot.usedPVCSet.Has(key) {
return fmt.Errorf("expected PVC %s to exist in UsedPVCSet but it is not found", key)
}
}
return nil
}
func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
// Create a few nodes to be used in tests.
nodes := []*v1.Node{}
var nodes []*v1.Node
i := 0
// List of number of nodes per zone, zone 0 -> 2, zone 1 -> 6
for zone, nb := range []int{2, 6} {

@@ -36,7 +36,10 @@ type Snapshot struct {
// havePodsWithRequiredAntiAffinityNodeInfoList is the list of nodes with at least one pod declaring
// required anti-affinity terms.
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
generation int64
// usedPVCSet contains a set of PVC names that have one or more scheduled pods using them,
// keyed in the format "namespace/name".
usedPVCSet sets.String
generation int64
}
var _ framework.SharedLister = &Snapshot{}
@@ -45,6 +48,7 @@ var _ framework.SharedLister = &Snapshot{}
func NewEmptySnapshot() *Snapshot {
return &Snapshot{
nodeInfoMap: make(map[string]*framework.NodeInfo),
usedPVCSet: sets.NewString(),
}
}
@@ -69,6 +73,7 @@ func NewSnapshot(pods []*v1.Pod, nodes []*v1.Node) *Snapshot {
s.nodeInfoList = nodeInfoList
s.havePodsWithAffinityNodeInfoList = havePodsWithAffinityNodeInfoList
s.havePodsWithRequiredAntiAffinityNodeInfoList = havePodsWithRequiredAntiAffinityNodeInfoList
s.usedPVCSet = createUsedPVCSet(pods)
return s
}
@@ -98,6 +103,25 @@ func createNodeInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*framework.N
return nodeNameToInfo
}
func createUsedPVCSet(pods []*v1.Pod) sets.String {
usedPVCSet := sets.NewString()
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
for _, v := range pod.Spec.Volumes {
if v.PersistentVolumeClaim == nil {
continue
}
key := framework.GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
usedPVCSet.Insert(key)
}
}
return usedPVCSet
}
// getNodeImageStates returns the given node's image states based on the given imageExistence map.
func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*framework.ImageStateSummary {
imageStates := make(map[string]*framework.ImageStateSummary)
@@ -135,6 +159,11 @@ func (s *Snapshot) NodeInfos() framework.NodeInfoLister {
return s
}
// StorageInfos returns a StorageInfoLister.
func (s *Snapshot) StorageInfos() framework.StorageInfoLister {
return s
}
// NumNodes returns the number of nodes in the snapshot.
func (s *Snapshot) NumNodes() int {
return len(s.nodeInfoList)
@@ -163,3 +192,7 @@ func (s *Snapshot) Get(nodeName string) (*framework.NodeInfo, error) {
}
return nil, fmt.Errorf("nodeinfo not found for node name %q", nodeName)
}
func (s *Snapshot) IsPVCUsedByPods(key string) bool {
return s.usedPVCSet.Has(key)
}

@@ -21,10 +21,12 @@ import (
"reflect"
"testing"
"k8s.io/api/core/v1"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/framework"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
const mb int64 = 1024 * 1024
@@ -180,3 +182,50 @@ func TestCreateImageExistenceMap(t *testing.T) {
})
}
}
func TestCreateUsedPVCSet(t *testing.T) {
tests := []struct {
name string
pods []*v1.Pod
expected sets.String
}{
{
name: "empty pods list",
pods: []*v1.Pod{},
expected: sets.NewString(),
},
{
name: "pods not scheduled",
pods: []*v1.Pod{
st.MakePod().Name("foo").Namespace("foo").Obj(),
st.MakePod().Name("bar").Namespace("bar").Obj(),
},
expected: sets.NewString(),
},
{
name: "scheduled pods that do not use any PVC",
pods: []*v1.Pod{
st.MakePod().Name("foo").Namespace("foo").Node("node-1").Obj(),
st.MakePod().Name("bar").Namespace("bar").Node("node-2").Obj(),
},
expected: sets.NewString(),
},
{
name: "scheduled pods that use PVC",
pods: []*v1.Pod{
st.MakePod().Name("foo").Namespace("foo").Node("node-1").PVC("pvc1").Obj(),
st.MakePod().Name("bar").Namespace("bar").Node("node-2").PVC("pvc2").Obj(),
},
expected: sets.NewString("foo/pvc1", "bar/pvc2"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
usedPVCs := createUsedPVCSet(test.pods)
if diff := cmp.Diff(test.expected, usedPVCs); diff != "" {
t.Errorf("Unexpected usedPVCs (-want +got):\n%s", diff)
}
})
}
}