kubernetes/test/e2e/network/networking_perf.go
Patrick Ohly 136f89dfc5 e2e: use error wrapping with %w
The recently introduced failure handling in ExpectNoError depends on error
wrapping: if an error prefix gets added with `fmt.Errorf("foo: %v", err)`, then
ExpectNoError cannot detect that the root cause is an assertion failure; it
will add another useless "unexpected error" prefix and will not dump the
additional failure information (currently the backtrace inside the E2E
framework).

Instead of manually deciding on a case-by-case basis where %w is needed, all
error wrapping was updated automatically with

    sed -i "s/fmt.Errorf\(.*\): '*\(%s\|%v\)'*\",\(.* err)\)/fmt.Errorf\1: %w\",\3/" $(git grep -l 'fmt.Errorf' test/e2e*)

This may be unnecessary in some cases, but it's not wrong.
2023-02-06 15:39:13 +01:00
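
As a minimal illustration (not from the repository, assuming only the standard
errors and fmt packages): wrapping with %w preserves the error chain so
errors.Is can still find the root cause, while %v flattens it into plain text:

    // sketch: %w keeps the error chain, %v breaks it
    var errAssertion = errors.New("assertion failure")

    wrapped := fmt.Errorf("create deployment: %w", errAssertion)
    flattened := fmt.Errorf("create deployment: %v", errAssertion)

    fmt.Println(errors.Is(wrapped, errAssertion))   // true  - the root cause is still detectable
    fmt.Println(errors.Is(flattened, errAssertion)) // false - only the message text survives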


/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network

// Tests network performance using iperf or other containers.
import (
	"context"
	"fmt"
	"time"

	"github.com/onsi/ginkgo/v2"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	"k8s.io/kubernetes/test/e2e/network/common"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"
)

const (
	// use this timeout for larger clusters
	largeClusterTimeout = 400 * time.Second
	// iperf2BaselineBandwidthMegabytesPerSecond sets a baseline for iperf2 bandwidth of 10 MB/s = 80 Mbps;
	// this limit is chosen in order to support small devices with 100 Mbps cards.
	iperf2BaselineBandwidthMegabytesPerSecond = 10
	// iperf2Port selects an arbitrary, unique port to run iperf2's client and server on
	iperf2Port = 6789
	// labelKey is used as a key for selectors
	labelKey = "app"
	// clientLabelValue is used as a value for iperf2 client selectors
	clientLabelValue = "iperf2-client"
	// serverLabelValue is used as a value for iperf2 server selectors
	serverLabelValue = "iperf2-server"
	// serverServiceName defines the service name used for the iperf2 server
	serverServiceName = "iperf2-server"
)
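
// iperf2ServerDeployment creates a single-replica Deployment that runs the iperf2
// server on iperf2Port and waits for the Deployment to complete.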
func iperf2ServerDeployment(ctx context.Context, client clientset.Interface, namespace string, isIPV6 bool) (*appsv1.Deployment, error) {
	framework.Logf("deploying iperf2 server")
	one := int64(1)
	replicas := int32(1)
	labels := map[string]string{labelKey: serverLabelValue}
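	// iperf2 server parameters:
	//  -s: run in server mode
	//  -p <port>: listen on iperf2Port
	//  -V (appended below for IPv6 clusters): use the IPv6 address domain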
	args := []string{
		"-s",
		"-p",
		fmt.Sprintf("%d", iperf2Port),
	}
	if isIPV6 {
		args = append(args, "-V")
	}
	deploymentSpec := e2edeployment.NewDeployment(
		"iperf2-server-deployment", replicas, labels, "iperf2-server",
		imageutils.GetE2EImage(imageutils.Agnhost), appsv1.RollingUpdateDeploymentStrategyType)
	deploymentSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
	deploymentSpec.Spec.Template.Spec.Containers[0].Command = []string{"iperf"}
	deploymentSpec.Spec.Template.Spec.Containers[0].Args = args
	deploymentSpec.Spec.Template.Spec.Containers[0].Ports = []v1.ContainerPort{
		{
			ContainerPort: iperf2Port,
			Protocol:      v1.ProtocolTCP,
		},
	}
	deployment, err := client.AppsV1().Deployments(namespace).Create(ctx, deploymentSpec, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("deployment %q Create API error: %w", deploymentSpec.Name, err)
	}
	framework.Logf("Waiting for deployment %q to complete", deploymentSpec.Name)
	err = e2edeployment.WaitForDeploymentComplete(client, deployment)
	if err != nil {
		return nil, fmt.Errorf("deployment %q failed to complete: %w", deploymentSpec.Name, err)
	}
	return deployment, nil
}
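
// iperf2ServerService exposes the iperf2 server pod behind a Service so that client
// pods can reach it via the stable DNS name serverServiceName.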
func iperf2ServerService(ctx context.Context, client clientset.Interface, namespace string) (*v1.Service, error) {
	service := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: serverServiceName},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{
				labelKey: serverLabelValue,
			},
			Ports: []v1.ServicePort{
				{Protocol: v1.ProtocolTCP, Port: iperf2Port},
			},
		},
	}
	return client.CoreV1().Services(namespace).Create(ctx, service, metav1.CreateOptions{})
}
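
// iperf2ClientDaemonSet creates one client pod per schedulable node; the pods stay idle
// until the test execs the iperf client inside them.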
func iperf2ClientDaemonSet(ctx context.Context, client clientset.Interface, namespace string) (*appsv1.DaemonSet, error) {
	one := int64(1)
	labels := map[string]string{labelKey: clientLabelValue}
	spec := e2edaemonset.NewDaemonSet("iperf2-clients", imageutils.GetE2EImage(imageutils.Agnhost), labels, nil, nil, nil)
	spec.Spec.Template.Spec.TerminationGracePeriodSeconds = &one
	ds, err := client.AppsV1().DaemonSets(namespace).Create(ctx, spec, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("daemonset %s Create API error: %w", spec.Name, err)
	}
	return ds, nil
}

// Test summary:
//
// This test uses iperf2 to obtain bandwidth data between nodes in the cluster, providing a coarse measure
// of the health of the cluster network. The test runs two sets of pods:
// 1. an iperf2 server on a single node
// 2. a daemonset of iperf2 clients
// The test then iterates through the clients, one by one, running iperf2 from each of them to transfer
// data to the server and back for ten seconds, after which the results are collected and parsed.
// Thus, if your cluster has 10 nodes, then 10 test runs are performed.
// Note: a more complete test could run this scenario with a daemonset of servers as well; however, this
// would require n^2 tests, n^2 time, and n^2 network resources which quickly become prohibitively large
// as the cluster size increases.
// Finally, after collecting all data, the results are analyzed and tabulated.
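
// The spec is tagged [Feature:Networking-Performance], so it is skipped in default e2e runs
// and is typically selected explicitly, e.g. via a ginkgo focus expression such as
// --ginkgo.focus="Networking IPerf2" (the exact invocation depends on how the suite is launched).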
var _ = common.SIGDescribe("Networking IPerf2 [Feature:Networking-Performance]", func() {
	// this test runs iperf2: one pod as a server, and a daemonset of clients
	f := framework.NewDefaultFramework("network-perf")
	f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline

	ginkgo.It("should run iperf2", func(ctx context.Context) {
		readySchedulableNodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
		framework.ExpectNoError(err)

		familyStr := ""
		if framework.TestContext.ClusterIsIPv6() {
			familyStr = "-V "
		}

		serverPodsListOptions := metav1.ListOptions{
			LabelSelector: fmt.Sprintf("%s=%s", labelKey, serverLabelValue),
		}

		// Step 1: set up iperf2 server -- a single pod on any node
		_, err = iperf2ServerDeployment(ctx, f.ClientSet, f.Namespace.Name, framework.TestContext.ClusterIsIPv6())
		framework.ExpectNoError(err, "deploy iperf2 server deployment")

		_, err = iperf2ServerService(ctx, f.ClientSet, f.Namespace.Name)
		framework.ExpectNoError(err, "deploy iperf2 server service")

		// Step 2: set up iperf2 client daemonset
		// initially, the clients don't do anything -- they simply pause until they're called
		_, err = iperf2ClientDaemonSet(ctx, f.ClientSet, f.Namespace.Name)
		framework.ExpectNoError(err, "deploy iperf2 client daemonset")

		// Make sure the server is ready to go
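		// Wait for the server Service to have an EndpointSlice before starting any client runs.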
framework.Logf("waiting for iperf2 server endpoints")
err = wait.Poll(2*time.Second, largeClusterTimeout, func() (done bool, err error) {
listOptions := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serverServiceName)}
esList, err := f.ClientSet.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, listOptions)
framework.ExpectNoError(err, "Error fetching EndpointSlice for Service %s/%s", f.Namespace.Name, serverServiceName)
if len(esList.Items) == 0 {
framework.Logf("EndpointSlice for Service %s/%s not found", f.Namespace.Name, serverServiceName)
return false, nil
}
return true, nil
})
framework.ExpectNoError(err, "unable to wait for endpoints for the iperf service")
framework.Logf("found iperf2 server endpoints")

		clientPodsListOptions := metav1.ListOptions{
			LabelSelector: fmt.Sprintf("%s=%s", labelKey, clientLabelValue),
		}
		framework.Logf("waiting for client pods to be running")
		var clientPodList *v1.PodList
		err = wait.Poll(2*time.Second, largeClusterTimeout, func() (done bool, err error) {
			clientPodList, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, clientPodsListOptions)
			if err != nil {
				return false, err
			}
			if len(clientPodList.Items) < len(readySchedulableNodes.Items) {
				return false, nil
			}
			for _, pod := range clientPodList.Items {
				if pod.Status.Phase != v1.PodRunning {
					return false, nil
				}
			}
			return true, nil
		})
		framework.ExpectNoError(err, "unable to wait for client pods to come up")
		framework.Logf("all client pods are ready: %d pods", len(clientPodList.Items))

		// Get a reference to the server pod for later
		serverPodList, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, serverPodsListOptions)
		framework.ExpectNoError(err)
		if len(serverPodList.Items) != 1 {
			framework.Failf("expected 1 server pod, found %d", len(serverPodList.Items))
		}
		serverPod := serverPodList.Items[0]
		framework.Logf("server pod phase %s", serverPod.Status.Phase)
		for i, condition := range serverPod.Status.Conditions {
			framework.Logf("server pod condition %d: %+v", i, condition)
		}
		for i, cont := range serverPod.Status.ContainerStatuses {
			framework.Logf("server pod container status %d: %+v", i, cont)
		}

		framework.Logf("found %d matching client pods", len(clientPodList.Items))
		nodeResults := &IPerf2NodeToNodeCSVResults{
			ServerNode: serverPod.Spec.NodeName,
			Results:    map[string]*IPerf2EnhancedCSVResults{},
		}

		// Step 3: iterate through the client pods one by one, running iperf2 in client mode to transfer
		// data to the server and back and measure bandwidth
		for _, pod := range clientPodList.Items {
			podName := pod.Name
			nodeName := pod.Spec.NodeName

			iperfVersion := e2epod.ExecShellInPod(ctx, f, podName, "iperf -v || true")
			framework.Logf("iperf version: %s", iperfVersion)
			for try := 0; ; try++ {
				/* iperf2 command parameters:
				 * -e: use enhanced reporting giving more tcp/udp and traffic information
				 * -p %d: server port to connect to
				 * --reportstyle C: report as Comma-Separated Values
				 * -i 1: seconds between periodic bandwidth reports
				 * -c %s: run in client mode, connecting to <host>
				 */
				command := fmt.Sprintf(`iperf %s -e -p %d --reportstyle C -i 1 -c %s && sleep 5`, familyStr, iperf2Port, serverServiceName)
				framework.Logf("attempting to run command '%s' in client pod %s (node %s)", command, podName, nodeName)
				output := e2epod.ExecShellInPod(ctx, f, podName, command)
				framework.Logf("output from exec on client pod %s (node %s): \n%s\n", podName, nodeName, output)

				results, err := ParseIPerf2EnhancedResultsFromCSV(output)
				if err == nil {
					nodeResults.Results[nodeName] = results
					break
				} else if try == 2 {
					framework.ExpectNoError(err, "unable to parse iperf2 output from client pod %s (node %s)", pod.Name, nodeName)
				} else {
					framework.Logf("Retrying: IPerf run failed: %+v", err)
				}
			}
		}

		// Step 4: after collecting all the client<->server data, compile and present the results
		/*
			Example output:
			Dec 22 07:52:41.102: INFO: From To Bandwidth (MB/s)
			Dec 22 07:52:41.102: INFO: three-node-ipv6-worker three-node-ipv6-worker2 2381
			Dec 22 07:52:41.102: INFO: three-node-ipv6-worker2 three-node-ipv6-worker2 2214
			Dec 22 07:52:41.102: INFO: three-node-ipv6-worker3 three-node-ipv6-worker2 3123
		*/
		framework.Logf("%35s%35s%20s", "From", "To", "Bandwidth (MB/s)")
		for nodeFrom, results := range nodeResults.Results {
			framework.Logf("%35s%35s%20d", nodeFrom, nodeResults.ServerNode, results.Total.bandwidthMB())
		}
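
		// Enforce the bandwidth baseline: any client whose measured throughput to the
		// server falls below iperf2BaselineBandwidthMegabytesPerSecond fails the test.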
		for clientNode, results := range nodeResults.Results {
			megabytesPerSecond := results.Total.bandwidthMB()
			if megabytesPerSecond < iperf2BaselineBandwidthMegabytesPerSecond {
				framework.Failf("iperf2 MB/s received below baseline of %d for client %s to server %s: %d", iperf2BaselineBandwidthMegabytesPerSecond, clientNode, nodeResults.ServerNode, megabytesPerSecond)
			}
		}
	})
})