From 2670462eb037db70cbbb6fc6100547d6a9557172 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Thu, 20 Mar 2025 14:57:05 -0400 Subject: [PATCH 1/9] Remove TopologyHints and TrafficDistribution feature flags The features are always enabled, so the tests don't need to be conditional. --- test/e2e/feature/feature.go | 8 -------- test/e2e/network/topology_hints.go | 3 +-- test/e2e/network/traffic_distribution.go | 4 +--- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/test/e2e/feature/feature.go b/test/e2e/feature/feature.go index 684d6f401ef..077f73e6db2 100644 --- a/test/e2e/feature/feature.go +++ b/test/e2e/feature/feature.go @@ -512,14 +512,6 @@ var ( // - ci-kubernetes-node-e2e-cri-proxy-serial CriProxy = framework.WithFeature(framework.ValidFeatures.Add("CriProxy")) - // Owner: sig-network - // Marks tests that require a cluster with Topology Hints enabled. - TopologyHints = framework.WithFeature(framework.ValidFeatures.Add("Topology Hints")) - - // Owner: sig-network - // Marks tests that require a cluster with Traffic Distribution enabled. - TrafficDistribution = framework.WithFeature(framework.ValidFeatures.Add("Traffic Distribution")) - // TODO: document the feature (owning SIG, when to use this feature for a test) TopologyManager = framework.WithFeature(framework.ValidFeatures.Add("TopologyManager")) diff --git a/test/e2e/network/topology_hints.go b/test/e2e/network/topology_hints.go index 5b553e46af9..c9b63683c5c 100644 --- a/test/e2e/network/topology_hints.go +++ b/test/e2e/network/topology_hints.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -41,7 +40,7 @@ import ( admissionapi "k8s.io/pod-security-admission/api" ) -var _ = common.SIGDescribe(feature.TopologyHints, func() { +var _ = common.SIGDescribe("Topology Hints", func() { f := framework.NewDefaultFramework("topology-hints") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 34591075d67..8db13f8da2b 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -28,8 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -43,7 +41,7 @@ import ( "github.com/onsi/gomega" ) -var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGate(features.ServiceTrafficDistribution), func() { +var _ = common.SIGDescribe("Traffic Distribution", func() { f := framework.NewDefaultFramework("traffic-distribution") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged From 8f7bb964de5764184006a58f9add3efa9b4d1e52 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 19 Mar 2025 16:55:46 -0400 Subject: [PATCH 2/9] TrafficDistribution e2e cleanups Remove endpointSlicesHaveSameZoneHints check. We are testing that connections end up at the right endpoints. We don't need to validate _why_ they go to the right endpoints, which is already tested by other tests anyway. 
(Also, validating the hints becomes more complicated in the same-node case, where there may or may not also be same-zone hints depending on cluster configuration.) Remove DeferCleanup calls; we don't need to delete anything manually because namespaced resources will automatically be deleted when the test case's namespace is deleted. Remove a setting of pod.NodeName that was redundant with e2epod.SetNodeSelection(). --- test/e2e/network/traffic_distribution.go | 36 ++++-------------------- 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 8db13f8da2b..19f80e5cf61 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -77,31 +77,6 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { } } - // endpointSlicesHaveSameZoneHints returns a matcher function to be used with - // gomega.Eventually().Should(...). It checks that the passed EndpointSlices - // have zone-hints which match the endpoint's zone. - endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) { - if len(slices) == 0 { - return nil, fmt.Errorf("no endpointslices found") - } - for _, slice := range slices { - for _, endpoint := range slice.Endpoints { - var ip string - if len(endpoint.Addresses) > 0 { - ip = endpoint.Addresses[0] - } - var zone string - if endpoint.Zone != nil { - zone = *endpoint.Zone - } - if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone { - return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil - } - } - } - return nil, nil - }) - // requestsFromClient returns a helper function to be used with // gomega.Eventually(...). It fetches the logs from the clientPod and returns // them in reverse-chronological order. 
@@ -161,7 +136,6 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { servingPods = append(servingPods, pod) zoneForServingPod[pod.Name] = zone - ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{}) } e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) @@ -181,23 +155,23 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { }, }) ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) - ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{}) - ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints") - gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints) + ginkgo.By("waiting for EndpointSlices to be created") + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + slices := endpointSlicesForService(svc.Name) + framework.Logf("got slices:\n%v", format.Object(slices, 1)) ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") createClientPod := func(ctx context.Context, zone string) *v1.Pod { pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) - pod.Spec.NodeName = nodeForZone[zone] nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} e2epod.SetNodeSelection(&pod.Spec, nodeSelection) cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} pod.Spec.Containers[0].Name = pod.Name - ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{}) return e2epod.NewPodClient(f).CreateSync(ctx, pod) } From b1a0fea4c6fa6e812423936dab28ee9c94ffd8a3 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 19 Mar 2025 13:39:36 -0400 Subject: [PATCH 3/9] Remove a level of indentation in the TrafficDistribution e2e (No other changes to the code) --- test/e2e/network/traffic_distribution.go | 236 +++++++++++------------ 1 file changed, 116 insertions(+), 120 deletions(-) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 19f80e5cf61..f7b9c9b7cd0 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -96,159 +96,155 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { // Main test specifications. 
//////////////////////////////////////////////////////////////////////////// - ginkgo.When("Service has trafficDistribution=PreferClose", func() { - ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) { + ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { - ginkgo.By("finding 3 zones with schedulable nodes") - allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c) - framework.ExpectNoError(err) - if len(allZonesSet) < 3 { - framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet)) - } - zones := allZonesSet.UnsortedList()[:3] + ginkgo.By("finding 3 zones with schedulable nodes") + allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c) + framework.ExpectNoError(err) + if len(allZonesSet) < 3 { + framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet)) + } + zones := allZonesSet.UnsortedList()[:3] - ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) - nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) - framework.ExpectNoError(err) - nodeForZone := make(map[string]string) - for _, zone := range zones { - found := false - for _, node := range nodeList.Items { - if zone == node.Labels[v1.LabelTopologyZone] { - found = true - nodeForZone[zone] = node.GetName() - } - } - if !found { - framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */)) + ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + nodeForZone := make(map[string]string) + for _, zone := range zones { + found := false + for _, node := range nodeList.Items { + if zone == node.Labels[v1.LabelTopologyZone] { + found = true + nodeForZone[zone] = node.GetName() } } - - ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2])) - zoneForServingPod := make(map[string]string) - var servingPods []*v1.Pod - servingPodLabels := map[string]string{"app": f.UniqueName} - for _, zone := range zones[:2] { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname") - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} - e2epod.SetNodeSelection(&pod.Spec, nodeSelection) - pod.Labels = servingPodLabels - - servingPods = append(servingPods, pod) - zoneForServingPod[pod.Name] = zone + if !found { + framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */)) } - e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) + } - trafficDist := v1.ServiceTrafficDistributionPreferClose - svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "traffic-dist-test-service", - }, - Spec: v1.ServiceSpec{ - Selector: servingPodLabels, - TrafficDistribution: &trafficDist, - Ports: []v1.ServicePort{{ - Port: 80, - TargetPort: intstr.FromInt32(9376), - Protocol: v1.ProtocolTCP, - }}, - }, - }) - ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) + ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2])) + zoneForServingPod := make(map[string]string) + var servingPods []*v1.Pod + 
servingPodLabels := map[string]string{"app": f.UniqueName} + for _, zone := range zones[:2] { + pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname") + nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + pod.Labels = servingPodLabels - ginkgo.By("waiting for EndpointSlices to be created") - err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) - framework.ExpectNoError(err) - slices := endpointSlicesForService(svc.Name) - framework.Logf("got slices:\n%v", format.Object(slices, 1)) + servingPods = append(servingPods, pod) + zoneForServingPod[pod.Name] = zone + } + e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) - ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") + trafficDist := v1.ServiceTrafficDistributionPreferClose + svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "traffic-dist-test-service", + }, + Spec: v1.ServiceSpec{ + Selector: servingPodLabels, + TrafficDistribution: &trafficDist, + Ports: []v1.ServicePort{{ + Port: 80, + TargetPort: intstr.FromInt32(9376), + Protocol: v1.ProtocolTCP, + }}, + }, + }) + ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) - createClientPod := func(ctx context.Context, zone string) *v1.Pod { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} - e2epod.SetNodeSelection(&pod.Spec, nodeSelection) - cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) - pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} - pod.Spec.Containers[0].Name = pod.Name + ginkgo.By("waiting for EndpointSlices to be created") + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + slices := endpointSlicesForService(svc.Name) + framework.Logf("got slices:\n%v", format.Object(slices, 1)) - return e2epod.NewPodClient(f).CreateSync(ctx, pod) - } + ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") - for _, clientZone := range zones[:2] { - framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) + createClientPod := func(ctx context.Context, zone string) *v1.Pod { + pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) + nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) + pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} + pod.Spec.Containers[0].Name = pod.Name - framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone) + return e2epod.NewPodClient(f).CreateSync(ctx, pod) + } - requestsSucceedAndStayInSameZone := 
framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { - logLines := reverseChronologicalLogLines - if len(logLines) < 20 { - return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSameZone := 0 - - for _, logLine := range logLines { - if logLine == "" || strings.HasPrefix(logLine, "Date:") { - continue - } - destZone, ok := zoneForServingPod[logLine] - if !ok { - return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - if clientZone != destZone { - return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSameZone++ - if consecutiveSameZone >= 10 { - return nil, nil // Pass condition. - } - } - // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil - }) - - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone) - } - - ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client") - - clientZone := zones[2] - framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone) + for _, clientZone := range zones[:2] { + framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone) clientPod := createClientPod(ctx, clientZone) - framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone) + framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone) - requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { logLines := reverseChronologicalLogLines if len(logLines) < 20 { return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil } - - // Requests are counted as successful when the response read from the log - // lines is the name of a recognizable serving pod. 
- consecutiveSuccessfulRequests := 0 + consecutiveSameZone := 0 for _, logLine := range logLines { if logLine == "" || strings.HasPrefix(logLine, "Date:") { continue } - _, servingPodExists := zoneForServingPod[logLine] - if !servingPodExists { - return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + destZone, ok := zoneForServingPod[logLine] + if !ok { + return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil } - consecutiveSuccessfulRequests++ - if consecutiveSuccessfulRequests >= 10 { - return nil, nil // Pass condition + if clientZone != destZone { + return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil + } + consecutiveSameZone++ + if consecutiveSameZone >= 10 { + return nil, nil // Pass condition. } } // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil + return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil }) - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod) + gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone) + } + ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client") + + clientZone := zones[2] + framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone) + clientPod := createClientPod(ctx, clientZone) + + framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone) + + requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + logLines := reverseChronologicalLogLines + if len(logLines) < 20 { + return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil + } + + // Requests are counted as successful when the response read from the log + // lines is the name of a recognizable serving pod. 
+ consecutiveSuccessfulRequests := 0 + + for _, logLine := range logLines { + if logLine == "" || strings.HasPrefix(logLine, "Date:") { + continue + } + _, servingPodExists := zoneForServingPod[logLine] + if !servingPodExists { + return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + } + consecutiveSuccessfulRequests++ + if consecutiveSuccessfulRequests >= 10 { + return nil, nil // Pass condition + } + } + // Ideally, the matcher would never reach this condition + return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil }) + gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod) }) }) From bc81a860b082d0303f0613086da56c19194390f6 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 24 Mar 2025 08:46:02 -0400 Subject: [PATCH 4/9] Abstract the logic of the TrafficDistribution test Split the logic of creating the clients and the servers apart from the logic of checking which clients connect to which servers. Add some extra complexity to support additional use cases (like multiple endpoints on the same node). --- test/e2e/network/traffic_distribution.go | 145 ++++++++++++----------- 1 file changed, 74 insertions(+), 71 deletions(-) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index f7b9c9b7cd0..de38d477cf2 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -27,6 +27,7 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" @@ -92,6 +93,18 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { } } + // Data structures for tracking server and client pods + type serverPod struct { + node *v1.Node + pod *v1.Pod + } + + type clientPod struct { + node *v1.Node + endpoints []*serverPod + pod *v1.Pod + } + //////////////////////////////////////////////////////////////////////////// // Main test specifications. 
//////////////////////////////////////////////////////////////////////////// @@ -109,13 +122,13 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) framework.ExpectNoError(err) - nodeForZone := make(map[string]string) + nodeForZone := make(map[string]*v1.Node) for _, zone := range zones { found := false for _, node := range nodeList.Items { if zone == node.Labels[v1.LabelTopologyZone] { found = true - nodeForZone[zone] = node.GetName() + nodeForZone[zone] = &node } } if !found { @@ -123,20 +136,42 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { } } - ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2])) - zoneForServingPod := make(map[string]string) - var servingPods []*v1.Pod + var clientPods []*clientPod + var serverPods []*serverPod + + // We want clients in all three zones + for _, node := range nodeForZone { + clientPods = append(clientPods, &clientPod{node: node}) + } + + // and endpoints in the first two zones + serverPods = []*serverPod{ + {node: clientPods[0].node}, + {node: clientPods[1].node}, + } + + // The clients with an endpoint in the same zone should only connect to + // that endpoint. The client with no endpoint in its zone should connect + // to both endpoints. + clientPods[0].endpoints = []*serverPod{serverPods[0]} + clientPods[1].endpoints = []*serverPod{serverPods[1]} + clientPods[2].endpoints = serverPods + + var podsToCreate []*v1.Pod servingPodLabels := map[string]string{"app": f.UniqueName} - for _, zone := range zones[:2] { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname") - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + for i, sp := range serverPods { + node := sp.node.Name + zone := sp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname") + ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} e2epod.SetNodeSelection(&pod.Spec, nodeSelection) pod.Labels = servingPodLabels - servingPods = append(servingPods, pod) - zoneForServingPod[pod.Name] = zone + sp.pod = pod + podsToCreate = append(podsToCreate, pod) } - e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) trafficDist := v1.ServiceTrafficDistributionPreferClose svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ @@ -156,95 +191,63 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) ginkgo.By("waiting for EndpointSlices to be created") - err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) framework.ExpectNoError(err) slices := endpointSlicesForService(svc.Name) framework.Logf("got slices:\n%v", format.Object(slices, 1)) - ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") - - createClientPod := func(ctx context.Context, zone string) *v1.Pod { - pod := 
e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + podsToCreate = nil + for i, cp := range clientPods { + node := cp.node.Name + zone := cp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil) + ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} e2epod.SetNodeSelection(&pod.Spec, nodeSelection) cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} pod.Spec.Containers[0].Name = pod.Name - return e2epod.NewPodClient(f).CreateSync(ctx, pod) + cp.pod = pod + podsToCreate = append(podsToCreate, pod) } + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) - for _, clientZone := range zones[:2] { - framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) + for _, cp := range clientPods { + wantedEndpoints := sets.New[string]() + for _, sp := range cp.endpoints { + wantedEndpoints.Insert(sp.pod.Name) + } + unreachedEndpoints := wantedEndpoints.Clone() - framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone) + ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList())) - requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { logLines := reverseChronologicalLogLines if len(logLines) < 20 { return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil } - consecutiveSameZone := 0 + consecutiveSuccessfulRequests := 0 for _, logLine := range logLines { if logLine == "" || strings.HasPrefix(logLine, "Date:") { continue } - destZone, ok := zoneForServingPod[logLine] - if !ok { - return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + destEndpoint := logLine + if !wantedEndpoints.Has(destEndpoint) { + return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil } - if clientZone != destZone { - return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSameZone++ - if consecutiveSameZone >= 10 { + consecutiveSuccessfulRequests++ + unreachedEndpoints.Delete(destEndpoint) + if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 { return nil, nil // Pass condition. 
} } // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil + return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil }) - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone) + gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed) } - - ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client") - - clientZone := zones[2] - framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) - - framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone) - - requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { - logLines := reverseChronologicalLogLines - if len(logLines) < 20 { - return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil - } - - // Requests are counted as successful when the response read from the log - // lines is the name of a recognizable serving pod. 
- consecutiveSuccessfulRequests := 0 - - for _, logLine := range logLines { - if logLine == "" || strings.HasPrefix(logLine, "Date:") { - continue - } - _, servingPodExists := zoneForServingPod[logLine] - if !servingPodExists { - return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSuccessfulRequests++ - if consecutiveSuccessfulRequests >= 10 { - return nil, nil // Pass condition - } - } - // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil - }) - - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod) }) }) From 10cd54bee4657d3b9f1f6a7a3d770b7ad602ab3b Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 19 Mar 2025 16:52:03 -0400 Subject: [PATCH 5/9] Streamling the node/zone-picking logic --- test/e2e/network/traffic_distribution.go | 28 +++++++++--------------- 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index de38d477cf2..34c08ba0e47 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -110,31 +110,23 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { //////////////////////////////////////////////////////////////////////////// ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { - ginkgo.By("finding 3 zones with schedulable nodes") - allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c) - framework.ExpectNoError(err) - if len(allZonesSet) < 3 { - framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet)) - } - zones := allZonesSet.UnsortedList()[:3] - - ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) framework.ExpectNoError(err) nodeForZone := make(map[string]*v1.Node) - for _, zone := range zones { - found := false - for _, node := range nodeList.Items { - if zone == node.Labels[v1.LabelTopologyZone] { - found = true - nodeForZone[zone] = &node - } + for _, node := range nodeList.Items { + zone := node.Labels[v1.LabelTopologyZone] + if nodeForZone[zone] != nil { + continue } - if !found { - framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */)) + nodeForZone[zone] = &node + if len(nodeForZone) == 3 { + break } } + if len(nodeForZone) < 3 { + e2eskipper.Skipf("got %d zones with schedulable nodes, need at least 3", len(nodeForZone)) + } var clientPods []*clientPod var serverPods []*serverPod From 23aff5dabd592b2724964c8218c791cf135683a6 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 24 Mar 2025 08:58:04 -0400 Subject: [PATCH 6/9] Split out the pieces of the TrafficDistribution test --- test/e2e/network/traffic_distribution.go | 76 ++++++++++++++++-------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 
34c08ba0e47..6713242ad75 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -105,11 +105,9 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { pod *v1.Pod } - //////////////////////////////////////////////////////////////////////////// - // Main test specifications. - //////////////////////////////////////////////////////////////////////////// - - ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { + // allocateClientsAndServers figures out where to put clients and servers for + // a simple "same-zone" traffic distribution test. + allocateClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) { ginkgo.By("finding 3 zones with schedulable nodes") nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) framework.ExpectNoError(err) @@ -149,29 +147,21 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { clientPods[1].endpoints = []*serverPod{serverPods[1]} clientPods[2].endpoints = serverPods - var podsToCreate []*v1.Pod - servingPodLabels := map[string]string{"app": f.UniqueName} - for i, sp := range serverPods { - node := sp.node.Name - zone := sp.node.Labels[v1.LabelTopologyZone] - pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname") - ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone)) - nodeSelection := e2epod.NodeSelection{Name: node} - e2epod.SetNodeSelection(&pod.Spec, nodeSelection) - pod.Labels = servingPodLabels + return clientPods, serverPods + } - sp.pod = pod - podsToCreate = append(podsToCreate, pod) - } - e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) - - trafficDist := v1.ServiceTrafficDistributionPreferClose - svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ + // createService creates the service for a traffic distribution test + createService := func(ctx context.Context, trafficDist string) *v1.Service { + serviceName := "traffic-dist-test-service" + ginkgo.By(fmt.Sprintf("creating a service %q with trafficDistribution %q", serviceName, trafficDist)) + return createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "traffic-dist-test-service", + Name: serviceName, }, Spec: v1.ServiceSpec{ - Selector: servingPodLabels, + Selector: map[string]string{ + "app": f.UniqueName, + }, TrafficDistribution: &trafficDist, Ports: []v1.ServicePort{{ Port: 80, @@ -180,10 +170,29 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { }}, }, }) - ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) + } + + // createPods creates endpoint pods for svc as described by serverPods, waits for + // the EndpointSlices to be updated, and creates clientPods as described by + // clientPods. 
+ createPods := func(ctx context.Context, svc *v1.Service, clientPods []*clientPod, serverPods []*serverPod) { + var podsToCreate []*v1.Pod + for i, sp := range serverPods { + node := sp.node.Name + zone := sp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname") + ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + pod.Labels = svc.Spec.Selector + + sp.pod = pod + podsToCreate = append(podsToCreate, pod) + } + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) ginkgo.By("waiting for EndpointSlices to be created") - err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) + err := framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) framework.ExpectNoError(err) slices := endpointSlicesForService(svc.Name) framework.Logf("got slices:\n%v", format.Object(slices, 1)) @@ -204,7 +213,11 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { podsToCreate = append(podsToCreate, pod) } e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) + } + // checkTrafficDistribution checks that traffic from clientPods is distributed in + // the expected way. + checkTrafficDistribution := func(ctx context.Context, clientPods []*clientPod) { for _, cp := range clientPods { wantedEndpoints := sets.New[string]() for _, sp := range cp.endpoints { @@ -241,5 +254,16 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed) } + } + + //////////////////////////////////////////////////////////////////////////// + // Main test specifications. + //////////////////////////////////////////////////////////////////////////// + + ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { + clientPods, serverPods := allocateClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) }) }) From 7956b37f62dc8213a168bddf38c42003d2e806be Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Thu, 20 Mar 2025 08:48:35 -0400 Subject: [PATCH 7/9] Add test for `trafficDistribution: PreferSameZone` (Identical to the PreferClose test.) --- test/e2e/network/traffic_distribution.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 6713242ad75..e6259c82644 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -260,10 +261,17 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { // Main test specifications. 
//////////////////////////////////////////////////////////////////////////// - ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { + framework.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { clientPods, serverPods := allocateClientsAndServers(ctx) svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose) createPods(ctx, svc, clientPods, serverPods) checkTrafficDistribution(ctx, clientPods) }) + + framework.It("should route traffic to an endpoint in the same zone when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + clientPods, serverPods := allocateClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) }) From 0ee6b0dbfa2f4dc667d3d0c3ee42a4403e9a137a Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 19 Mar 2025 21:23:38 -0400 Subject: [PATCH 8/9] Add "multi-node" TrafficDistribution test The existing TrafficDistribution test didn't really distinguish "same zone" from "same node". Add another test that makes sure there are at least 2 nodes in each zone so it can do that. (Keep the original test as well to avoid losing coverage in CI systems with single-schedulable-node-per-zone clusters.) --- test/e2e/network/traffic_distribution.go | 98 ++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index e6259c82644..3295a968f99 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -94,6 +94,36 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { } } + // getNodesForMultiNode returns a set of nodes for a test case with 3 zones with 2 + // nodes each. If there are not suitable nodes/zones, the test is skipped. + getNodesForMultiNode := func(ctx context.Context) ([]*v1.Node, []*v1.Node, []*v1.Node) { + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + nodesForZone := make(map[string][]*v1.Node) + for _, node := range nodeList.Items { + zone := node.Labels[v1.LabelTopologyZone] + nodesForZone[zone] = append(nodesForZone[zone], &node) + } + if len(nodesForZone) < 3 { + e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each") + } + + var multiNodeZones [][]*v1.Node + for _, nodes := range nodesForZone { + if len(nodes) > 1 { + multiNodeZones = append(multiNodeZones, nodes) + } + if len(multiNodeZones) == 3 { + break + } + } + if len(multiNodeZones) < 3 { + e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each") + } + + return multiNodeZones[0], multiNodeZones[1], multiNodeZones[2] + } + // Data structures for tracking server and client pods type serverPod struct { node *v1.Node @@ -151,6 +181,59 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { return clientPods, serverPods } + // allocateMultiNodeClientsAndServers figures out where to put clients and servers + // for a "same-zone" traffic distribution test with multiple nodes in each zone. 
+ allocateMultiNodeClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) { + ginkgo.By("finding a set of zones and nodes for the test") + zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx) + + var clientPods []*clientPod + var serverPods []*serverPod + + // First zone: a client and an endpoint on each node, and both clients + // should talk to both endpoints. + endpointsForZone := []*serverPod{ + {node: zone1Nodes[0]}, + {node: zone1Nodes[1]}, + } + + clientPods = append(clientPods, + &clientPod{ + node: zone1Nodes[0], + endpoints: endpointsForZone, + }, + &clientPod{ + node: zone1Nodes[1], + endpoints: endpointsForZone, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // Second zone: a client on one node and a server on the other. + endpointsForZone = []*serverPod{ + {node: zone2Nodes[1]}, + } + + clientPods = append(clientPods, + &clientPod{ + node: zone2Nodes[0], + endpoints: endpointsForZone, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // Third zone: just a client, which should connect to the servers in the + // other two zones. + clientPods = append(clientPods, + &clientPod{ + node: zone3Nodes[0], + endpoints: serverPods, + }, + ) + + return clientPods, serverPods + } + // createService creates the service for a traffic distribution test createService := func(ctx context.Context, trafficDist string) *v1.Service { serviceName := "traffic-dist-test-service" @@ -268,10 +351,25 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { checkTrafficDistribution(ctx, clientPods) }) + framework.It("should route traffic correctly between pods on multiple nodes when using PreferClose", func(ctx context.Context) { + clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + framework.It("should route traffic to an endpoint in the same zone when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { clientPods, serverPods := allocateClientsAndServers(ctx) svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone) createPods(ctx, svc, clientPods, serverPods) checkTrafficDistribution(ctx, clientPods) }) + + framework.It("should route traffic correctly between pods on multiple nodes when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + }) From 478a6f9d05326ad06054807133ecca2f33ef5b0a Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 19 Mar 2025 09:11:31 -0400 Subject: [PATCH 9/9] Add a tests for PreferSameNode --- test/e2e/network/traffic_distribution.go | 110 +++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 3295a968f99..79075e7b8b4 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -372,4 +372,114 @@ var _ = common.SIGDescribe("Traffic Distribution", func() { checkTrafficDistribution(ctx, clientPods) }) + framework.It("should route traffic to an endpoint on the same node or fall back to same zone when 
using PreferSameNode", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + ginkgo.By("finding a set of nodes for the test") + zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx) + + var clientPods []*clientPod + var serverPods []*serverPod + + // The first zone: a client and a server on each node. Each client only + // talks to the server on the same node. + endpointsForZone := []*serverPod{ + {node: zone1Nodes[0]}, + {node: zone1Nodes[1]}, + } + clientPods = append(clientPods, + &clientPod{ + node: zone1Nodes[0], + endpoints: []*serverPod{endpointsForZone[0]}, + }, + &clientPod{ + node: zone1Nodes[1], + endpoints: []*serverPod{endpointsForZone[1]}, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // The second zone: a client on one node and a server on the other. The + // client should fall back to connecting (only) to its same-zone endpoint. + endpointsForZone = []*serverPod{ + {node: zone2Nodes[1]}, + } + clientPods = append(clientPods, + &clientPod{ + node: zone2Nodes[0], + endpoints: endpointsForZone, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // The third zone: just a client. Since it has neither a same-node nor a + // same-zone endpoint, it should connect to all endpoints. + clientPods = append(clientPods, + &clientPod{ + node: zone3Nodes[0], + endpoints: serverPods, + }, + ) + + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + + framework.It("should route traffic to an endpoint on the same node when using PreferSameNode and fall back when the endpoint becomes unavailable", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + ginkgo.By("finding a set of nodes for the test") + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + if len(nodeList.Items) < 2 { + e2eskipper.Skipf("have %d schedulable nodes, need at least 2", len(nodeList.Items)) + } + nodes := nodeList.Items[:2] + + // One client and one server on each node + serverPods := []*serverPod{ + {node: &nodes[0]}, + {node: &nodes[1]}, + } + clientPods := []*clientPod{ + { + node: &nodes[0], + endpoints: []*serverPod{serverPods[0]}, + }, + { + node: &nodes[1], + endpoints: []*serverPod{serverPods[1]}, + }, + } + + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode) + createPods(ctx, svc, clientPods, serverPods) + + ginkgo.By("ensuring that each client talks to its same-node endpoint when both endpoints exist") + checkTrafficDistribution(ctx, clientPods) + + ginkgo.By("killing the server pod on the first node and waiting for the EndpointSlices to be updated") + err = c.CoreV1().Pods(f.Namespace.Name).Delete(ctx, serverPods[0].pod.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 1, 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + + ginkgo.By("ensuring that both clients talk to the remaining endpoint when only one endpoint exists") + serverPods[0].pod = nil + clientPods[0].endpoints = []*serverPod{serverPods[1]} + checkTrafficDistribution(ctx, clientPods) + + ginkgo.By("recreating the missing server pod and waiting for the EndpointSlices to be updated") + // We can't use createPods() here because if we only tell it about + // serverPods[0] and not serverPods[1] it will expect there to be only 
one + // endpoint. + pod := e2epod.NewAgnhostPod(f.Namespace.Name, "server-0-new", nil, nil, nil, "serve-hostname") + nodeSelection := e2epod.NodeSelection{Name: serverPods[0].node.Name} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + pod.Labels = svc.Spec.Selector + serverPods[0].pod = e2epod.NewPodClient(f).CreateSync(ctx, pod) + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 2, 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + + ginkgo.By("ensuring that each client talks only to its same-node endpoint again") + clientPods[0].endpoints = []*serverPod{serverPods[0]} + checkTrafficDistribution(ctx, clientPods) + }) })
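
The test cases added above exercise the `spec.trafficDistribution` field end to end. For reference, below is a minimal sketch of the Service shape they create, using only field names and constants that appear in the diffs (`TrafficDistribution`, `ServiceTrafficDistributionPreferSameNode`, `intstr.FromInt32`); the package name, service name, selector, and the pre-initialized `clientset.Interface` are illustrative assumptions, not part of the patches.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	clientset "k8s.io/client-go/kubernetes"
)

// createPreferSameNodeService creates a ClusterIP Service whose traffic is
// preferentially routed to an endpoint on the same node as the client,
// falling back to a same-zone endpoint and then to any endpoint, which is
// the behavior the PreferSameNode e2e tests in this series verify.
func createPreferSameNodeService(ctx context.Context, c clientset.Interface, namespace string) (*v1.Service, error) {
	trafficDist := v1.ServiceTrafficDistributionPreferSameNode
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "traffic-dist-example"},
		Spec: v1.ServiceSpec{
			Selector:            map[string]string{"app": "traffic-dist-example"},
			TrafficDistribution: &trafficDist,
			Ports: []v1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt32(9376),
				Protocol:   v1.ProtocolTCP,
			}},
		},
	}
	return c.CoreV1().Services(namespace).Create(ctx, svc, metav1.CreateOptions{})
}

The address-of-local pattern for TrafficDistribution mirrors the createService helper in the tests: the field is an optional pointer, so leaving it nil means "no preference" rather than an empty string.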