Add e2enode.GetBoundedReadySchedulableNodes, replace some uses of framework.GetReadySchedulableNodesOrDie

For tests that want at-most-N nodes
Dan Winship 2019-09-08 13:19:17 -04:00
parent 71b02dd422
commit 3c445b2ad0
12 changed files with 73 additions and 90 deletions
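
For illustration only (not part of this commit): a minimal sketch of how a test would call the new helper. The pickTestNodeNames function is hypothetical; GetBoundedReadySchedulableNodes, framework.ExpectNoError, and the import paths are taken from the diff below.

import (
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
)

// pickTestNodeNames returns the names of at most maxNodes ready, schedulable
// nodes; on a cluster larger than maxNodes the helper returns a subset.
func pickTestNodeNames(f *framework.Framework, maxNodes int) []string {
	nodes, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxNodes)
	framework.ExpectNoError(err)
	names := make([]string, 0, len(nodes.Items))
	for _, node := range nodes.Items {
		names = append(names, node.Name)
	}
	return names
}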

View File

@@ -50,7 +50,6 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",

View File

@@ -47,6 +47,7 @@ import (
 	"k8s.io/client-go/restmapper"
 	scaleclient "k8s.io/client-go/scale"
 	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
+	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2epsp "k8s.io/kubernetes/test/e2e/framework/psp"
 	testutils "k8s.io/kubernetes/test/utils"
@@ -567,23 +568,21 @@ func (f *Framework) CreateServiceForSimpleApp(contPort, svcPort int, appName str
 // CreatePodsPerNodeForSimpleApp creates pods w/ labels. Useful for tests which make a bunch of pods w/o any networking.
 func (f *Framework) CreatePodsPerNodeForSimpleApp(appName string, podSpec func(n v1.Node) v1.PodSpec, maxCount int) map[string]string {
-	nodes := GetReadySchedulableNodesOrDie(f.ClientSet)
+	nodes, err := e2enode.GetBoundedReadySchedulableNodes(f.ClientSet, maxCount)
+	ExpectNoError(err)
 	podLabels := map[string]string{
 		"app": appName + "-pod",
 	}
 	for i, node := range nodes.Items {
-		// one per node, but no more than maxCount.
-		if i <= maxCount {
-			Logf("%v/%v : Creating container with label app=%v-pod", i, maxCount, appName)
-			_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(&v1.Pod{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:   fmt.Sprintf(appName+"-pod-%v", i),
-					Labels: podLabels,
-				},
-				Spec: podSpec(node),
-			})
-			ExpectNoError(err)
-		}
+		Logf("%v/%v : Creating container with label app=%v-pod", i, maxCount, appName)
+		_, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(&v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:   fmt.Sprintf(appName+"-pod-%v", i),
+				Labels: podLabels,
+			},
+			Spec: podSpec(node),
+		})
+		ExpectNoError(err)
 	}
 	return podLabels
 }

View File

@@ -32,7 +32,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
-	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -621,28 +620,11 @@ func (config *NetworkingTestConfig) setup(selector map[string]string) {
 	}
 }
 
-// shuffleNodes copies nodes from the specified slice into a copy in random
-// order. It returns a new slice.
-func shuffleNodes(nodes []v1.Node) []v1.Node {
-	shuffled := make([]v1.Node, len(nodes))
-	perm := rand.Perm(len(nodes))
-	for i, j := range perm {
-		shuffled[j] = nodes[i]
-	}
-	return shuffled
-}
-
 func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector map[string]string) []*v1.Pod {
 	ExpectNoError(WaitForAllNodesSchedulable(config.f.ClientSet, 10*time.Minute))
-	nodeList := GetReadySchedulableNodesOrDie(config.f.ClientSet)
-
-	// To make this test work reasonably fast in large clusters,
-	// we limit the number of NetProxyPods to no more than
-	// maxNetProxyPodsCount on random nodes.
-	nodes := shuffleNodes(nodeList.Items)
-	if len(nodes) > maxNetProxyPodsCount {
-		nodes = nodes[:maxNetProxyPodsCount]
-	}
+	nodeList, err := e2enode.GetBoundedReadySchedulableNodes(config.f.ClientSet, maxNetProxyPodsCount)
+	ExpectNoError(err)
+	nodes := nodeList.Items
 
 	// create pods, one for each node
 	createdPods := make([]*v1.Pod, 0, len(nodes))

View File

@@ -15,6 +15,7 @@ go_library(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",

View File

@@ -25,6 +25,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -349,6 +350,27 @@ func GetReadySchedulableNodes(c clientset.Interface) (nodes *v1.NodeList, err er
 	return nodes, nil
 }
 
+// GetBoundedReadySchedulableNodes is like GetReadySchedulableNodes except that it returns
+// at most maxNodes nodes. Use this to keep your test case from blowing up when run on a
+// large cluster.
+func GetBoundedReadySchedulableNodes(c clientset.Interface, maxNodes int) (nodes *v1.NodeList, err error) {
+	nodes, err = GetReadySchedulableNodes(c)
+	if err != nil {
+		return nil, err
+	}
+	if len(nodes.Items) > maxNodes {
+		shuffled := make([]v1.Node, maxNodes)
+		perm := rand.Perm(len(nodes.Items))
+		for i, j := range perm {
+			if j < len(shuffled) {
+				shuffled[j] = nodes.Items[i]
+			}
+		}
+		nodes.Items = shuffled
+	}
+	return nodes, nil
+}
+
 // GetReadyNodesIncludingTainted returns all ready nodes, even those which are tainted.
 // There are cases when we care about tainted nodes
 // E.g. in tests related to nodes with gpu we care about nodes despite
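Note on the design (editorial remark, not part of the commit): unlike the removed TestJig.GetNodes helper below, which always returned the first maxNodesForTest entries of the node list, GetBoundedReadySchedulableNodes shuffles the list with rand.Perm before bounding it, so tests on a large cluster exercise a random subset of nodes rather than the same few every run.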

View File

@@ -341,11 +341,10 @@ func (k *NodeKiller) pickNodes() []v1.Node {
 	nodes, err := e2enode.GetReadySchedulableNodes(k.client)
 	ExpectNoError(err)
 	numNodes := int(k.config.FailureRatio * float64(len(nodes.Items)))
-	shuffledNodes := shuffleNodes(nodes.Items)
-	if len(shuffledNodes) > numNodes {
-		return shuffledNodes[:numNodes]
-	}
-	return shuffledNodes
+
+	nodes, err = e2enode.GetBoundedReadySchedulableNodes(k.client, numNodes)
+	ExpectNoError(err)
+	return nodes.Items
 }
 
 func (k *NodeKiller) kill(nodes []v1.Node) {

View File

@@ -264,7 +264,8 @@ func (j *TestJig) CreateLoadBalancerService(namespace, serviceName string, timeo
 // GetEndpointNodes returns a map of nodenames:external-ip on which the
 // endpoints of the given Service are running.
 func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
-	nodes := j.GetNodes(MaxNodesForEndpointsTests)
+	nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
+	framework.ExpectNoError(err)
 	endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{})
 	if err != nil {
 		framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err)
@@ -289,27 +290,6 @@ func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
 	return nodeMap
 }
 
-// GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
-// where we don't eg: want to create an endpoint per node.
-func (j *TestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
-	nodes = framework.GetReadySchedulableNodesOrDie(j.Client)
-	if len(nodes.Items) <= maxNodesForTest {
-		maxNodesForTest = len(nodes.Items)
-	}
-	nodes.Items = nodes.Items[:maxNodesForTest]
-	return nodes
-}
-
-// GetNodesNames returns a list of names of the first maxNodesForTest nodes
-func (j *TestJig) GetNodesNames(maxNodesForTest int) []string {
-	nodes := j.GetNodes(maxNodesForTest)
-	nodesNames := []string{}
-	for _, node := range nodes.Items {
-		nodesNames = append(nodesNames, node.Name)
-	}
-	return nodesNames
-}
-
 // WaitForEndpointOnNode waits for a service endpoint on the given node.
 func (j *TestJig) WaitForEndpointOnNode(namespace, serviceName, nodeName string) {
 	err := wait.PollImmediate(framework.Poll, LoadBalancerCreateTimeoutDefault, func() (bool, error) {
@@ -840,7 +820,8 @@ func (j *TestJig) checkNodePortServiceReachability(namespace string, svc *v1.Ser
 	servicePorts := svc.Spec.Ports
 
 	// Consider only 2 nodes for testing
-	nodes := j.GetNodes(2)
+	nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, 2)
+	framework.ExpectNoError(err)
 
 	j.WaitForAvailableEndpoint(namespace, svc.Name, ServiceEndpointsTimeout)

View File

@@ -123,6 +123,8 @@ var _ = SIGDescribe("[Feature:IPv6DualStackAlphaFeature] [LinuxOnly]", func() {
 		// get all schedulable nodes to determine the number of replicas for pods
 		// this is to ensure connectivity from all nodes on cluster
+		// FIXME: tests may be run in large clusters. This test is O(n^2) in the
+		// number of nodes used. It should use GetBoundedReadySchedulableNodes().
 		nodeList, err := e2enode.GetReadySchedulableNodes(cs)
 		framework.ExpectNoError(err)

View File

@@ -32,7 +32,6 @@ import (
 	gcecloud "k8s.io/legacy-cloud-providers/gce"
 
 	"github.com/onsi/ginkgo"
-	"github.com/onsi/gomega"
 )
 
 const (
@@ -73,11 +72,12 @@ var _ = SIGDescribe("Firewall rule", func() {
 		framework.Logf("Got cluster ID: %v", clusterID)
 
 		jig := e2eservice.NewTestJig(cs, serviceName)
-		nodeList := jig.GetNodes(e2eservice.MaxNodesForEndpointsTests)
-		gomega.Expect(nodeList).NotTo(gomega.BeNil())
-		nodesNames := jig.GetNodesNames(e2eservice.MaxNodesForEndpointsTests)
-		if len(nodesNames) <= 0 {
-			framework.Failf("Expect at least 1 node, got: %v", nodesNames)
+		nodeList, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests)
+		framework.ExpectNoError(err)
+
+		nodesNames := []string{}
+		for _, node := range nodeList.Items {
+			nodesNames = append(nodesNames, node.Name)
 		}
 		nodesSet := sets.NewString(nodesNames...)

View File

@@ -51,15 +51,16 @@ var _ = SIGDescribe("Network", func() {
 	fr := framework.NewDefaultFramework("network")
 
 	ginkgo.It("should set TCP CLOSE_WAIT timeout", func() {
-		nodes := framework.GetReadySchedulableNodesOrDie(fr.ClientSet)
-		ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
-
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(fr.ClientSet, 2)
+		framework.ExpectNoError(err)
 		if len(nodes.Items) < 2 {
 			framework.Skipf(
 				"Test requires >= 2 Ready nodes, but there are only %v nodes",
 				len(nodes.Items))
 		}
 
+		ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
+
 		type NodeInfo struct {
 			node *v1.Node
 			name string
@@ -81,7 +82,7 @@ var _ = SIGDescribe("Network", func() {
 		zero := int64(0)
 
 		// Some distributions (Ubuntu 16.04 etc.) don't support the proc file.
-		_, err := e2essh.IssueSSHCommandWithResult(
+		_, err = e2essh.IssueSSHCommandWithResult(
 			"ls /proc/net/nf_conntrack",
 			framework.TestContext.Provider,
 			clientNodeInfo.node)

View File

@@ -295,7 +295,8 @@ var _ = SIGDescribe("Services", func() {
 		framework.Logf("sourceip-test cluster ip: %s", serviceIP)
 
 		ginkgo.By("Picking 2 Nodes to test whether source IP is preserved or not")
-		nodes := jig.GetNodes(2)
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
 		nodeCounts := len(nodes.Items)
 		if nodeCounts < 2 {
 			framework.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
@@ -305,7 +306,7 @@ var _ = SIGDescribe("Services", func() {
 		serverPodName := "echo-sourceip"
 		pod := f.NewAgnhostPod(serverPodName, "netexec", "--http-port", strconv.Itoa(servicePort))
 		pod.Labels = jig.Labels
-		_, err := cs.CoreV1().Pods(ns).Create(pod)
+		_, err = cs.CoreV1().Pods(ns).Create(pod)
 		framework.ExpectNoError(err)
 		framework.ExpectNoError(f.WaitForPodRunning(pod.Name))
 		defer func() {
@@ -1986,7 +1987,8 @@ var _ = SIGDescribe("Services", func() {
 		namespace := f.Namespace.Name
 		serviceName := "no-pods"
 		jig := e2eservice.NewTestJig(cs, serviceName)
-		nodes := jig.GetNodes(e2eservice.MaxNodesForEndpointsTests)
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests)
+		framework.ExpectNoError(err)
 		labels := map[string]string{
 			"nopods": "nopods",
 		}
@@ -1997,7 +1999,7 @@ var _ = SIGDescribe("Services", func() {
 		}}
 
 		ginkgo.By("creating a service with no endpoints")
-		_, err := jig.CreateServiceWithServicePort(labels, namespace, ports)
+		_, err = jig.CreateServiceWithServicePort(labels, namespace, ports)
 		if err != nil {
 			framework.Failf("ginkgo.Failed to create service: %v", err)
 		}
@@ -2169,7 +2171,8 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		namespace := f.Namespace.Name
 		serviceName := "external-local-nodes"
 		jig := e2eservice.NewTestJig(cs, serviceName)
-		nodes := jig.GetNodes(e2eservice.MaxNodesForEndpointsTests)
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests)
+		framework.ExpectNoError(err)
 
 		svc := jig.CreateOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false,
 			func(svc *v1.Service) {
@@ -2292,7 +2295,8 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
 		serviceName := "external-local-update"
 		jig := e2eservice.NewTestJig(cs, serviceName)
 
-		nodes := jig.GetNodes(e2eservice.MaxNodesForEndpointsTests)
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests)
+		framework.ExpectNoError(err)
 		if len(nodes.Items) < 2 {
 			framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
 		}

View File

@@ -30,6 +30,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
+	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
 	"k8s.io/kubernetes/test/e2e/framework/volume"
@@ -270,18 +271,10 @@ var _ = SIGDescribe("kubelet", func() {
 		// nodes we observe initially.
 		nodeLabels = make(map[string]string)
 		nodeLabels["kubelet_cleanup"] = "true"
-		nodes := framework.GetReadySchedulableNodesOrDie(c)
-		numNodes = len(nodes.Items)
-		framework.ExpectNotEqual(numNodes, 0)
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(c, maxNodesToCheck)
+		framework.ExpectNoError(err)
 		nodeNames = sets.NewString()
-		// If there are a lot of nodes, we don't want to use all of them
-		// (if there are 1000 nodes in the cluster, starting 10 pods/node
-		// will take ~10 minutes today). And there is also deletion phase.
-		// Instead, we choose at most 10 nodes.
-		if numNodes > maxNodesToCheck {
-			numNodes = maxNodesToCheck
-		}
-		for i := 0; i < numNodes; i++ {
+		for i := 0; i < len(nodes.Items); i++ {
 			nodeNames.Insert(nodes.Items[i].Name)
 		}
 		for nodeName := range nodeNames {