diff --git a/test/e2e/feature/feature.go b/test/e2e/feature/feature.go index 573766c4722..422b5988e21 100644 --- a/test/e2e/feature/feature.go +++ b/test/e2e/feature/feature.go @@ -523,14 +523,6 @@ var ( // - ci-kubernetes-node-e2e-cri-proxy-serial CriProxy = framework.WithFeature(framework.ValidFeatures.Add("CriProxy")) - // Owner: sig-network - // Marks tests that require a cluster with Topology Hints enabled. - TopologyHints = framework.WithFeature(framework.ValidFeatures.Add("Topology Hints")) - - // Owner: sig-network - // Marks tests that require a cluster with Traffic Distribution enabled. - TrafficDistribution = framework.WithFeature(framework.ValidFeatures.Add("Traffic Distribution")) - // TODO: document the feature (owning SIG, when to use this feature for a test) TopologyManager = framework.WithFeature(framework.ValidFeatures.Add("TopologyManager")) diff --git a/test/e2e/network/topology_hints.go b/test/e2e/network/topology_hints.go index 5b553e46af9..c9b63683c5c 100644 --- a/test/e2e/network/topology_hints.go +++ b/test/e2e/network/topology_hints.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -41,7 +40,7 @@ import ( admissionapi "k8s.io/pod-security-admission/api" ) -var _ = common.SIGDescribe(feature.TopologyHints, func() { +var _ = common.SIGDescribe("Topology Hints", func() { f := framework.NewDefaultFramework("topology-hints") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go index 34591075d67..79075e7b8b4 100644 --- a/test/e2e/network/traffic_distribution.go +++ b/test/e2e/network/traffic_distribution.go @@ -27,9 +27,9 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/test/e2e/feature" "k8s.io/kubernetes/test/e2e/framework" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -43,7 +43,7 @@ import ( "github.com/onsi/gomega" ) -var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGate(features.ServiceTrafficDistribution), func() { +var _ = common.SIGDescribe("Traffic Distribution", func() { f := framework.NewDefaultFramework("traffic-distribution") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged @@ -79,31 +79,6 @@ var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGat } } - // endpointSlicesHaveSameZoneHints returns a matcher function to be used with - // gomega.Eventually().Should(...). It checks that the passed EndpointSlices - // have zone-hints which match the endpoint's zone. 
- endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) { - if len(slices) == 0 { - return nil, fmt.Errorf("no endpointslices found") - } - for _, slice := range slices { - for _, endpoint := range slice.Endpoints { - var ip string - if len(endpoint.Addresses) > 0 { - ip = endpoint.Addresses[0] - } - var zone string - if endpoint.Zone != nil { - zone = *endpoint.Zone - } - if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone { - return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil - } - } - } - return nil, nil - }) - // requestsFromClient returns a helper function to be used with // gomega.Eventually(...). It fetches the logs from the clientPod and returns // them in reverse-chronological order. @@ -119,164 +94,392 @@ var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGat } } - //////////////////////////////////////////////////////////////////////////// - // Main test specifications. - //////////////////////////////////////////////////////////////////////////// + // getNodesForMultiNode returns a set of nodes for a test case with 3 zones with 2 + // nodes each. If there are not suitable nodes/zones, the test is skipped. + getNodesForMultiNode := func(ctx context.Context) ([]*v1.Node, []*v1.Node, []*v1.Node) { + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + nodesForZone := make(map[string][]*v1.Node) + for _, node := range nodeList.Items { + zone := node.Labels[v1.LabelTopologyZone] + nodesForZone[zone] = append(nodesForZone[zone], &node) + } + if len(nodesForZone) < 3 { + e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each") + } - ginkgo.When("Service has trafficDistribution=PreferClose", func() { - ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) { - - ginkgo.By("finding 3 zones with schedulable nodes") - allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c) - framework.ExpectNoError(err) - if len(allZonesSet) < 3 { - framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet)) + var multiNodeZones [][]*v1.Node + for _, nodes := range nodesForZone { + if len(nodes) > 1 { + multiNodeZones = append(multiNodeZones, nodes) } - zones := allZonesSet.UnsortedList()[:3] - - ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) - nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) - framework.ExpectNoError(err) - nodeForZone := make(map[string]string) - for _, zone := range zones { - found := false - for _, node := range nodeList.Items { - if zone == node.Labels[v1.LabelTopologyZone] { - found = true - nodeForZone[zone] = node.GetName() - } - } - if !found { - framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */)) - } + if len(multiNodeZones) == 3 { + break } + } + if len(multiNodeZones) < 3 { + e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each") + } - ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2])) - zoneForServingPod := make(map[string]string) - var servingPods []*v1.Pod - servingPodLabels := map[string]string{"app": f.UniqueName} - for _, zone := range 
zones[:2] { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname") - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} - e2epod.SetNodeSelection(&pod.Spec, nodeSelection) - pod.Labels = servingPodLabels + return multiNodeZones[0], multiNodeZones[1], multiNodeZones[2] + } - servingPods = append(servingPods, pod) - zoneForServingPod[pod.Name] = zone - ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{}) + // Data structures for tracking server and client pods + type serverPod struct { + node *v1.Node + pod *v1.Pod + } + + type clientPod struct { + node *v1.Node + endpoints []*serverPod + pod *v1.Pod + } + + // allocateClientsAndServers figures out where to put clients and servers for + // a simple "same-zone" traffic distribution test. + allocateClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) { + ginkgo.By("finding 3 zones with schedulable nodes") + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + nodeForZone := make(map[string]*v1.Node) + for _, node := range nodeList.Items { + zone := node.Labels[v1.LabelTopologyZone] + if nodeForZone[zone] != nil { + continue } - e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) + nodeForZone[zone] = &node + if len(nodeForZone) == 3 { + break + } + } + if len(nodeForZone) < 3 { + e2eskipper.Skipf("got %d zones with schedulable nodes, need at least 3", len(nodeForZone)) + } - trafficDist := v1.ServiceTrafficDistributionPreferClose - svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "traffic-dist-test-service", + var clientPods []*clientPod + var serverPods []*serverPod + + // We want clients in all three zones + for _, node := range nodeForZone { + clientPods = append(clientPods, &clientPod{node: node}) + } + + // and endpoints in the first two zones + serverPods = []*serverPod{ + {node: clientPods[0].node}, + {node: clientPods[1].node}, + } + + // The clients with an endpoint in the same zone should only connect to + // that endpoint. The client with no endpoint in its zone should connect + // to both endpoints. + clientPods[0].endpoints = []*serverPod{serverPods[0]} + clientPods[1].endpoints = []*serverPod{serverPods[1]} + clientPods[2].endpoints = serverPods + + return clientPods, serverPods + } + + // allocateMultiNodeClientsAndServers figures out where to put clients and servers + // for a "same-zone" traffic distribution test with multiple nodes in each zone. + allocateMultiNodeClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) { + ginkgo.By("finding a set of zones and nodes for the test") + zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx) + + var clientPods []*clientPod + var serverPods []*serverPod + + // First zone: a client and an endpoint on each node, and both clients + // should talk to both endpoints. + endpointsForZone := []*serverPod{ + {node: zone1Nodes[0]}, + {node: zone1Nodes[1]}, + } + + clientPods = append(clientPods, + &clientPod{ + node: zone1Nodes[0], + endpoints: endpointsForZone, + }, + &clientPod{ + node: zone1Nodes[1], + endpoints: endpointsForZone, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // Second zone: a client on one node and a server on the other. 
+ endpointsForZone = []*serverPod{ + {node: zone2Nodes[1]}, + } + + clientPods = append(clientPods, + &clientPod{ + node: zone2Nodes[0], + endpoints: endpointsForZone, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // Third zone: just a client, which should connect to the servers in the + // other two zones. + clientPods = append(clientPods, + &clientPod{ + node: zone3Nodes[0], + endpoints: serverPods, + }, + ) + + return clientPods, serverPods + } + + // createService creates the service for a traffic distribution test + createService := func(ctx context.Context, trafficDist string) *v1.Service { + serviceName := "traffic-dist-test-service" + ginkgo.By(fmt.Sprintf("creating a service %q with trafficDistribution %q", serviceName, trafficDist)) + return createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "app": f.UniqueName, }, - Spec: v1.ServiceSpec{ - Selector: servingPodLabels, - TrafficDistribution: &trafficDist, - Ports: []v1.ServicePort{{ - Port: 80, - TargetPort: intstr.FromInt32(9376), - Protocol: v1.ProtocolTCP, - }}, - }, - }) - ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) - ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{}) + TrafficDistribution: &trafficDist, + Ports: []v1.ServicePort{{ + Port: 80, + TargetPort: intstr.FromInt32(9376), + Protocol: v1.ProtocolTCP, + }}, + }, + }) + } - ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints") - gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints) + // createPods creates endpoint pods for svc as described by serverPods, waits for + // the EndpointSlices to be updated, and creates clientPods as described by + // clientPods. 
+ createPods := func(ctx context.Context, svc *v1.Service, clientPods []*clientPod, serverPods []*serverPod) { + var podsToCreate []*v1.Pod + for i, sp := range serverPods { + node := sp.node.Name + zone := sp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname") + ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + pod.Labels = svc.Spec.Selector - ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") + sp.pod = pod + podsToCreate = append(podsToCreate, pod) + } + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) - createClientPod := func(ctx context.Context, zone string) *v1.Pod { - pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) - pod.Spec.NodeName = nodeForZone[zone] - nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} - e2epod.SetNodeSelection(&pod.Spec, nodeSelection) - cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) - pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} - pod.Spec.Containers[0].Name = pod.Name + ginkgo.By("waiting for EndpointSlices to be created") + err := framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + slices := endpointSlicesForService(svc.Name) + framework.Logf("got slices:\n%v", format.Object(slices, 1)) - ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{}) - return e2epod.NewPodClient(f).CreateSync(ctx, pod) + podsToCreate = nil + for i, cp := range clientPods { + node := cp.node.Name + zone := cp.node.Labels[v1.LabelTopologyZone] + pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil) + ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone)) + nodeSelection := e2epod.NodeSelection{Name: node} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) + pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} + pod.Spec.Containers[0].Name = pod.Name + + cp.pod = pod + podsToCreate = append(podsToCreate, pod) + } + e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate) + } + + // checkTrafficDistribution checks that traffic from clientPods is distributed in + // the expected way. 
+ checkTrafficDistribution := func(ctx context.Context, clientPods []*clientPod) { + for _, cp := range clientPods { + wantedEndpoints := sets.New[string]() + for _, sp := range cp.endpoints { + wantedEndpoints.Insert(sp.pod.Name) } + unreachedEndpoints := wantedEndpoints.Clone() - for _, clientZone := range zones[:2] { - framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) + ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList())) - framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone) - - requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { - logLines := reverseChronologicalLogLines - if len(logLines) < 20 { - return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSameZone := 0 - - for _, logLine := range logLines { - if logLine == "" || strings.HasPrefix(logLine, "Date:") { - continue - } - destZone, ok := zoneForServingPod[logLine] - if !ok { - return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - if clientZone != destZone { - return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil - } - consecutiveSameZone++ - if consecutiveSameZone >= 10 { - return nil, nil // Pass condition. - } - } - // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil - }) - - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone) - } - - ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client") - - clientZone := zones[2] - framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone) - clientPod := createClientPod(ctx, clientZone) - - framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone) - - requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { logLines := reverseChronologicalLogLines if len(logLines) < 20 { return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil } - - // Requests are counted as successful when the response read from the log - // lines is the name of a recognizable serving pod. 
consecutiveSuccessfulRequests := 0 for _, logLine := range logLines { if logLine == "" || strings.HasPrefix(logLine, "Date:") { continue } - _, servingPodExists := zoneForServingPod[logLine] - if !servingPodExists { - return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + destEndpoint := logLine + if !wantedEndpoints.Has(destEndpoint) { + return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil } consecutiveSuccessfulRequests++ - if consecutiveSuccessfulRequests >= 10 { - return nil, nil // Pass condition + unreachedEndpoints.Delete(destEndpoint) + if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 { + return nil, nil // Pass condition. } } // Ideally, the matcher would never reach this condition - return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil + return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil }) - gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod) + gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed) + } + } - }) + //////////////////////////////////////////////////////////////////////////// + // Main test specifications. 
+ //////////////////////////////////////////////////////////////////////////// + framework.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) { + clientPods, serverPods := allocateClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + + framework.It("should route traffic correctly between pods on multiple nodes when using PreferClose", func(ctx context.Context) { + clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + + framework.It("should route traffic to an endpoint in the same zone when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + clientPods, serverPods := allocateClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + + framework.It("should route traffic correctly between pods on multiple nodes when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx) + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + + framework.It("should route traffic to an endpoint on the same node or fall back to same zone when using PreferSameNode", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + ginkgo.By("finding a set of nodes for the test") + zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx) + + var clientPods []*clientPod + var serverPods []*serverPod + + // The first zone: a client and a server on each node. Each client only + // talks to the server on the same node. + endpointsForZone := []*serverPod{ + {node: zone1Nodes[0]}, + {node: zone1Nodes[1]}, + } + clientPods = append(clientPods, + &clientPod{ + node: zone1Nodes[0], + endpoints: []*serverPod{endpointsForZone[0]}, + }, + &clientPod{ + node: zone1Nodes[1], + endpoints: []*serverPod{endpointsForZone[1]}, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // The second zone: a client on one node and a server on the other. The + // client should fall back to connecting (only) to its same-zone endpoint. + endpointsForZone = []*serverPod{ + {node: zone2Nodes[1]}, + } + clientPods = append(clientPods, + &clientPod{ + node: zone2Nodes[0], + endpoints: endpointsForZone, + }, + ) + serverPods = append(serverPods, endpointsForZone...) + + // The third zone: just a client. Since it has neither a same-node nor a + // same-zone endpoint, it should connect to all endpoints. 
+ clientPods = append(clientPods, + &clientPod{ + node: zone3Nodes[0], + endpoints: serverPods, + }, + ) + + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode) + createPods(ctx, svc, clientPods, serverPods) + checkTrafficDistribution(ctx, clientPods) + }) + + framework.It("should route traffic to an endpoint on the same node when using PreferSameNode and fall back when the endpoint becomes unavailable", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) { + ginkgo.By("finding a set of nodes for the test") + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + if len(nodeList.Items) < 2 { + e2eskipper.Skipf("have %d schedulable nodes, need at least 2", len(nodeList.Items)) + } + nodes := nodeList.Items[:2] + + // One client and one server on each node + serverPods := []*serverPod{ + {node: &nodes[0]}, + {node: &nodes[1]}, + } + clientPods := []*clientPod{ + { + node: &nodes[0], + endpoints: []*serverPod{serverPods[0]}, + }, + { + node: &nodes[1], + endpoints: []*serverPod{serverPods[1]}, + }, + } + + svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode) + createPods(ctx, svc, clientPods, serverPods) + + ginkgo.By("ensuring that each client talks to its same-node endpoint when both endpoints exist") + checkTrafficDistribution(ctx, clientPods) + + ginkgo.By("killing the server pod on the first node and waiting for the EndpointSlices to be updated") + err = c.CoreV1().Pods(f.Namespace.Name).Delete(ctx, serverPods[0].pod.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 1, 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + + ginkgo.By("ensuring that both clients talk to the remaining endpoint when only one endpoint exists") + serverPods[0].pod = nil + clientPods[0].endpoints = []*serverPod{serverPods[1]} + checkTrafficDistribution(ctx, clientPods) + + ginkgo.By("recreating the missing server pod and waiting for the EndpointSlices to be updated") + // We can't use createPods() here because if we only tell it about + // serverPods[0] and not serverPods[1] it will expect there to be only one + // endpoint. + pod := e2epod.NewAgnhostPod(f.Namespace.Name, "server-0-new", nil, nil, nil, "serve-hostname") + nodeSelection := e2epod.NodeSelection{Name: serverPods[0].node.Name} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + pod.Labels = svc.Spec.Selector + serverPods[0].pod = e2epod.NewPodClient(f).CreateSync(ctx, pod) + err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 2, 1*time.Second, e2eservice.ServiceEndpointsTimeout) + framework.ExpectNoError(err) + + ginkgo.By("ensuring that each client talks only to its same-node endpoint again") + clientPods[0].endpoints = []*serverPod{serverPods[0]} + checkTrafficDistribution(ctx, clientPods) }) })
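Illustrative note (editor's sketch, not part of the patch): the zone selection performed by getNodesForMultiNode above can be summarized by the following standalone Go sketch. The names pickMultiNodeZones and nodeZones are hypothetical; the real helper works on *v1.Node objects keyed by the topology.kubernetes.io/zone label and skips the test via e2eskipper.Skipf instead of returning an error.

package main

import "fmt"

// pickMultiNodeZones groups node names by zone and returns three zones that
// each contain at least two schedulable nodes, mirroring the selection logic
// of getNodesForMultiNode. Failure cases return an error here; the e2e helper
// skips the test instead.
func pickMultiNodeZones(nodeZones map[string]string) ([3][]string, error) {
	var picked [3][]string
	nodesForZone := make(map[string][]string)
	for node, zone := range nodeZones {
		nodesForZone[zone] = append(nodesForZone[zone], node)
	}
	if len(nodesForZone) < 3 {
		return picked, fmt.Errorf("need at least 3 zones, with at least 2 schedulable nodes each")
	}
	i := 0
	for _, nodes := range nodesForZone {
		if len(nodes) > 1 {
			picked[i] = nodes
			i++
			if i == 3 {
				return picked, nil
			}
		}
	}
	return picked, fmt.Errorf("need at least 3 zones, with at least 2 schedulable nodes each")
}

func main() {
	// Fabricated example: three zones with two nodes each satisfies the test's
	// topology requirement.
	zones, err := pickMultiNodeZones(map[string]string{
		"node-a1": "zone-a", "node-a2": "zone-a",
		"node-b1": "zone-b", "node-b2": "zone-b",
		"node-c1": "zone-c", "node-c2": "zone-c",
	})
	fmt.Println(zones, err)
}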
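Illustrative note (editor's sketch, not part of the patch): the pass criterion that checkTrafficDistribution's matcher applies to a client pod's logs can be expressed as the following standalone Go sketch. logLinesSatisfyDistribution is a hypothetical name and the plain map stands in for sets.Set[string]; the real code wraps this logic in framework.MakeMatcher and retries it with gomega.Eventually until e2eservice.KubeProxyLagTimeout, so a "too few log lines" result there just means "keep polling" rather than a hard failure.

package main

import (
	"fmt"
	"strings"
)

// logLinesSatisfyDistribution checks a client pod's log lines (newest first)
// against the set of endpoint pod names that client is allowed to reach. It
// passes once there are at least 10 consecutive successful requests and every
// expected endpoint has answered at least once; it fails immediately if any
// response names an endpoint outside the expected set.
func logLinesSatisfyDistribution(reverseChronologicalLogLines []string, wantedEndpoints map[string]bool) error {
	if len(reverseChronologicalLogLines) < 20 {
		return fmt.Errorf("got %d log lines, waiting for at least 20", len(reverseChronologicalLogLines))
	}
	unreached := make(map[string]bool, len(wantedEndpoints))
	for ep := range wantedEndpoints {
		unreached[ep] = true
	}
	consecutive := 0
	for _, line := range reverseChronologicalLogLines {
		// The client pod prints a "Date: ..." marker before each curl; every
		// other non-empty line is a serve-hostname response, i.e. the name of
		// the endpoint pod that answered.
		if line == "" || strings.HasPrefix(line, "Date:") {
			continue
		}
		if !wantedEndpoints[line] {
			return fmt.Errorf("request reached unexpected endpoint %q", line)
		}
		consecutive++
		delete(unreached, line)
		if consecutive >= 10 && len(unreached) == 0 {
			return nil // criterion satisfied
		}
	}
	return fmt.Errorf("did not see 10 consecutive successful requests reaching all expected endpoints")
}

func main() {
	// Fabricated example input: alternating responses from two endpoints,
	// newest first, with the client's "Date:" markers interleaved.
	var lines []string
	for i := 0; i < 12; i++ {
		lines = append(lines, "Date: example marker", "server-0-node-a", "Date: example marker", "server-1-node-b")
	}
	wanted := map[string]bool{"server-0-node-a": true, "server-1-node-b": true}
	fmt.Println(logLinesSatisfyDistribution(lines, wanted)) // <nil> => pass
}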