diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go
index 15461c5e952..3ab343f487f 100644
--- a/pkg/scheduler/internal/cache/cache.go
+++ b/pkg/scheduler/internal/cache/cache.go
@@ -335,6 +335,14 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
 	}
 }
 
+// NodeCount returns the number of nodes in the cache.
+// DO NOT use outside of tests.
+func (cache *schedulerCache) NodeCount() int {
+	cache.mu.RLock()
+	defer cache.mu.RUnlock()
+	return len(cache.nodes)
+}
+
 // PodCount returns the number of pods in the cache (including those from deleted nodes).
 // DO NOT use outside of tests.
 func (cache *schedulerCache) PodCount() (int, error) {
@@ -343,10 +351,6 @@ func (cache *schedulerCache) PodCount() (int, error) {
 	// podFilter is expected to return true for most or all of the pods. We
 	// can avoid expensive array growth without wasting too much memory by
 	// pre-allocating capacity.
-	maxSize := 0
-	for _, n := range cache.nodes {
-		maxSize += len(n.info.Pods)
-	}
 	count := 0
 	for _, n := range cache.nodes {
 		count += len(n.info.Pods)
diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go
index b0b5013ab16..0b1d0714045 100644
--- a/pkg/scheduler/internal/cache/fake/fake_cache.go
+++ b/pkg/scheduler/internal/cache/fake/fake_cache.go
@@ -77,6 +77,9 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
 	return nil
 }
 
+// NodeCount is a fake method for testing.
+func (c *Cache) NodeCount() int { return 0 }
+
 // PodCount is a fake method for testing.
 func (c *Cache) PodCount() (int, error) { return 0, nil }
 
diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go
index 06dd2b2451b..9c555c31b56 100644
--- a/pkg/scheduler/internal/cache/interface.go
+++ b/pkg/scheduler/internal/cache/interface.go
@@ -56,7 +56,12 @@ import (
 // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
 //   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
 type Cache interface {
+	// NodeCount returns the number of nodes in the cache.
+	// DO NOT use outside of tests.
+	NodeCount() int
+
 	// PodCount returns the number of pods in the cache (including those from deleted nodes).
+	// DO NOT use outside of tests.
 	PodCount() (int, error)
 
 	// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
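The three hunks above expose the same test-only counter at each layer: the concrete schedulerCache, the test fake, and the Cache interface. As a minimal sketch of the intended use, assuming a *scheduler.Scheduler named sched whose SchedulerCache field satisfies this interface (the waitForNodesInCache helper added at the bottom of this diff follows the same shape):

    // Block until the informer-fed scheduler cache has observed at least
    // `want` nodes. wait.Poll and wait.ForeverTestTimeout (30s) come from
    // k8s.io/apimachinery/pkg/util/wait.
    err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
        return sched.SchedulerCache.NodeCount() >= want, nil
    })
    if err != nil {
        t.Fatalf("scheduler cache never observed %d nodes: %v", want, err)
    }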
diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go
index f39600e5607..f2fbe2bdd36 100644
--- a/test/integration/scheduler/extender_test.go
+++ b/test/integration/scheduler/extender_test.go
@@ -381,7 +381,7 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
 
 	for ii := 0; ii < 5; ii++ {
 		node.Name = fmt.Sprintf("machine%d", ii+1)
-		if _, err := cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
+		if _, err := createNode(cs, node); err != nil {
 			t.Fatalf("Failed to create nodes: %v", err)
 		}
 	}
diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go
index b1af15974d5..910e88f36e1 100644
--- a/test/integration/scheduler/framework_test.go
+++ b/test/integration/scheduler/framework_test.go
@@ -1181,9 +1181,9 @@ func TestBindPlugin(t *testing.T) {
 	defer testutils.CleanupTest(t, testCtx)
 
 	// Add a few nodes.
-	_, err := createNodes(testCtx.ClientSet, "test-node", st.MakeNode(), 2)
+	_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), 2)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	tests := []struct {
@@ -1840,9 +1840,9 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 		v1.ResourceCPU:    "500m",
 		v1.ResourceMemory: "500",
 	}
-	_, err := createNodes(testCtx.ClientSet, "test-node", st.MakeNode().Capacity(nodeRes), 1)
+	_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode().Capacity(nodeRes), 1)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	permitPlugin.failPermit = false
@@ -1900,9 +1900,8 @@ func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext,
 	go testCtx.Scheduler.Run(testCtx.Ctx)
 
 	if nodeCount > 0 {
-		_, err := createNodes(testCtx.ClientSet, "test-node", st.MakeNode(), nodeCount)
-		if err != nil {
-			t.Fatalf("Cannot create nodes: %v", err)
+		if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
+			t.Fatal(err)
 		}
 	}
 	return testCtx
diff --git a/test/integration/scheduler/predicates_test.go b/test/integration/scheduler/predicates_test.go
index fb8f494b962..54365fa77ce 100644
--- a/test/integration/scheduler/predicates_test.go
+++ b/test/integration/scheduler/predicates_test.go
@@ -43,9 +43,9 @@ func TestInterPodAffinity(t *testing.T) {
 	defer testutils.CleanupTest(t, testCtx)
 
 	// Add a few nodes with labels
-	nodes, err := createNodes(testCtx.ClientSet, "testnode", st.MakeNode().Label("region", "r1").Label("zone", "z11"), 2)
+	nodes, err := createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode().Label("region", "r1").Label("zone", "z11"), 2)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	cs := testCtx.ClientSet
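These call sites (and the scheduler_test.go ones below) funnel node creation through the existing createNode helper, whose signature is visible in the util.go hunk further down. Judging by that signature and the clientset call it replaces, it is presumably a thin wrapper; a sketch, not the verbatim helper:

    // Presumed shape of createNode: one API-server round trip, with no
    // waiting for the scheduler to observe the new node.
    func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
        return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
    }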
labelKey := "service" @@ -142,9 +141,9 @@ func TestPodAffinity(t *testing.T) { t.Fatalf("Error running the attractor pod: %v", err) } // Add a few more nodes without the topology label. - _, err = createNodes(testCtx.ClientSet, "other-node", st.MakeNode(), 5) + _, err = createAndWaitForNodesInCache(testCtx, "other-node", st.MakeNode(), 5) if err != nil { - t.Fatalf("Cannot create the second set of nodes: %v", err) + t.Fatal(err) } // Add a new pod with affinity to the attractor pod. podName := "pod-with-podaffinity" @@ -215,9 +214,9 @@ func TestImageLocality(t *testing.T) { } // Add a few nodes. - _, err = createNodes(testCtx.ClientSet, "testnode", st.MakeNode(), 10) + _, err = createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode(), 10) if err != nil { - t.Fatalf("cannot create nodes: %v", err) + t.Fatal(err) } // Create a pod with containers each having the specified image. diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index dccbe40244c..5d10ef3c51d 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -427,7 +427,7 @@ func TestUnschedulableNodes(t *testing.T) { } for i, mod := range nodeModifications { - unSchedNode, err := testCtx.ClientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + unSchedNode, err := createNode(testCtx.ClientSet, node) if err != nil { t.Fatalf("Failed to create node: %v", err) } @@ -510,7 +510,7 @@ func TestMultipleSchedulers(t *testing.T) { }, }, } - testCtx.ClientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + createNode(testCtx.ClientSet, node) // 3. create 3 pods for testing t.Logf("create 3 pods for testing") @@ -635,7 +635,7 @@ func TestMultipleSchedulingProfiles(t *testing.T) { }, }, } - if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil { + if _, err := createNode(testCtx.ClientSet, node); err != nil { t.Fatal(err) } @@ -825,6 +825,10 @@ func TestSchedulerInformers(t *testing.T) { t.Fatalf("Error creating node %v: %v", nodeConf.name, err) } } + // Ensure nodes are present in scheduler cache. + if err := waitForNodesInCache(testCtx.Scheduler, len(test.nodes)); err != nil { + t.Fatal(err) + } pods := make([]*v1.Pod, len(test.existingPods)) var err error diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index 633aa7222ef..2251123fc49 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -111,7 +111,7 @@ func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestCon // to see is in the store. Used to observe reflected events. func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string, passFunc func(n interface{}) bool) error { - nodes := []*v1.Node{} + var nodes []*v1.Node err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { n, err := nodeLister.Get(key) @@ -143,6 +143,9 @@ func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) { // createNodes creates `numNodes` nodes. The created node names will be in the // form of "`prefix`-X" where X is an ordinal. +// DEPRECATED +// use createAndWaitForNodesInCache instead, which ensures the created nodes +// to be present in scheduler cache. 
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index 633aa7222ef..2251123fc49 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -111,7 +111,7 @@ func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestContext {
 // to see is in the store. Used to observe reflected events.
 func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
 	passFunc func(n interface{}) bool) error {
-	nodes := []*v1.Node{}
+	var nodes []*v1.Node
 	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout,
 		func() (bool, error) {
 			n, err := nodeLister.Get(key)
@@ -143,6 +143,9 @@ func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
 
 // createNodes creates `numNodes` nodes. The created node names will be in the
 // form of "`prefix`-X" where X is an ordinal.
+// DEPRECATED
+// Use createAndWaitForNodesInCache instead, which ensures the created nodes
+// are present in the scheduler cache.
 func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
 	nodes := make([]*v1.Node, numNodes)
 	for i := 0; i < numNodes; i++ {
@@ -156,6 +159,29 @@ func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
 	return nodes[:], nil
 }
 
+// createAndWaitForNodesInCache calls createNodes() and waits for the created
+// nodes to be present in the scheduler cache.
+func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
+	existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount()
+	nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
+	if err != nil {
+		return nodes, fmt.Errorf("cannot create nodes: %v", err)
+	}
+	return nodes, waitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
+}
+
+// waitForNodesInCache verifies that at least nodeCount nodes are present in
+// the scheduler cache within 30 seconds; it returns an error otherwise.
+func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
+	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+		return sched.SchedulerCache.NodeCount() >= nodeCount, nil
+	})
+	if err != nil {
+		return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
+	}
+	return nil
+}
+
 type pausePodConfig struct {
 	Name      string
 	Namespace string
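Note that createAndWaitForNodesInCache snapshots NodeCount() before creating anything, so consecutive calls wait on the cumulative total instead of racing one another. A sketch of the flow in TestPodAffinity above, assuming a fresh (empty) cache:

    // First call: existingNodes == 0, so it returns once NodeCount() >= 0+5.
    nodesInTopology, err := createAndWaitForNodesInCache(testCtx, "in-topology",
        st.MakeNode().Label(topologyKey, topologyValue), 5)
    // ...
    // Second call: existingNodes == 5, so it returns once NodeCount() >= 5+5.
    _, err = createAndWaitForNodesInCache(testCtx, "other-node", st.MakeNode(), 5)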