diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go
index 19f80e5cf61..f7b9c9b7cd0 100644
--- a/test/e2e/network/traffic_distribution.go
+++ b/test/e2e/network/traffic_distribution.go
@@ -96,159 +96,155 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
 	// Main test specifications.
 	////////////////////////////////////////////////////////////////////////////
 
-	ginkgo.When("Service has trafficDistribution=PreferClose", func() {
-		ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
+	ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) {
-			ginkgo.By("finding 3 zones with schedulable nodes")
-			allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
-			framework.ExpectNoError(err)
-			if len(allZonesSet) < 3 {
-				framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
-			}
-			zones := allZonesSet.UnsortedList()[:3]
+		ginkgo.By("finding 3 zones with schedulable nodes")
+		allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
+		framework.ExpectNoError(err)
+		if len(allZonesSet) < 3 {
+			framework.Failf("got %d zones with schedulable nodes, want at least 3 zones with schedulable nodes", len(allZonesSet))
+		}
+		zones := allZonesSet.UnsortedList()[:3]
 
-			ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
-			nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
-			framework.ExpectNoError(err)
-			nodeForZone := make(map[string]string)
-			for _, zone := range zones {
-				found := false
-				for _, node := range nodeList.Items {
-					if zone == node.Labels[v1.LabelTopologyZone] {
-						found = true
-						nodeForZone[zone] = node.GetName()
-					}
-				}
-				if !found {
-					framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
+		ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
+		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
+		framework.ExpectNoError(err)
+		nodeForZone := make(map[string]string)
+		for _, zone := range zones {
+			found := false
+			for _, node := range nodeList.Items {
+				if zone == node.Labels[v1.LabelTopologyZone] {
+					found = true
+					nodeForZone[zone] = node.GetName()
 				}
 			}
-
-			ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
-			zoneForServingPod := make(map[string]string)
-			var servingPods []*v1.Pod
-			servingPodLabels := map[string]string{"app": f.UniqueName}
-			for _, zone := range zones[:2] {
-				pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
-				nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
-				e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
-				pod.Labels = servingPodLabels
-
-				servingPods = append(servingPods, pod)
-				zoneForServingPod[pod.Name] = zone
+			if !found {
+				framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
 			}
-			e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
+		}
 
-			trafficDist := v1.ServiceTrafficDistributionPreferClose
-			svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
-				ObjectMeta: metav1.ObjectMeta{
-					Name: "traffic-dist-test-service",
-				},
-				Spec: v1.ServiceSpec{
-					Selector: servingPodLabels,
-					TrafficDistribution: &trafficDist,
-					Ports: []v1.ServicePort{{
-						Port: 80,
-						TargetPort: intstr.FromInt32(9376),
-						Protocol: v1.ProtocolTCP,
-					}},
-				},
-			})
-			ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
+		ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
+		zoneForServingPod := make(map[string]string)
+		var servingPods []*v1.Pod
+		servingPodLabels := map[string]string{"app": f.UniqueName}
+		for _, zone := range zones[:2] {
+			pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
+			nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
+			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
+			pod.Labels = servingPodLabels
 
-			ginkgo.By("waiting for EndpointSlices to be created")
-			err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
-			framework.ExpectNoError(err)
-			slices := endpointSlicesForService(svc.Name)
-			framework.Logf("got slices:\n%v", format.Object(slices, 1))
+			servingPods = append(servingPods, pod)
+			zoneForServingPod[pod.Name] = zone
+		}
+		e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
 
-			ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
+		trafficDist := v1.ServiceTrafficDistributionPreferClose
+		svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "traffic-dist-test-service",
+			},
+			Spec: v1.ServiceSpec{
+				Selector: servingPodLabels,
+				TrafficDistribution: &trafficDist,
+				Ports: []v1.ServicePort{{
+					Port: 80,
+					TargetPort: intstr.FromInt32(9376),
+					Protocol: v1.ProtocolTCP,
+				}},
+			},
+		})
+		ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
 
