diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index f7b9c9b7cd0..de38d477cf2 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -27,6 +27,7 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -92,6 +93,18 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { } } + // Data structures for tracking server and client pods + type serverPod struct { + node *v1.Node + pod *v1.Pod + } + + type clientPod struct { + node *v1.Node + endpoints []*serverPod + pod *v1.Pod + } + //////////////////////////////////////////////////////////////////////////// // Main test specifications. //////////////////////////////////////////////////////////////////////////// @@ -109,13 +122,13 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) framework.ExpectNoError(err) - nodeForZone := make(map[string]string) + nodeForZone := make(map[string]*v1.Node) for _, zone := range zones { found := false for _, node := range nodeList.Items { if zone == node.Labels[v1.LabelTopologyZone] { found = true - nodeForZone[zone] = node.GetName() + nodeForZone[zone] = &node } } if !found { @@ -123,20 +136,42 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { } } - ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2])) - zoneForServingPod := make(map[string]string) - var servingPods []*v1.Pod + var clientPods []*clientPod + var serverPods []*serverPod + + // We want clients in all three zones + for _, node := range nodeForZone { + clientPods = append(clientPods, &clientPod{node: node}) + } + + // and endpoints in the first two zones + serverPods = []*serverPod{ + {node: clientPods[0].node}, + {node: clientPods[1].node}, + } + + // The clients with an endpoint in the same zone should only connect to + // that endpoint. The client with no endpoint in its zone should connect + // to both endpoints. + clientPods[0].endpoints = []*serverPod{serverPods[0]} + clientPods[1].endpoints = []*serverPod{serverPods[1]} + clientPods[2].endpoints = serverPods + + var podsToCreate []*v1.Pod servingPodLabels := map[string]string{"app": f.UniqueName} - for _, zone := range zones[:2] { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname") - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + for i, sp := range serverPods { + node := sp.node.Name + zone := sp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname") + ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} e2epod.SetNodeSelection(&pod.Spec, nodeSelection) pod.Labels = servingPodLabels - servingPods = append(servingPods, pod) - zoneForServingPod[pod.Name] = zone + sp.pod = pod + podsToCreate = append(podsToCreate, pod) } - e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) trafficDist := v1.ServiceTrafficDistributionPreferClose svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ @@ -156,95 +191,63 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) ginkgo.By("waiting for EndpointSlices to be created") - err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) framework.ExpectNoError(err) slices := endpointSlicesForService(svc.Name) framework.Logf("got slices:\n%v", format.Object(slices, 1)) - ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") - - createClientPod := func(ctx context.Context, zone string) *v1.Pod { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + podsToCreate = nil + for i, cp := range clientPods { + node := cp.node.Name + zone := cp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil) + ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} e2epod.SetNodeSelection(&pod.Spec, nodeSelection) cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} pod.Spec.Containers[0].Name = pod.Name - return e2epod.NewPodClient(f).CreateSync(ctx, pod) + cp.pod = pod + podsToCreate = append(podsToCreate, pod) } + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) - for _, clientZone := range zones[:2] { - framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) + for _, cp := range clientPods { + wantedEndpoints := sets.New[string]() + for _, sp := range cp.endpoints { + wantedEndpoints.Insert(sp.pod.Name) + } + unreachedEndpoints := wantedEndpoints.Clone() - framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone) + ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList())) - requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { logLines := reverseChronologicalLogLines if len(logLines) < 20 { return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil } - consecutiveSameZone := 0 + consecutiveSuccessfulRequests := 0 for _, logLine := range logLines { if logLine == "" || strings.HasPrefix(logLine, "Date:") { continue } - destZone, ok := zoneForServingPod[logLine] - if !ok { - return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + destEndpoint := logLine + if !wantedEndpoints.Has(destEndpoint) { + return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil } - if clientZone != destZone { - return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSameZone++ - if consecutiveSameZone >= 10 { + consecutiveSuccessfulRequests++ + unreachedEndpoints.Delete(destEndpoint) + if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 { return nil, nil // Pass condition. } } // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil + return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil }) - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone) + gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed) } - - ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client") - - clientZone := zones[2] - framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) - - framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone) - - requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { - logLines := reverseChronologicalLogLines - if len(logLines) < 20 { - return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil - } - - // Requests are counted as successful when the response read from the log - // lines is the name of a recognizable serving pod. - consecutiveSuccessfulRequests := 0 - - for _, logLine := range logLines { - if logLine == "" || strings.HasPrefix(logLine, "Date:") { - continue - } - _, servingPodExists := zoneForServingPod[logLine] - if !servingPodExists { - return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSuccessfulRequests++ - if consecutiveSuccessfulRequests >= 10 { - return nil, nil // Pass condition - } - } - // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil - }) - - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod) }) })