Merge pull request #96062 from Huang-Wei/sched-aware-of-nodes

Ensure created nodes are present in scheduler cache before scheduling

Commit: 43a0007fae
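This PR fixes a race in the scheduler integration tests: the scheduler's cache is populated asynchronously by informer events, so a node returned by a successful API Create call may not yet be visible to the scheduler. A test that creates nodes and immediately schedules pods can therefore run against a stale cache. The change adds test-only NodeCount/PodCount accessors to the scheduler cache, introduces a createAndWaitForNodesInCache helper that polls NodeCount until every created node has been observed, and switches the integration tests over to it.

Below is a minimal, self-contained sketch of the pattern. nodeCache and waitForNodes are illustrative stand-ins, not the PR's code; the real pieces are the schedulerCache and waitForNodesInCache hunks that follow, which poll via wait.Poll from k8s.io/apimachinery:

    package main

    import (
    	"errors"
    	"fmt"
    	"sync"
    	"time"
    )

    // nodeCache stands in for the scheduler's internal cache, which is filled
    // asynchronously by informer events, not by the API call that created a node.
    type nodeCache struct {
    	mu    sync.RWMutex
    	nodes map[string]struct{}
    }

    // NodeCount mirrors the method the PR adds: a read-locked count of cached nodes.
    func (c *nodeCache) NodeCount() int {
    	c.mu.RLock()
    	defer c.mu.RUnlock()
    	return len(c.nodes)
    }

    func (c *nodeCache) add(name string) {
    	c.mu.Lock()
    	defer c.mu.Unlock()
    	c.nodes[name] = struct{}{}
    }

    // waitForNodes polls until the cache holds at least n nodes or the timeout
    // expires — the same shape as the PR's waitForNodesInCache helper.
    func waitForNodes(c *nodeCache, n int, timeout time.Duration) error {
    	deadline := time.Now().Add(timeout)
    	for time.Now().Before(deadline) {
    		if c.NodeCount() >= n {
    			return nil
    		}
    		time.Sleep(100 * time.Millisecond)
    	}
    	return errors.New("timed out waiting for nodes in scheduler cache")
    }

    func main() {
    	c := &nodeCache{nodes: map[string]struct{}{}}

    	// Simulate the informer delivering node-add events some time after
    	// the test's Create() calls returned.
    	go func() {
    		for i := 0; i < 3; i++ {
    			time.Sleep(50 * time.Millisecond)
    			c.add(fmt.Sprintf("machine%d", i+1))
    		}
    	}()

    	// Without this wait, scheduling right after Create() might run
    	// against an empty or partial node set.
    	if err := waitForNodes(c, 3, 2*time.Second); err != nil {
    		panic(err)
    	}
    	fmt.Println("nodes observed in cache:", c.NodeCount())
    }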
pkg/scheduler/internal/cache/cache.go (vendored, 12 lines changed)
@@ -335,6 +335,14 @@ func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot)
 	}
 }
 
+// NodeCount returns the number of nodes in the cache.
+// DO NOT use outside of tests.
+func (cache *schedulerCache) NodeCount() int {
+	cache.mu.RLock()
+	defer cache.mu.RUnlock()
+	return len(cache.nodes)
+}
+
 // PodCount returns the number of pods in the cache (including those from deleted nodes).
 // DO NOT use outside of tests.
 func (cache *schedulerCache) PodCount() (int, error) {
@@ -343,10 +351,6 @@ func (cache *schedulerCache) PodCount() (int, error) {
 	// podFilter is expected to return true for most or all of the pods. We
 	// can avoid expensive array growth without wasting too much memory by
 	// pre-allocating capacity.
-	maxSize := 0
-	for _, n := range cache.nodes {
-		maxSize += len(n.info.Pods)
-	}
 	count := 0
 	for _, n := range cache.nodes {
 		count += len(n.info.Pods)
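Two details worth noting in the cache.go hunks: NodeCount takes only the read lock, since it never mutates the cache, and its "DO NOT use outside of tests" comment marks it as a test synchronization hook rather than scheduler logic; the PodCount hunk meanwhile removes a maxSize pre-allocation loop, leaving a straight count over cache.nodes. The next hunk adds matching no-op stubs to the scheduler's fake cache (pkg/scheduler/internal/cache/fake/fake_cache.go) used by unit tests.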
@@ -77,6 +77,9 @@ func (c *Cache) UpdateSnapshot(snapshot *internalcache.Snapshot) error {
 	return nil
 }
 
+// NodeCount is a fake method for testing.
+func (c *Cache) NodeCount() int { return 0 }
+
 // PodCount is a fake method for testing.
 func (c *Cache) PodCount() (int, error) { return 0, nil }
 
pkg/scheduler/internal/cache/interface.go (vendored, 5 lines changed)
@@ -56,7 +56,12 @@ import (
 // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
 //   a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
 type Cache interface {
+	// NodeCount returns the number of nodes in the cache.
+	// DO NOT use outside of tests.
+	NodeCount() int
+
 	// PodCount returns the number of pods in the cache (including those from deleted nodes).
+	// DO NOT use outside of tests.
 	PodCount() (int, error)
 
 	// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
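Because Cache is an interface, every implementation must now provide the two counters, which is why the fake cache above gains no-op stubs. Keeping the methods on the interface, rather than type-asserting to the concrete schedulerCache in tests, lets the new wait helper work against whatever cache implementation the scheduler was built with.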
@@ -381,7 +381,7 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
 
 	for ii := 0; ii < 5; ii++ {
 		node.Name = fmt.Sprintf("machine%d", ii+1)
-		if _, err := cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
+		if _, err := createNode(cs, node); err != nil {
 			t.Fatalf("Failed to create nodes: %v", err)
 		}
 	}
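The remaining hunks update the scheduler integration tests (test/integration/scheduler/): direct cs.CoreV1().Nodes().Create(...) calls become createNode(...), batch creation moves to createAndWaitForNodesInCache(...), and per-call-site error messages collapse to t.Fatal(err) because the helper already wraps failures with a "cannot create nodes" prefix (see the util.go hunk at the end of the diff).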
@@ -1181,9 +1181,9 @@ func TestBindPlugin(t *testing.T) {
 	defer testutils.CleanupTest(t, testCtx)
 
 	// Add a few nodes.
-	_, err := createNodes(testCtx.ClientSet, "test-node", st.MakeNode(), 2)
+	_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), 2)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	tests := []struct {
@@ -1840,9 +1840,9 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 		v1.ResourceCPU:    "500m",
 		v1.ResourceMemory: "500",
 	}
-	_, err := createNodes(testCtx.ClientSet, "test-node", st.MakeNode().Capacity(nodeRes), 1)
+	_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode().Capacity(nodeRes), 1)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	permitPlugin.failPermit = false
@@ -1900,9 +1900,8 @@ func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext,
 	go testCtx.Scheduler.Run(testCtx.Ctx)
 
 	if nodeCount > 0 {
-		_, err := createNodes(testCtx.ClientSet, "test-node", st.MakeNode(), nodeCount)
-		if err != nil {
-			t.Fatalf("Cannot create nodes: %v", err)
+		if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
+			t.Fatal(err)
 		}
 	}
 	return testCtx
@@ -43,9 +43,9 @@ func TestInterPodAffinity(t *testing.T) {
 	defer testutils.CleanupTest(t, testCtx)
 
 	// Add a few nodes with labels
-	nodes, err := createNodes(testCtx.ClientSet, "testnode", st.MakeNode().Label("region", "r1").Label("zone", "z11"), 2)
+	nodes, err := createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode().Label("region", "r1").Label("zone", "z11"), 2)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	cs := testCtx.ClientSet
@@ -39,7 +39,6 @@ import (
 )
 
 // This file tests the scheduler priority functions.
-
 func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *testutils.TestContext {
 	prof := schedulerconfig.KubeSchedulerProfile{
 		SchedulerName: v1.DefaultSchedulerName,
@@ -72,9 +71,9 @@ func TestNodeAffinity(t *testing.T) {
 	testCtx := initTestSchedulerForPriorityTest(t, nodeaffinity.Name)
 	defer testutils.CleanupTest(t, testCtx)
 	// Add a few nodes.
-	_, err := createNodes(testCtx.ClientSet, "testnode", st.MakeNode(), 4)
+	_, err := createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode(), 4)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 	// Add a label to one of the nodes.
 	labelKey := "kubernetes.io/node-topologyKey"
@@ -126,9 +125,9 @@ func TestPodAffinity(t *testing.T) {
 	// Add a few nodes.
 	topologyKey := "node-topologykey"
 	topologyValue := "topologyvalue"
-	nodesInTopology, err := createNodes(testCtx.ClientSet, "in-topology", st.MakeNode().Label(topologyKey, topologyValue), 5)
+	nodesInTopology, err := createAndWaitForNodesInCache(testCtx, "in-topology", st.MakeNode().Label(topologyKey, topologyValue), 5)
 	if err != nil {
-		t.Fatalf("Cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 	// Add a pod with a label and wait for it to schedule.
 	labelKey := "service"
@@ -142,9 +141,9 @@ func TestPodAffinity(t *testing.T) {
 		t.Fatalf("Error running the attractor pod: %v", err)
 	}
 	// Add a few more nodes without the topology label.
-	_, err = createNodes(testCtx.ClientSet, "other-node", st.MakeNode(), 5)
+	_, err = createAndWaitForNodesInCache(testCtx, "other-node", st.MakeNode(), 5)
 	if err != nil {
-		t.Fatalf("Cannot create the second set of nodes: %v", err)
+		t.Fatal(err)
 	}
 	// Add a new pod with affinity to the attractor pod.
 	podName := "pod-with-podaffinity"
@@ -215,9 +214,9 @@ func TestImageLocality(t *testing.T) {
 	}
 
 	// Add a few nodes.
-	_, err = createNodes(testCtx.ClientSet, "testnode", st.MakeNode(), 10)
+	_, err = createAndWaitForNodesInCache(testCtx, "testnode", st.MakeNode(), 10)
 	if err != nil {
-		t.Fatalf("cannot create nodes: %v", err)
+		t.Fatal(err)
 	}
 
 	// Create a pod with containers each having the specified image.
@@ -427,7 +427,7 @@ func TestUnschedulableNodes(t *testing.T) {
 	}
 
 	for i, mod := range nodeModifications {
-		unSchedNode, err := testCtx.ClientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+		unSchedNode, err := createNode(testCtx.ClientSet, node)
 		if err != nil {
 			t.Fatalf("Failed to create node: %v", err)
 		}
@@ -510,7 +510,7 @@ func TestMultipleSchedulers(t *testing.T) {
 			},
 		},
 	}
-	testCtx.ClientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
+	createNode(testCtx.ClientSet, node)
 
 	// 3. create 3 pods for testing
 	t.Logf("create 3 pods for testing")
@@ -635,7 +635,7 @@ func TestMultipleSchedulingProfiles(t *testing.T) {
 			},
 		},
 	}
-	if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
+	if _, err := createNode(testCtx.ClientSet, node); err != nil {
 		t.Fatal(err)
 	}
 
@@ -825,6 +825,10 @@ func TestSchedulerInformers(t *testing.T) {
 				t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
 			}
 		}
+		// Ensure nodes are present in scheduler cache.
+		if err := waitForNodesInCache(testCtx.Scheduler, len(test.nodes)); err != nil {
+			t.Fatal(err)
+		}
 
 		pods := make([]*v1.Pod, len(test.existingPods))
 		var err error
@@ -111,7 +111,7 @@ func initTestDisablePreemption(t *testing.T, nsPrefix string) *testutils.TestContext {
 // to see is in the store. Used to observe reflected events.
 func waitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
 	passFunc func(n interface{}) bool) error {
-	nodes := []*v1.Node{}
+	var nodes []*v1.Node
 	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
 		n, err := nodeLister.Get(key)
 
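The waitForReflection tweak is incidental cleanup: "var nodes []*v1.Node" is the idiomatic declaration for a slice that starts empty and is only ever appended to, and it avoids the allocation implied by the []*v1.Node{} literal.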
@@ -143,6 +143,9 @@ func createNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
 
 // createNodes creates `numNodes` nodes. The created node names will be in the
 // form of "`prefix`-X" where X is an ordinal.
+// DEPRECATED
+// use createAndWaitForNodesInCache instead, which ensures the created nodes
+// to be present in scheduler cache.
 func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
 	nodes := make([]*v1.Node, numNodes)
 	for i := 0; i < numNodes; i++ {
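createNodes is kept and marked DEPRECATED rather than deleted, presumably because the cache-aware replacement needs a full TestContext with a running scheduler, which not every caller has; DoTestPodScheduling above, for instance, only switches to createNode.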
@@ -156,6 +159,29 @@ func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
 	return nodes[:], nil
 }
 
+// createAndWaitForNodesInCache calls createNodes(), and wait for the created
+// nodes to be present in scheduler cache.
+func createAndWaitForNodesInCache(testCtx *testutils.TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
+	existingNodes := testCtx.Scheduler.SchedulerCache.NodeCount()
+	nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
+	if err != nil {
+		return nodes, fmt.Errorf("cannot create nodes: %v", err)
+	}
+	return nodes, waitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
+}
+
+// waitForNodesInCache ensures at least <nodeCount> nodes are present in scheduler cache
+// within 30 seconds; otherwise returns false.
+func waitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
+	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
+		return sched.SchedulerCache.NodeCount() >= nodeCount, nil
+	})
+	if err != nil {
+		return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
+	}
+	return nil
+}
+
 type pausePodConfig struct {
 	Name      string
 	Namespace string
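Taken together, a post-PR call site looks like the following sketch (mirroring the TestBindPlugin change above; testCtx, st, and t come from the integration-test harness and are assumed here):

    // Create two nodes and block until the scheduler cache has seen both.
    nodes, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), 2)
    if err != nil {
    	t.Fatal(err)
    }
    // Any pod created from here on is scheduled against a cache that
    // already contains the new nodes.
    _ = nodes

Note that the helper records NodeCount before creating and waits for numNodes+existingNodes afterwards, so it composes across multiple batches within one test (TestPodAffinity above adds two). The "within 30 seconds" in the waitForNodesInCache comment matches wait.ForeverTestTimeout, which is 30 seconds.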