-			createClientPod := func(ctx context.Context, zone string) *v1.Pod {
-				pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
-				nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
-				e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
-				cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
-				pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
-				pod.Spec.Containers[0].Name = pod.Name
+		ginkgo.By("waiting for EndpointSlices to be created")
+		err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
+		framework.ExpectNoError(err)
+		slices := endpointSlicesForService(svc.Name)
+		framework.Logf("got slices:\n%v", format.Object(slices, 1))
 
-				return e2epod.NewPodClient(f).CreateSync(ctx, pod)
-			}
+		ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
 
-			for _, clientZone := range zones[:2] {
-				framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
-				clientPod := createClientPod(ctx, clientZone)
+		createClientPod := func(ctx context.Context, zone string) *v1.Pod {
+			pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
+			nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
+			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
+			cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
+			pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
+			pod.Spec.Containers[0].Name = pod.Name
 
-				framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
+			return e2epod.NewPodClient(f).CreateSync(ctx, pod)
+		}
 
-				requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
-					logLines := reverseChronologicalLogLines
-					if len(logLines) < 20 {
-						return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
-					}
-					consecutiveSameZone := 0
-
-					for _, logLine := range logLines {
-						if logLine == "" || strings.HasPrefix(logLine, "Date:") {
-							continue
-						}
-						destZone, ok := zoneForServingPod[logLine]
-						if !ok {
-							return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
-						}
-						if clientZone != destZone {
-							return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
-						}
-						consecutiveSameZone++
-						if consecutiveSameZone >= 10 {
-							return nil, nil // Pass condition.
-						}
-					}
-					// Ideally, the matcher would never reach this condition
-					return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
-				})
-
-				gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
-			}
-
-			ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
-
-			clientZone := zones[2]
-			framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
+		for _, clientZone := range zones[:2] {
+			framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
+			clientPod := createClientPod(ctx, clientZone)
 
-			framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
+			framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
 
-			requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
+			requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
 				logLines := reverseChronologicalLogLines
 				if len(logLines) < 20 {
 					return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
 				}
-
-				// Requests are counted as successful when the response read from the log
-				// lines is the name of a recognizable serving pod.
-				consecutiveSuccessfulRequests := 0
+				consecutiveSameZone := 0
 
 				for _, logLine := range logLines {
 					if logLine == "" || strings.HasPrefix(logLine, "Date:") {
 						continue
 					}
-					_, servingPodExists := zoneForServingPod[logLine]
-					if !servingPodExists {
-						return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
+					destZone, ok := zoneForServingPod[logLine]
+					if !ok {
+						return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
 					}
-					consecutiveSuccessfulRequests++
-					if consecutiveSuccessfulRequests >= 10 {
-						return nil, nil // Pass condition
+					if clientZone != destZone {
+						return gomegaCustomError("expected request from clientPod=%q to stay in its zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
+					}
+					consecutiveSameZone++
+					if consecutiveSameZone >= 10 {
+						return nil, nil // Pass condition.
 					}
 				}
 				// Ideally, the matcher would never reach this condition
-				return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
+				return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
 			})
-			gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
+			gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
+		}
 
+		ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
+
+		clientZone := zones[2]
+		framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT have a serving pod", clientZone)
+		clientPod := createClientPod(ctx, clientZone)
+
+		framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
+
+		requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
+			logLines := reverseChronologicalLogLines
+			if len(logLines) < 20 {
+				return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
+			}
+
+			// Requests are counted as successful when the response read from the log
+			// lines is the name of a recognizable serving pod.
+			consecutiveSuccessfulRequests := 0
+
+			for _, logLine := range logLines {
+				if logLine == "" || strings.HasPrefix(logLine, "Date:") {
+					continue
+				}
+				_, servingPodExists := zoneForServingPod[logLine]
+				if !servingPodExists {
+					return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
+				}
+				consecutiveSuccessfulRequests++
+				if consecutiveSuccessfulRequests >= 10 {
+					return nil, nil // Pass condition
+				}
+			}
+			// Ideally, the matcher would never reach this condition
+			return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
 		})
+		gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
 	})
 })