From 95e315e427549e96df9a4a2b0647f04018d5bdf3 Mon Sep 17 00:00:00 2001 From: Jay Vyas Date: Thu, 17 Mar 2016 11:38:53 -0400 Subject: [PATCH] E2E Network performance run which uses iperf. generic pod-per-node functionality for testing - 2 node test only - update framework to decompose pod vs svc creation for composition. - remove hard coded 2 and pointer to --scale --- test/e2e/framework/framework.go | 78 +++++++++++++++- test/e2e/networking_perf.go | 157 ++++++++++++++++++++++++++++++++ test/e2e/util_iperf.go | 105 +++++++++++++++++++++ 3 files changed, 338 insertions(+), 2 deletions(-) create mode 100644 test/e2e/networking_perf.go create mode 100644 test/e2e/util_iperf.go diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 7cb857d2484..be10a114007 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -32,12 +32,13 @@ import ( adapter_1_2 "k8s.io/kubernetes/pkg/client/unversioned/adapters/release_1_2" adapter_1_3 "k8s.io/kubernetes/pkg/client/unversioned/adapters/release_1_3" "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/metrics" + "k8s.io/kubernetes/pkg/util/intstr" + "k8s.io/kubernetes/pkg/util/wait" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -381,6 +382,79 @@ func (f *Framework) ReadFileViaContainer(podName, containerName string, path str return string(stdout), err } +// CreateServiceForSimpleAppWithPods is a convenience wrapper to create a service and its matching pods all at once. 
+func (f *Framework) CreateServiceForSimpleAppWithPods(contPort int, svcPort int, appName string, podSpec func(n api.Node) api.PodSpec, count int, block bool) (error, *api.Service) {
+	// Create the service first; its selector is reused for the optional wait below.
+	theService := f.CreateServiceForSimpleApp(contPort, svcPort, appName)
+	f.CreatePodsPerNodeForSimpleApp(appName, podSpec, count)
+	if !block {
+		return nil, theService
+	}
+	// The error stays in the first return position (non-idiomatic) because that is the
+	// signature existing callers are written against.
+	err := WaitForPodsWithLabelRunning(f.Client, f.Namespace.Name, labels.SelectorFromSet(labels.Set(theService.Spec.Selector)))
+	return err, theService
+}
+
+// CreateServiceForSimpleApp returns a service that selects/exposes pods (send -1 ports if no exposure needed) with an app label.
+//
+// The service is named "service-for-<appName>", carries the label app=<appName>-service and
+// selects pods labelled app=<appName>-pod. A single TCP port (svcPort -> contPort) is declared
+// only when both ports are >= 1; otherwise the service is created without ports.
+// It panics when appName is empty and hands any creation error to ExpectNoError.
+func (f *Framework) CreateServiceForSimpleApp(contPort, svcPort int, appName string) *api.Service {
+	if appName == "" {
+		panic("no app name provided")
+	}
+
+	serviceSelector := map[string]string{
+		"app": appName + "-pod",
+	}
+
+	// For convenience, user sending ports are optional: ports stays nil unless both are valid.
+	var ports []api.ServicePort
+	if contPort >= 1 && svcPort >= 1 {
+		ports = []api.ServicePort{{
+			Protocol:   "TCP",
+			Port:       svcPort,
+			TargetPort: intstr.FromInt(contPort),
+		}}
+	}
+
+	Logf("Creating a service-for-%v for selecting app=%v-pod", appName, appName)
+	service, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
+		ObjectMeta: api.ObjectMeta{
+			Name: "service-for-" + appName,
+			Labels: map[string]string{
+				"app": appName + "-service",
+			},
+		},
+		Spec: api.ServiceSpec{
+			Ports:    ports,
+			Selector: serviceSelector,
+		},
+	})
+	ExpectNoError(err)
+	return service
+}
+
+// CreatePodsPerNodeForSimpleApp Creates pods w/ labels. Useful for tests which make a bunch of pods w/o any networking.
+func (f *Framework) CreatePodsPerNodeForSimpleApp(appName string, podSpec func(n api.Node) api.PodSpec, maxCount int) map[string]string {
+	nodes := ListSchedulableNodesOrDie(f.Client)
+	// Named podLabels so the local does not shadow the imported "labels" package.
+	podLabels := map[string]string{
+		"app": appName + "-pod",
+	}
+	for i, node := range nodes.Items {
+		// one per node, but no more than maxCount: node indices 0..maxCount-1.
+		// (An inclusive bound of i <= maxCount would create maxCount+1 pods.)
+		if i >= maxCount {
+			break
+		}
+		Logf("%v/%v : Creating container with label app=%v-pod", i, maxCount, appName)
+		_, err := f.Client.Pods(f.Namespace.Name).Create(&api.Pod{
+			ObjectMeta: api.ObjectMeta{
+				// appName is passed as an argument instead of being spliced into the
+				// format string, so a '%' in the name is never read as a verb.
+				Name:   fmt.Sprintf("%v-pod-%v", appName, i),
+				Labels: podLabels,
+			},
+			// The spec is built per node so callers can pin the pod (e.g. via NodeName).
+			Spec: podSpec(node),
+		})
+		ExpectNoError(err)
+	}
+	// The label set is returned so callers can select the pods that were just created.
+	return podLabels
+}
+
 func kubectlExecWithRetry(namespace string, podName, containerName string, args ...string) ([]byte, []byte, error) { for numRetries := 0; numRetries < maxKubectlExecRetries; numRetries++ { if numRetries > 0 { diff --git a/test/e2e/networking_perf.go b/test/e2e/networking_perf.go new file mode 100644 index 00000000000..63791bb6ac5 --- /dev/null +++ b/test/e2e/networking_perf.go @@ -0,0 +1,157 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +// Tests network performance using iperf or other containers. +import ( + "fmt" + "math" + "time" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/test/e2e/framework" +)
+
+const (
+	// empirically derived as a baseline for expectations from running this test using kube-up.sh.
+	gceBandwidthBitsEstimate = int64(30000000000)
+	// on 4 node clusters, we found this test passes very quickly, generally in less than 100 seconds.
+	smallClusterTimeout = 200 * time.Second
+)
+
+// Declared as Flaky since it has not been proven to run in parallel on small nodes or slow networks in CI
+// TODO jayunit100 : Retag this test according to semantics from #22401
+var _ = framework.KubeDescribe("Networking IPerf [Experimental] [Slow] [Feature:Networking-Performance]", func() {
+
+	f := framework.NewDefaultFramework("network-perf")
+
+	// A few simple bandwidth tests which are capped by nodes.
+	// TODO replace the 1 with the scale option implementation
+	runClientServerBandwidthMeasurement(f, 1, gceBandwidthBitsEstimate)
+})
+
+// runClientServerBandwidthMeasurement registers a spec that starts iperf server pods behind a
+// service, starts iperf client pods pointed at that service, waits for the clients to reach the
+// Succeeded phase, and logs each client's reported bandwidth next to a crude expectation of
+// maxBandwidthBits divided by the number of schedulable nodes.
+//
+// numClient is passed through as the maximum number of client pods; the number of servers is
+// currently fixed at 1 inside the function.
+func runClientServerBandwidthMeasurement(f *framework.Framework, numClient int, maxBandwidthBits int64) {
+	// TODO: Make this a function parameter, once we distribute iperf endpoints, possibly via session affinity.
+	numServer := 1
+
+	It(fmt.Sprintf("should transfer ~ 1GB onto the service endpoint %v servers (maximum of %v clients)", numServer, numClient), func() {
+		nodes := framework.ListSchedulableNodesOrDie(f.Client)
+		totalPods := len(nodes.Items)
+		// Check for zero nodes before the count is used as a divisor.
+		Expect(totalPods).NotTo(Equal(0))
+		// for a single service, we expect to divide bandwidth between the network. Very crude estimate.
+		// Kept as int64 so the 3e10 baseline is not narrowed to a platform-sized int.
+		expectedBandwidth := maxBandwidthBits / int64(totalPods)
+		appName := "iperf-e2e"
+		err, _ := f.CreateServiceForSimpleAppWithPods(
+			8001,
+			8002,
+			appName,
+			func(n api.Node) api.PodSpec {
+				return api.PodSpec{
+					Containers: []api.Container{{
+						Name:  "iperf-server",
+						Image: "gcr.io/google_containers/iperf:e2e",
+						Args: []string{
+							"/bin/sh",
+							"-c",
+							"/usr/local/bin/iperf -s -p 8001 ",
+						},
+						Ports: []api.ContainerPort{{ContainerPort: 8001}},
+					}},
+					NodeName:      n.Name,
+					RestartPolicy: api.RestartPolicyOnFailure,
+				}
+			},
+			// this will be used to generate the -service name which all iperf clients point at.
+			numServer, // Generally should be 1 server unless we do affinity or use a version of iperf that supports LB
+			true,      // Make sure we wait, otherwise all the clients will die and need to restart.
+		)
+
+		if err != nil {
+			framework.Failf("Fatal error waiting for iperf server endpoint : %v", err)
+		}
+
+		iperfClientPodLabels := f.CreatePodsPerNodeForSimpleApp(
+			"iperf-e2e-cli",
+			func(n api.Node) api.PodSpec {
+				return api.PodSpec{
+					Containers: []api.Container{
+						{
+							Name:  "iperf-client",
+							Image: "gcr.io/google_containers/iperf:e2e",
+							Args: []string{
+								"/bin/sh",
+								"-c",
+								"/usr/local/bin/iperf -c service-for-" + appName + " -p 8002 --reportstyle C && sleep 5",
+							},
+						},
+					},
+					RestartPolicy: api.RestartPolicyOnFailure, // let them successfully die.
+				}
+			},
+			numClient,
+		)
+
+		framework.Logf("Reading all perf results to stdout.")
+		framework.Logf("date,cli,cliPort,server,serverPort,id,interval,transferBits,bandwidthBits")
+
+		// Calculate expected number of clients based on total nodes.
+		expectedCli := func() int {
+			nodes, err := framework.GetReadyNodes(f)
+			framework.ExpectNoError(err)
+			return int(math.Min(float64(len(nodes.Items)), float64(numClient)))
+		}()
+
+		// Extra 1/10 second per client. Multiply before dividing: expectedCli/10 in integer
+		// arithmetic is 0 for fewer than 10 clients.
+		iperfTimeout := smallClusterTimeout + time.Duration(expectedCli)*time.Second/10
+		iperfResults := &IPerfResults{}
+
+		iperfClusterVerification := f.NewClusterVerification(
+			framework.PodStateVerification{
+				Selectors:   iperfClientPodLabels,
+				ValidPhases: []api.PodPhase{api.PodSucceeded},
+			},
+		)
+
+		pods, err2 := iperfClusterVerification.WaitFor(expectedCli, iperfTimeout)
+		switch {
+		case err2 != nil:
+			// Report the underlying error instead of dropping it.
+			framework.Failf("Error in wait... %v", err2)
+		case len(pods) < expectedCli:
+			framework.Failf("IPerf restuls : Only got %v out of %v, after waiting %v", len(pods), expectedCli, iperfTimeout)
+		default:
+			// For each builds up a collection of IPerfRecords
+			iperfClusterVerification.ForEach(
+				func(p api.Pod) {
+					resultS, err := framework.LookForStringInLog(f.Namespace.Name, p.Name, "iperf-client", "0-", 1*time.Second)
+					if err != nil {
+						framework.Failf("Unexpected error, %v when running forEach on the pods.", err)
+						return
+					}
+					// Log output is data, not a format string.
+					framework.Logf("%v", resultS)
+					iperfResults.Add(NewIPerf(resultS))
+				})
+		}
+		fmt.Println("[begin] Node,Bandwith CSV")
+		fmt.Println(iperfResults.ToTSV())
+		fmt.Println("[end] Node,Bandwith CSV")
+
+		for ipClient, bandwidth := range iperfResults.BandwidthMap {
+			framework.Logf("%v had bandwidth %v. Ratio to expected (%v) was %f", ipClient, bandwidth, expectedBandwidth, float64(bandwidth)/float64(expectedBandwidth))
+		}
+	})
+}
diff --git a/test/e2e/util_iperf.go b/test/e2e/util_iperf.go new file mode 100644 index 00000000000..ccf4a70e04a --- /dev/null +++ b/test/e2e/util_iperf.go @@ -0,0 +1,105 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +// Tests network performance using iperf or other containers. +import ( + "bytes" + "encoding/json" + "fmt" + "strconv" + "strings" + + "k8s.io/kubernetes/test/e2e/framework" +)
+
+// IPerfResults collects one bandwidth figure per iperf client, keyed by the
+// client field (cli) of the parsed record.
+type IPerfResults struct {
+	BandwidthMap map[string]int64
+}
+
+// IPerfResult struct modelling an iperf record....
+// 20160314154239,172.17.0.3,34152,172.17.0.2,5001,3,0.0-10.0,33843707904,27074774092
+type IPerfResult struct {
+	date          string // field 1 in the csv
+	cli           string // field 2 in the csv
+	cliPort       int64  // field 3 in the csv
+	server        string // field 4 in the csv
+	servPort      int64  // field 5 in the csv
+	id            string // field 6 in the csv
+	interval      string // field 7 in the csv
+	transferBits  int64  // field 8 in the csv
+	bandwidthBits int64  // field 9 in the csv
+}
+
+// Add records the bandwidth of ipr under its client address, lazily creating
+// the map on first use. A later record for the same client overwrites the
+// earlier one.
+func (i *IPerfResults) Add(ipr *IPerfResult) {
+	if i.BandwidthMap == nil {
+		i.BandwidthMap = make(map[string]int64)
+	}
+	client, bits := ipr.cli, ipr.bandwidthBits
+	i.BandwidthMap[client] = bits
+}
+
+// ToTSV renders every (client, bandwidth) pair as tab-separated text: the
+// client is JSON-quoted and the bandwidth is printed in %E notation. Pairs
+// follow map iteration order, and a warning is logged when there is no data.
+func (i *IPerfResults) ToTSV() string {
+	if len(i.BandwidthMap) == 0 {
+		framework.Logf("Warning: no data in bandwidth map")
+	}
+
+	var out bytes.Buffer
+	for client, bits := range i.BandwidthMap {
+		// The marshal error is discarded here, exactly as before.
+		quoted, _ := json.Marshal(client)
+		out.WriteString("\t ")
+		out.Write(quoted)
+		out.WriteString("\t ")
+		fmt.Fprintf(&out, "%E", float64(bits))
+	}
+	return out.String()
+}
+
+// NewIPerf parses an IPerf CSV output line into an IPerfResult.
+func NewIPerf(csvLine string) *IPerfResult {
+	// Strip surrounding whitespace (for example a trailing newline from the pod log) so
+	// the last numeric field still parses.
+	slice := StrSlice(strings.Split(strings.TrimSpace(csvLine), ","))
+	if len(slice) != 9 {
+		framework.Failf("Incorrect fields in the output: %v (%v out of 9)", slice, len(slice))
+	}
+	// Fields are positional; get() yields "" for a missing index instead of panicking.
+	return &IPerfResult{
+		date:          slice.get(0),
+		cli:           slice.get(1),
+		cliPort:       intOrFail("client port", slice.get(2)),
+		server:        slice.get(3),
+		servPort:      intOrFail("server port", slice.get(4)),
+		id:            slice.get(5),
+		interval:      slice.get(6),
+		transferBits:  intOrFail("transfer bits", slice.get(7)),
+		bandwidthBits: intOrFail("bandwidth bits", slice.get(8)),
+	}
+}
+
+// StrSlice is a string slice with a bounds-checked accessor.
+type StrSlice []string
+
+// get returns the element at index i, or "" when i is out of range.
+func (s StrSlice) get(i int) string {
+	if i < 0 || i >= len(s) {
+		return ""
+	}
+	return s[i]
+}
+
+// intOrFail is a convenience function for parsing integers.
+// It parses rawValue as a base-10 int64; on a parse error it calls framework.Failf with
+// debugName, the offending text and the underlying error.
+func intOrFail(debugName string, rawValue string) int64 {
+	value, err := strconv.ParseInt(strings.TrimSpace(rawValue), 10, 64)
+	if err != nil {
+		framework.Failf("Failed parsing value %v from the string '%v' as an integer: %v", debugName, rawValue, err)
+	}
+	return value
+}