From 76c19a24db12f7deb4aeb3be2c3483d0f34a3e5a Mon Sep 17 00:00:00 2001
From: Rob Scott
Date: Mon, 25 Oct 2021 22:39:32 -0700
Subject: [PATCH 1/2] Initial Topology Hints e2e Tests

---
 test/e2e/network/topology_hints.go | 188 +++++++++++++++++++++++++++++
 1 file changed, 188 insertions(+)
 create mode 100644 test/e2e/network/topology_hints.go

diff --git a/test/e2e/network/topology_hints.go b/test/e2e/network/topology_hints.go
new file mode 100644
index 00000000000..f2b0de99baa
--- /dev/null
+++ b/test/e2e/network/topology_hints.go
@@ -0,0 +1,188 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package network
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/onsi/ginkgo"
+	v1 "k8s.io/api/core/v1"
+	discoveryv1 "k8s.io/api/discovery/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	"k8s.io/kubernetes/test/e2e/network/common"
+	imageutils "k8s.io/kubernetes/test/utils/image"
+)
+
+var _ = common.SIGDescribe("Feature:Topology Hints", func() {
+	f := framework.NewDefaultFramework("topology-hints")
+
+	// filled in BeforeEach
+	var c clientset.Interface
+
+	ginkgo.BeforeEach(func() {
+		c = f.ClientSet
+		e2eskipper.SkipUnlessMultizone(c)
+	})
+
+	ginkgo.It("should distribute endpoints evenly", func() {
+		portNum := 9376
+		thLabels := map[string]string{labelKey: clientLabelValue}
+		img := imageutils.GetE2EImage(imageutils.Agnhost)
+		ports := []v1.ContainerPort{{ContainerPort: int32(portNum)}}
+		dsConf := e2edaemonset.NewDaemonSet("topology-serve-hostname", img, thLabels, nil, nil, ports, "serve-hostname")
+		ds, err := c.AppsV1().DaemonSets(f.Namespace.Name).Create(context.TODO(), dsConf, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "error creating DaemonSet")
+
+		svc := createServiceReportErr(c, f.Namespace.Name, &v1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "topology-hints",
+				Annotations: map[string]string{
+					v1.AnnotationTopologyAwareHints: "Auto",
+				},
+			},
+			Spec: v1.ServiceSpec{
+				Selector:                 thLabels,
+				PublishNotReadyAddresses: true,
+				Ports: []v1.ServicePort{{
+					Name:       "example",
+					Port:       80,
+					TargetPort: intstr.FromInt(portNum),
+					Protocol:   v1.ProtocolTCP,
+				}},
+			},
+		})
+
+		err = wait.Poll(5*time.Second, framework.PodStartTimeout, func() (bool, error) {
+			return e2edaemonset.CheckRunningOnAllNodes(f, ds)
+		})
+		framework.ExpectNoError(err, "timed out waiting for DaemonSets to be ready")
+
+		nodeNames := e2edaemonset.SchedulableNodes(c, ds)
+		framework.Logf("Waiting for %d endpoints to be tracked in EndpointSlices", len(nodeNames))
+
+		var finalSlices []discoveryv1.EndpointSlice
+		err = wait.Poll(5*time.Second, 3*time.Minute, func() (bool, error) {
+			slices, listErr := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.Name)})
+			if listErr != nil {
+				return false, listErr
+			}
+
+			numEndpoints := 0
+			for _, slice := range slices.Items {
+				numEndpoints += len(slice.Endpoints)
+			}
+			if len(nodeNames) > numEndpoints {
+				framework.Logf("Expected %d endpoints, got %d", len(nodeNames), numEndpoints)
+				return false, nil
+			}
+
+			finalSlices = slices.Items
+			return true, nil
+		})
+		framework.ExpectNoError(err, "timed out waiting for EndpointSlices to be ready")
+
+		ginkgo.By("having hints set for each endpoint")
+		for _, slice := range finalSlices {
+			for _, ep := range slice.Endpoints {
+				if ep.Zone == nil {
+					framework.Failf("Expected endpoint in %s to have zone: %v", slice.Name, ep)
+				}
+				if ep.Hints == nil || len(ep.Hints.ForZones) == 0 {
+					framework.Failf("Expected endpoint in %s to have hints: %v", slice.Name, ep)
+				}
+				if len(ep.Hints.ForZones) > 1 {
+					framework.Failf("Expected endpoint in %s to have exactly 1 zone hint, got %d: %v", slice.Name, len(ep.Hints.ForZones), ep)
+				}
+				if *ep.Zone != ep.Hints.ForZones[0].Name {
+					framework.Failf("Expected endpoint in %s to have same zone hint, got %s: %v", slice.Name, *ep.Zone, ep)
+				}
+			}
+		}
+
+		nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+		framework.ExpectNoError(err)
+		nodesByZone := map[string]string{}
+		for _, node := range nodeList.Items {
+			if zone, ok := node.Labels[v1.LabelTopologyZone]; ok {
+				nodesByZone[node.Name] = zone
+			}
+		}
+
+		podList, err := c.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
+		framework.ExpectNoError(err)
+		podsByZone := map[string]string{}
+		for _, pod := range podList.Items {
+			if zone, ok := nodesByZone[pod.Spec.NodeName]; ok {
+				podsByZone[pod.Name] = zone
+			}
+		}
+
+		ginkgo.By("keeping requests in the same zone")
+		for i, nodeName := range nodeNames {
+			// Iterate through a max of 3 nodes
+			if i > 2 {
+				break
+			}
+			fromZone, ok := nodesByZone[nodeName]
+			if !ok {
+				framework.Failf("Expected zone to be specified for %s node", nodeName)
+			}
+			execPod := e2epod.CreateExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-", func(pod *v1.Pod) {
+				pod.Spec.NodeName = nodeName
+			})
+			framework.Logf("Ensuring that requests from %s pod on %s node stay in %s zone", execPod.Name, nodeName, fromZone)
+
+			cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:80/", svc.Name)
+			consecutiveRequests := 0
+			var stdout string
+			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
+				var err error
+				stdout, err = framework.RunHostCmd(f.Namespace.Name, execPod.Name, cmd)
+				if err != nil {
+					framework.Logf("unexpected error running %s from %s", cmd, execPod.Name)
+					return false, nil
+				}
+				destZone, ok := podsByZone[stdout]
+				if !ok {
+					framework.Logf("could not determine dest zone from output: %s", stdout)
+					return false, nil
+				}
+				if fromZone != destZone {
+					framework.Logf("expected request from %s to stay in %s zone, delivered to %s zone", execPod.Name, fromZone, destZone)
+					return false, nil
+				}
+				consecutiveRequests++
+				if consecutiveRequests < 5 {
+					return false, nil
+				}
+				return true, nil
+			}); pollErr != nil {
+				framework.Failf("expected 5 consecutive requests from %s to stay in zone %s within %v, stdout: %v", execPod.Name, fromZone, e2eservice.KubeProxyLagTimeout, stdout)
+			}
+		}
+	})
+})
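The invariant patch 1 enforces for every EndpointSlice endpoint is that it carries a zone and exactly one zone hint, and that the two name the same zone. A minimal standalone sketch of that check, assuming only the k8s.io/api/discovery/v1 types (the package and helper name here are illustrative, not part of the patch):

package endpointhints

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
)

// validateZoneHint mirrors the per-endpoint assertions in the test: the
// endpoint must report a zone, carry exactly one zone hint, and the hint
// must match that zone.
func validateZoneHint(ep discoveryv1.Endpoint) error {
	if ep.Zone == nil {
		return fmt.Errorf("endpoint %v has no zone", ep.Addresses)
	}
	if ep.Hints == nil || len(ep.Hints.ForZones) != 1 {
		return fmt.Errorf("endpoint %v must carry exactly one zone hint", ep.Addresses)
	}
	if hint := ep.Hints.ForZones[0].Name; hint != *ep.Zone {
		return fmt.Errorf("zone hint %q does not match endpoint zone %q", hint, *ep.Zone)
	}
	return nil
}

With hints in this shape, a topology-aware kube-proxy routes traffic only to endpoints hinted for the client's own zone, which is exactly what the second half of the test then verifies end to end.
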
From d4ebc6ff1e803fb422ebd785825ea3210111f3e4 Mon Sep 17 00:00:00 2001
From: Rob Scott
Date: Mon, 1 Nov 2021 13:38:22 -0700
Subject: [PATCH 2/2] Transitioning from multiple execs to reading logs

---
 test/e2e/network/topology_hints.go | 69 +++++++++++++++++++-----------
 1 file changed, 44 insertions(+), 25 deletions(-)

diff --git a/test/e2e/network/topology_hints.go b/test/e2e/network/topology_hints.go
index f2b0de99baa..77d59531e2d 100644
--- a/test/e2e/network/topology_hints.go
+++ b/test/e2e/network/topology_hints.go
@@ -19,6 +19,7 @@ package network
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/onsi/ginkgo"
@@ -151,37 +152,55 @@ var _ = common.SIGDescribe("Feature:Topology Hints", func() {
 			if !ok {
 				framework.Failf("Expected zone to be specified for %s node", nodeName)
 			}
-			execPod := e2epod.CreateExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-", func(pod *v1.Pod) {
-				pod.Spec.NodeName = nodeName
-			})
-			framework.Logf("Ensuring that requests from %s pod on %s node stay in %s zone", execPod.Name, nodeName, fromZone)
+			ginkgo.By("creating a client pod for probing the service from " + fromZone)
+			podName := "curl-from-" + fromZone
+			clientPod := e2epod.NewAgnhostPod(f.Namespace.Name, podName, nil, nil, nil, "serve-hostname")
+			nodeSelection := e2epod.NodeSelection{Name: nodeName}
+			e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
+			cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
+			clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+			clientPod.Spec.Containers[0].Name = clientPod.Name
+			f.PodClient().CreateSync(clientPod)
 
-			cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:80/", svc.Name)
-			consecutiveRequests := 0
-			var stdout string
-			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
+			framework.Logf("Ensuring that requests from %s pod on %s node stay in %s zone", clientPod.Name, nodeName, fromZone)
+
+			var logs string
+			if pollErr := wait.Poll(5*time.Second, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
 				var err error
-				stdout, err = framework.RunHostCmd(f.Namespace.Name, execPod.Name, cmd)
-				if err != nil {
-					framework.Logf("unexpected error running %s from %s", cmd, execPod.Name)
+				logs, err = e2epod.GetPodLogs(c, f.Namespace.Name, clientPod.Name, clientPod.Name)
+				framework.ExpectNoError(err)
+				framework.Logf("Pod client logs: %s", logs)
+
+				logLines := strings.Split(logs, "\n")
+				if len(logLines) < 6 {
+					framework.Logf("only %d log lines, waiting for at least 6", len(logLines))
 					return false, nil
 				}
-				destZone, ok := podsByZone[stdout]
-				if !ok {
-					framework.Logf("could not determine dest zone from output: %s", stdout)
-					return false, nil
+
+				consecutiveSameZone := 0
+
+				for i := len(logLines) - 1; i > 0; i-- {
+					if logLines[i] == "" || strings.HasPrefix(logLines[i], "Date:") {
+						continue
+					}
+					destZone, ok := podsByZone[logLines[i]]
+					if !ok {
+						framework.Logf("could not determine dest zone from log line: %s", logLines[i])
+						return false, nil
+					}
+					if fromZone != destZone {
+						framework.Logf("expected request from %s to stay in %s zone, delivered to %s zone", clientPod.Name, fromZone, destZone)
+						return false, nil
+					}
+					consecutiveSameZone++
+					if consecutiveSameZone >= 5 {
+						return true, nil
+					}
 				}
-				if fromZone != destZone {
-					framework.Logf("expected request from %s to stay in %s zone, delivered to %s zone", execPod.Name, fromZone, destZone)
-					return false, nil
-				}
-				consecutiveRequests++
-				if consecutiveRequests < 5 {
-					return false, nil
-				}
-				return true, nil
+
+				return false, nil
 			}); pollErr != nil {
-				framework.Failf("expected 5 consecutive requests from %s to stay in zone %s within %v, stdout: %v", execPod.Name, fromZone, e2eservice.KubeProxyLagTimeout, stdout)
+				framework.Failf("expected 5 consecutive requests from %s to stay in zone %s within %v, stdout: %v", clientPod.Name, fromZone, e2eservice.KubeProxyLagTimeout, logs)
 			}
 		}
 	})
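Patch 2 drops the exec-per-request model: the client pod now curls the Service once per second in a shell loop, and the test derives zone affinity by scanning the pod's logs from the newest line backwards until it has seen five consecutive same-zone responses. The core of that scan, pulled out into a standalone sketch (the function name and early-exit behavior are assumptions of this illustration, not code from the patch):

package zoneaffinity

import "strings"

// consecutiveSameZone walks serve-hostname output from the newest line
// backwards, skipping blank lines and the "Date:" markers the shell loop
// emits, and counts consecutive responses served from wantZone. The first
// response from an unknown pod or another zone ends the streak.
func consecutiveSameZone(logs string, podsByZone map[string]string, wantZone string) int {
	lines := strings.Split(logs, "\n")
	streak := 0
	for i := len(lines) - 1; i > 0; i-- { // index 0 is the initial `date` output
		line := lines[i]
		if line == "" || strings.HasPrefix(line, "Date:") {
			continue
		}
		if podsByZone[line] != wantZone {
			break
		}
		streak++
	}
	return streak
}

Scanning from the tail anchors the verdict to the most recent requests, so responses sent before kube-proxy has programmed the hints merely delay success during the poll instead of failing the test outright.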