diff --git a/test/e2e/network/BUILD b/test/e2e/network/BUILD
index 83c8f4ff5eb..a95cecd5b8e 100644
--- a/test/e2e/network/BUILD
+++ b/test/e2e/network/BUILD
@@ -96,6 +96,7 @@ go_library(
         "//vendor/github.com/miekg/dns:go_default_library",
         "//vendor/github.com/onsi/ginkgo:go_default_library",
         "//vendor/github.com/onsi/gomega:go_default_library",
+        "//vendor/github.com/pkg/errors:go_default_library",
         "//vendor/google.golang.org/api/compute/v1:go_default_library",
         "//vendor/k8s.io/utils/net:go_default_library",
     ],
diff --git a/test/e2e/network/networking_perf.go b/test/e2e/network/networking_perf.go
index 73e49192b75..deb854c5dd0 100644
--- a/test/e2e/network/networking_perf.go
+++ b/test/e2e/network/networking_perf.go
@@ -18,145 +18,311 @@ package network
 
 // Tests network performance using iperf or other containers.
 import (
+    "context"
     "fmt"
+    discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
     "time"
 
     "github.com/onsi/ginkgo"
+    appsv1 "k8s.io/api/apps/v1"
     v1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
+    clientset "k8s.io/client-go/kubernetes"
     "k8s.io/kubernetes/test/e2e/framework"
+    e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
     e2enode "k8s.io/kubernetes/test/e2e/framework/node"
-    e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
     imageutils "k8s.io/kubernetes/test/utils/image"
 )
 
 const (
-    // empirically derived as a baseline for expectations from running this test using kube-up.sh.
-    gceBandwidthBitsEstimate = int64(30000000000)
-    // on 4 node clusters, we found this test passes very quickly, generally in less then 100 seconds.
-    smallClusterTimeout = 200 * time.Second
+    // use this timeout for larger clusters
+    largeClusterTimeout = 400 * time.Second
+    // iperf2BaselineBandwidthMegabytesPerSecond sets a baseline for iperf2 bandwidth of 90 MB/s
+    iperf2BaselineBandwidthMegabytesPerSecond = 90
+    // iperf2Port selects an arbitrary, unique port to run iperf2's client and server on
+    iperf2Port = 6789
+    // labelKey is used as a key for selectors
+    labelKey = "app"
+    // clientLabelValue is used as a value for iperf2 client selectors
+    clientLabelValue = "iperf2-client"
+    // serverLabelValue is used as a value for iperf2 server selectors
+    serverLabelValue = "iperf2-server"
+    // serverServiceName defines the service name used for the iperf2 server
+    serverServiceName = "iperf2-server"
 )
 
-// Declared as Flakey since it has not been proven to run in parallel on small nodes or slow networks in CI
-var _ = SIGDescribe("Networking IPerf [Experimental] [Slow] [Feature:Networking-Performance]", func() {
-
-    f := framework.NewDefaultFramework("network-perf")
-
-    // A few simple bandwidth tests which are capped by nodes.
-    // TODO replace the 1 with the scale option implementation
-    // TODO: Make this a function parameter, once we distribute iperf endpoints, possibly via session affinity.
-    numClient := 1
-    numServer := 1
-    maxBandwidthBits := gceBandwidthBitsEstimate
-
-    familyStr := ""
-    if framework.TestContext.ClusterIsIPv6() {
-        familyStr = "-V "
+func iperf2ServerDeployment(client clientset.Interface, namespace string, isIPV6 bool) (*appsv1.Deployment, error) {
+    framework.Logf("deploying iperf2 server")
+    one := int64(1)
+    replicas := int32(1)
+    labels := map[string]string{labelKey: serverLabelValue}
+    args := []string{
+        "-s",
+        "-p",
+        fmt.Sprintf("%d", iperf2Port),
     }
-
-    ginkgo.It(fmt.Sprintf("should transfer ~ 1GB onto the service endpoint %v servers (maximum of %v clients)", numServer, numClient), func() {
-        nodes, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
-        framework.ExpectNoError(err)
-        totalPods := len(nodes.Items)
-        // for a single service, we expect to divide bandwidth between the network. Very crude estimate.
-        expectedBandwidth := int(float64(maxBandwidthBits) / float64(totalPods))
-        appName := "iperf-e2e"
-        _, err = e2eservice.CreateServiceForSimpleAppWithPods(
-            f.ClientSet,
-            8001,
-            8002,
-            f.Namespace.Name,
-            appName,
-            func(n v1.Node) v1.PodSpec {
-                return v1.PodSpec{
-                    Containers: []v1.Container{{
-                        Name:  "iperf-server",
-                        Image: imageutils.GetE2EImage(imageutils.Agnhost),
-                        Command: []string{"/bin/sh"},
-                        Args: []string{
-                            "-c",
-                            "/usr/local/bin/iperf " + familyStr + "-s -p 8001 ",
-                        },
-                        Ports: []v1.ContainerPort{{ContainerPort: 8001}},
-                    }},
-                    NodeName:      n.Name,
-                    RestartPolicy: v1.RestartPolicyOnFailure,
-                }
+    if isIPV6 {
+        args = append(args, "-V")
+    }
+    deploymentSpec := &appsv1.Deployment{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:   "iperf2-server-deployment",
+            Labels: labels,
+        },
+        Spec: appsv1.DeploymentSpec{
+            Replicas: &replicas,
+            Selector: &metav1.LabelSelector{
+                MatchLabels: labels,
             },
-            // this will be used to generate the -service name which all iperf clients point at.
-            numServer, // Generally should be 1 server unless we do affinity or use a version of iperf that supports LB
-            true,      // Make sure we wait, otherwise all the clients will die and need to restart.
-        )
-
-        if err != nil {
-            framework.Failf("Fatal error waiting for iperf server endpoint : %v", err)
-        }
-
-        iperfClientPodLabels := e2enode.CreatePodsPerNodeForSimpleApp(
-            f.ClientSet,
-            f.Namespace.Name,
-            "iperf-e2e-cli",
-            func(n v1.Node) v1.PodSpec {
-                return v1.PodSpec{
+            Template: v1.PodTemplateSpec{
+                ObjectMeta: metav1.ObjectMeta{
+                    Labels: labels,
+                },
+                Spec: v1.PodSpec{
+                    TerminationGracePeriodSeconds: &one,
                     Containers: []v1.Container{
                         {
-                            Name:  "iperf-client",
+                            Name:  "iperf2-server",
                             Image: imageutils.GetE2EImage(imageutils.Agnhost),
-                            Command: []string{"/bin/sh"},
-                            Args: []string{
-                                "-c",
-                                "/usr/local/bin/iperf " + familyStr + "-c service-for-" + appName + " -p 8002 --reportstyle C && sleep 5",
+                            Command: []string{"iperf"},
+                            Args:    args,
+                            Ports: []v1.ContainerPort{
+                                {
+                                    ContainerPort: iperf2Port,
+                                    Protocol:      v1.ProtocolTCP,
+                                },
                             },
                         },
                     },
-                    RestartPolicy: v1.RestartPolicyOnFailure, // let them successfully die.
+                },
+            },
+        },
+    }
+
+    deployment, err := client.AppsV1().Deployments(namespace).Create(context.TODO(), deploymentSpec, metav1.CreateOptions{})
+    if err != nil {
+        return nil, fmt.Errorf("deployment %q Create API error: %v", deploymentSpec.Name, err)
+    }
+    framework.Logf("Waiting for deployment %q to complete", deploymentSpec.Name)
+    err = e2edeployment.WaitForDeploymentComplete(client, deployment)
+    if err != nil {
+        return nil, fmt.Errorf("deployment %q failed to complete: %v", deploymentSpec.Name, err)
+    }
+
+    return deployment, nil
+}
+
+func iperf2ServerService(client clientset.Interface, namespace string) (*v1.Service, error) {
+    service := &v1.Service{
+        ObjectMeta: metav1.ObjectMeta{Name: serverServiceName},
+        Spec: v1.ServiceSpec{
+            Selector: map[string]string{
+                labelKey: serverLabelValue,
+            },
+            Ports: []v1.ServicePort{
+                {Protocol: v1.ProtocolTCP, Port: iperf2Port},
+            },
+        },
+    }
+    return client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
+}
+
+func iperf2ClientDaemonSet(client clientset.Interface, namespace string) (*appsv1.DaemonSet, error) {
+    one := int64(1)
+    labels := map[string]string{labelKey: clientLabelValue}
+    spec := &appsv1.DaemonSet{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:   "iperf2-clients",
+            Labels: labels,
+        },
+        Spec: appsv1.DaemonSetSpec{
+            Selector: &metav1.LabelSelector{
+                MatchLabels: labels,
+            },
+            Template: v1.PodTemplateSpec{
+                ObjectMeta: metav1.ObjectMeta{
+                    Labels: labels,
+                },
+                Spec: v1.PodSpec{
+                    Containers: []v1.Container{
+                        {
+                            Name:    "iperf2-client",
+                            Image:   imageutils.GetE2EImage(imageutils.Agnhost),
+                            Command: []string{"/agnhost"},
+                            Args:    []string{"pause"},
+                        },
+                    },
+                    TerminationGracePeriodSeconds: &one,
+                },
+            },
+        },
+        Status: appsv1.DaemonSetStatus{},
+    }
+
+    ds, err := client.AppsV1().DaemonSets(namespace).Create(context.TODO(), spec, metav1.CreateOptions{})
+    if err != nil {
+        return nil, fmt.Errorf("daemonset %s Create API error: %v", spec.Name, err)
+    }
+    return ds, nil
+}
+
+// Test summary:
+// This test uses iperf2 to obtain bandwidth data between nodes in the cluster, providing a coarse measure
+// of the health of the cluster network. The test runs two sets of pods:
+//   1. an iperf2 server on a single node
+//   2. a daemonset of iperf2 clients
+// The test then iterates through the clients, one by one, running iperf2 from each of them to transfer
+// data to the server and back for ten seconds, after which the results are collected and parsed.
+// Thus, if your cluster has 10 nodes, then 10 test runs are performed.
+// Note: a more complete test could run this scenario with a daemonset of servers as well; however, this
+// would require n^2 tests, n^2 time, and n^2 network resources which quickly become prohibitively large
+// as the cluster size increases.
+// Finally, after collecting all data, the results are analyzed and tabulated.
+var _ = SIGDescribe("Networking IPerf2 [Feature:Networking-Performance]", func() {
+    // this test runs iperf2: one pod as a server, and a daemonset of clients
+    f := framework.NewDefaultFramework("network-perf")
+
+    ginkgo.It(fmt.Sprintf("should run iperf2"), func() {
+        readySchedulableNodes, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
+        framework.ExpectNoError(err)
+
+        familyStr := ""
+        if framework.TestContext.ClusterIsIPv6() {
+            familyStr = "-V "
+        }
+
+        serverPodsListOptions := metav1.ListOptions{
+            LabelSelector: fmt.Sprintf("%s=%s", labelKey, serverLabelValue),
+        }
+
+        // Step 1: set up iperf2 server -- a single pod on any node
+        _, err = iperf2ServerDeployment(f.ClientSet, f.Namespace.Name, framework.TestContext.ClusterIsIPv6())
+        framework.ExpectNoError(err, "deploy iperf2 server deployment")
+
+        _, err = iperf2ServerService(f.ClientSet, f.Namespace.Name)
+        framework.ExpectNoError(err, "deploy iperf2 server service")
+
+        // Step 2: set up iperf2 client daemonset
+        // initially, the clients don't do anything -- they simply pause until they're called
+        _, err = iperf2ClientDaemonSet(f.ClientSet, f.Namespace.Name)
+        framework.ExpectNoError(err, "deploy iperf2 client daemonset")
+
+        // Make sure the server is ready to go
+        framework.Logf("waiting for iperf2 server endpoints")
+        err = wait.Poll(2*time.Second, largeClusterTimeout, func() (done bool, err error) {
+            listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1beta1.LabelServiceName, serverServiceName)}
+            esList, err := f.ClientSet.DiscoveryV1beta1().EndpointSlices(f.Namespace.Name).List(context.TODO(), listOptions)
+            framework.ExpectNoError(err, "Error fetching EndpointSlice for Service %s/%s", f.Namespace.Name, serverServiceName)
+
+            if len(esList.Items) == 0 {
+                framework.Logf("EndpointSlice for Service %s/%s not found", f.Namespace.Name, serverServiceName)
+                return false, nil
+            }
+            return true, nil
+        })
+        framework.ExpectNoError(err, "unable to wait for endpoints for the iperf service")
+        framework.Logf("found iperf2 server endpoints")
+
+        clientPodsListOptions := metav1.ListOptions{
+            LabelSelector: fmt.Sprintf("%s=%s", labelKey, clientLabelValue),
+        }
+
+        framework.Logf("waiting for client pods to be running")
+        var clientPodList *v1.PodList
+        err = wait.Poll(2*time.Second, largeClusterTimeout, func() (done bool, err error) {
+            clientPodList, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), clientPodsListOptions)
+            if err != nil {
+                return false, err
+            }
+            if len(clientPodList.Items) < len(readySchedulableNodes.Items) {
+                return false, nil
+            }
+            for _, pod := range clientPodList.Items {
+                if pod.Status.Phase != v1.PodRunning {
+                    return false, nil
                 }
-            },
-            numClient,
-        )
-        expectedCli := numClient
-        if len(nodes.Items) < expectedCli {
-            expectedCli = len(nodes.Items)
+            }
+            return true, nil
+        })
+        framework.ExpectNoError(err, "unable to wait for client pods to come up")
+        framework.Logf("all client pods are ready: %d pods", len(clientPodList.Items))
+
+        // Get a reference to the server pod for later
+        serverPodList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), serverPodsListOptions)
+        framework.ExpectNoError(err)
+        if len(serverPodList.Items) != 1 {
+            framework.Failf("expected 1 server pod, found %d", len(serverPodList.Items))
+        }
+        serverPod := serverPodList.Items[0]
+        framework.Logf("server pod phase %s", serverPod.Status.Phase)
+        for i, condition := range serverPod.Status.Conditions {
+            framework.Logf("server pod condition %d: %+v", i, condition)
+        }
+        for i, cont := range serverPod.Status.ContainerStatuses {
+            framework.Logf("server pod container status %d: %+v", i, cont)
         }
-        framework.Logf("Reading all perf results to stdout.")
-        framework.Logf("date,cli,cliPort,server,serverPort,id,interval,transferBits,bandwidthBits")
+        framework.Logf("found %d matching client pods", len(clientPodList.Items))
 
-        // Extra 1/10 second per client.
-        iperfTimeout := smallClusterTimeout + (time.Duration(expectedCli/10) * time.Second)
-        iperfResults := &IPerfResults{}
-
-        iperfClusterVerification := f.NewClusterVerification(
-            f.Namespace,
-            framework.PodStateVerification{
-                Selectors:   iperfClientPodLabels,
-                ValidPhases: []v1.PodPhase{v1.PodSucceeded},
-            },
-        )
-
-        pods, err2 := iperfClusterVerification.WaitFor(expectedCli, iperfTimeout)
-        if err2 != nil {
-            framework.Failf("Error in wait...")
-        } else if len(pods) < expectedCli {
-            framework.Failf("IPerf restuls : Only got %v out of %v, after waiting %v", len(pods), expectedCli, iperfTimeout)
-        } else {
-            // For each builds up a collection of IPerfRecords
-            iperfClusterVerification.ForEach(
-                func(p v1.Pod) {
-                    resultS, err := framework.LookForStringInLog(f.Namespace.Name, p.Name, "iperf-client", "0-", 1*time.Second)
-                    if err == nil {
-                        framework.Logf(resultS)
-                        iperfResults.Add(NewIPerf(resultS))
-                    } else {
-                        framework.Failf("Unexpected error, %v when running forEach on the pods.", err)
-                    }
-                })
+        nodeResults := &IPerf2NodeToNodeCSVResults{
+            ServerNode: serverPod.Spec.NodeName,
+            Results:    map[string]*IPerf2EnhancedCSVResults{},
         }
-        fmt.Println("[begin] Node,Bandwidth CSV")
-        fmt.Println(iperfResults.ToTSV())
-        fmt.Println("[end] Node,Bandwidth CSV")
-        for ipClient, bandwidth := range iperfResults.BandwidthMap {
-            framework.Logf("%v had bandwidth %v. Ratio to expected (%v) was %f", ipClient, bandwidth, expectedBandwidth, float64(bandwidth)/float64(expectedBandwidth))
+        // Step 3: iterate through the client pods one by one, running iperf2 in client mode to transfer
+        // data to the server and back and measure bandwidth
+        for _, pod := range clientPodList.Items {
+            podName := pod.Name
+            nodeName := pod.Spec.NodeName
+
+            iperfVersion := f.ExecShellInPod(podName, "iperf -v || true")
+            framework.Logf("iperf version: %s", iperfVersion)
+
+            for try := 0; ; try++ {
+                /* iperf2 command parameters:
+                 *  -e: use enhanced reporting giving more tcp/udp and traffic information
+                 *  -p %d: server port to connect to
+                 *  --reportstyle C: report as Comma-Separated Values
+                 *  -i 1: seconds between periodic bandwidth reports
+                 *  -c %s: run in client mode, connecting to the given host (here, the server service name)
+                 */
+                command := fmt.Sprintf(`iperf %s -e -p %d --reportstyle C -i 1 -c %s && sleep 5`, familyStr, iperf2Port, serverServiceName)
+                framework.Logf("attempting to run command '%s' in client pod %s (node %s)", command, podName, nodeName)
+                output := f.ExecShellInPod(podName, command)
+                framework.Logf("output from exec on client pod %s (node %s): \n%s\n", podName, nodeName, output)
+
+                results, err := ParseIPerf2EnhancedResultsFromCSV(output)
+                if err == nil {
+                    nodeResults.Results[nodeName] = results
+                    break
+                } else if try == 2 {
+                    framework.ExpectNoError(err, "unable to parse iperf2 output from client pod %s (node %s)", pod.Name, nodeName)
+                } else {
+                    framework.Logf("Retrying: IPerf run failed: %+v", err)
+                }
+            }
+        }
+
+        // Step 4: after collecting all the client<->server data, compile and present the results
+        /*
+            Example output:
+
+            Dec 22 07:52:41.102: INFO:                              From                                 To    Bandwidth (MB/s)
+            Dec 22 07:52:41.102: INFO:            three-node-ipv6-worker            three-node-ipv6-worker2                2381
+            Dec 22 07:52:41.102: INFO:           three-node-ipv6-worker2            three-node-ipv6-worker2                2214
+            Dec 22 07:52:41.102: INFO:           three-node-ipv6-worker3            three-node-ipv6-worker2                3123
+
+        */
+        framework.Logf("%35s%35s%20s", "From", "To", "Bandwidth (MB/s)")
+        for nodeFrom, results := range nodeResults.Results {
+            framework.Logf("%35s%35s%20d", nodeFrom, nodeResults.ServerNode, results.Total.bandwidthMB())
+        }
+        for clientNode, results := range nodeResults.Results {
+            megabytesPerSecond := results.Total.bandwidthMB()
+            if megabytesPerSecond < iperf2BaselineBandwidthMegabytesPerSecond {
+                framework.Failf("iperf2 MB/s received below baseline of %d for client %s to server %s: %d", iperf2BaselineBandwidthMegabytesPerSecond, clientNode, nodeResults.ServerNode, megabytesPerSecond)
+            }
         }
     })
 })
diff --git a/test/e2e/network/util_iperf.go b/test/e2e/network/util_iperf.go
index 651366fadc9..f151a2cfef1 100644
--- a/test/e2e/network/util_iperf.go
+++ b/test/e2e/network/util_iperf.go
@@ -21,20 +21,26 @@ import (
     "bytes"
     "encoding/json"
     "fmt"
+    "github.com/pkg/errors"
+    "math"
     "strconv"
     "strings"
 
     "k8s.io/kubernetes/test/e2e/framework"
 )
 
-// IPerfResults is a struct that stores some IPerfResult
+const (
+    megabyte = 1024 * 1024
+)
+
+// IPerfResults is a struct that stores some IPerfCSVResult
 type IPerfResults struct {
     BandwidthMap map[string]int64
 }
 
-// IPerfResult struct modelling an iperf record....
+// IPerfCSVResult struct modelling an iperf record....
 // 20160314154239,172.17.0.3,34152,172.17.0.2,5001,3,0.0-10.0,33843707904,27074774092
-type IPerfResult struct {
+type IPerfCSVResult struct {
     date    string // field 1 in the csv
     cli     string // field 2 in the csv
     cliPort int64  // ...
@@ -46,8 +52,12 @@ type IPerfResult struct {
     bandwidthBits int64
 }
 
+func (i *IPerfCSVResult) bandwidthMB() int64 {
+    return int64(math.Round(float64(i.bandwidthBits) / float64(megabyte) / 8))
+}
+
 // Add adds a new result to the Results struct.
-func (i *IPerfResults) Add(ipr *IPerfResult) {
+func (i *IPerfResults) Add(ipr *IPerfCSVResult) {
     if i.BandwidthMap == nil {
         i.BandwidthMap = map[string]int64{}
     }
@@ -68,14 +78,17 @@ func (i *IPerfResults) ToTSV() string {
     return buffer.String()
 }
 
-// NewIPerf parses an IPerf CSV output line into an IPerfResult.
-func NewIPerf(csvLine string) *IPerfResult {
+// NewIPerf parses an IPerf CSV output line into an IPerfCSVResult.
+func NewIPerf(csvLine string) (*IPerfCSVResult, error) {
+    if len(csvLine) == 0 {
+        return nil, errors.New("No iperf output received in csv line")
+    }
     csvLine = strings.Trim(csvLine, "\n")
     slice := StrSlice(strings.Split(csvLine, ","))
     if len(slice) != 9 {
-        framework.Failf("Incorrect fields in the output: %v (%v out of 9)", slice, len(slice))
+        return nil, errors.Errorf("Incorrect fields in the output: %v (%v out of 9)", slice, len(slice))
     }
-    i := IPerfResult{}
+    i := IPerfCSVResult{}
     i.date = slice.get(0)
     i.cli = slice.get(1)
     i.cliPort = intOrFail("client port", slice.get(2))
@@ -85,7 +98,7 @@ func NewIPerf(csvLine string) *IPerfResult {
     i.interval = slice.get(6)
     i.transferBits = intOrFail("transfer port", slice.get(7))
     i.bandwidthBits = intOrFail("bandwidth port", slice.get(8))
-    return &i
+    return &i, nil
 }
 
 // StrSlice represents a string slice
@@ -106,3 +119,53 @@ func intOrFail(debugName string, rawValue string) int64 {
     }
     return value
 }
+
+// IPerf2EnhancedCSVResults models the results produced by iperf2 when run with the -e (--enhancedreports) flag.
+type IPerf2EnhancedCSVResults struct {
+    Intervals []*IPerfCSVResult
+    Total     *IPerfCSVResult
+}
+
+// ParseIPerf2EnhancedResultsFromCSV parses results from iperf2 when given the -e (--enhancedreports)
+// and `--reportstyle C` options.
+// Example output:
+// 20201210141800.884,10.244.2.24,47880,10.96.114.79,6789,3,0.0-1.0,1677852672,13422821376
+// 20201210141801.881,10.244.2.24,47880,10.96.114.79,6789,3,1.0-2.0,1980760064,15846080512
+// 20201210141802.883,10.244.2.24,47880,10.96.114.79,6789,3,2.0-3.0,1886650368,15093202944
+// 20201210141803.882,10.244.2.24,47880,10.96.114.79,6789,3,3.0-4.0,2035417088,16283336704
+// 20201210141804.879,10.244.2.24,47880,10.96.114.79,6789,3,4.0-5.0,1922957312,15383658496
+// 20201210141805.881,10.244.2.24,47880,10.96.114.79,6789,3,5.0-6.0,2095316992,16762535936
+// 20201210141806.882,10.244.2.24,47880,10.96.114.79,6789,3,6.0-7.0,1741291520,13930332160
+// 20201210141807.879,10.244.2.24,47880,10.96.114.79,6789,3,7.0-8.0,1862926336,14903410688
+// 20201210141808.878,10.244.2.24,47880,10.96.114.79,6789,3,8.0-9.0,1821245440,14569963520
+// 20201210141809.849,10.244.2.24,47880,10.96.114.79,6789,3,0.0-10.0,18752208896,15052492511
+func ParseIPerf2EnhancedResultsFromCSV(output string) (*IPerf2EnhancedCSVResults, error) {
+    var parsedResults []*IPerfCSVResult
+    for _, line := range strings.Split(output, "\n") {
+        parsed, err := NewIPerf(line)
+        if err != nil {
+            return nil, err
+        }
+        parsedResults = append(parsedResults, parsed)
+    }
+    if len(parsedResults) == 0 {
+        return nil, errors.New("no results parsed from iperf2 output")
+    }
+    // format:
+    // all but the last line are intervals
+    intervals := parsedResults[:len(parsedResults)-1]
+    // the last line is an aggregation
+    total := parsedResults[len(parsedResults)-1]
+    return &IPerf2EnhancedCSVResults{
+        Intervals: intervals,
+        Total:     total,
+    }, nil
+}
+
+// IPerf2NodeToNodeCSVResults models the results of running iperf2 between a daemonset of clients and
+// a single server. The node name of the server is captured, along with a map of client node name
+// to iperf2 results.
+type IPerf2NodeToNodeCSVResults struct {
+    ServerNode string
+    Results    map[string]*IPerf2EnhancedCSVResults
+}
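Reviewer note (not part of the patch): the sketch below is a minimal, self-contained illustration of the arithmetic the new code performs -- the bits-per-second to MB/s conversion done by bandwidthMB() and the baseline comparison in Step 4 of the test -- applied to the sample aggregate line quoted in the ParseIPerf2EnhancedResultsFromCSV comment. It repeats the megabyte constant and the 90 MB/s baseline locally instead of importing anything from the test packages, so the names here are stand-ins, not the patch's API.

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

const (
	megabyte         = 1024 * 1024 // bytes per megabyte, as in util_iperf.go
	baselineMBPerSec = 90          // mirrors iperf2BaselineBandwidthMegabytesPerSecond
)

func main() {
	// Sample iperf2 --reportstyle C output; the last line is the 0.0-10.0 aggregate
	// that the test treats as Total.
	output := `20201210141800.884,10.244.2.24,47880,10.96.114.79,6789,3,0.0-1.0,1677852672,13422821376
20201210141809.849,10.244.2.24,47880,10.96.114.79,6789,3,0.0-10.0,18752208896,15052492511`

	lines := strings.Split(strings.TrimSpace(output), "\n")
	total := strings.Split(lines[len(lines)-1], ",")
	if len(total) != 9 {
		panic("expected 9 CSV fields, as NewIPerf does")
	}

	// Field 9 is the bandwidth in bits/s; convert to MB/s the same way bandwidthMB() does.
	bandwidthBits, err := strconv.ParseInt(total[8], 10, 64)
	if err != nil {
		panic(err)
	}
	megabytesPerSecond := int64(math.Round(float64(bandwidthBits) / float64(megabyte) / 8))

	fmt.Printf("interval %s: %d MB/s\n", total[6], megabytesPerSecond)
	if megabytesPerSecond < baselineMBPerSec {
		fmt.Println("below baseline: the e2e test would call framework.Failf here")
	}
}

With the sample aggregate of 15052492511 bits/s this prints 1794 MB/s, well above the 90 MB/s floor and of the same order as the per-node figures in the example table in networking_perf.go.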