From 6680700b5dcefad5b509d36d3390313202c1ea5f Mon Sep 17 00:00:00 2001 From: Gaurav Kumar Ghildiyal Date: Wed, 13 Mar 2024 14:46:21 -0700 Subject: [PATCH] Add e2e and integration tests for Service.spec.trafficDistribution (#123812) * Add e2e tests for Service.spec.trafficDistribution * Fix linting issue * Fix spelling * Add integration tests for trafficDistribution * Use nodeSelection instead of nodeName to schedule pods on a specific zonal node * Fix import alias corev1 -> v1 in e2e test * Address comments * Add a way to only print log lines in case of errors. This is deemed to be good behaviour by e2e tests guidelines --- test/e2e/network/traffic_distribution.go | 280 ++++++++++++++++++++++ test/integration/service/service_test.go | 293 +++++++++++++++++++++++ 2 files changed, 573 insertions(+) create mode 100644 test/e2e/network/traffic_distribution.go diff --git a/test/e2e/network/traffic_distribution.go b/test/e2e/network/traffic_distribution.go new file mode 100644 index 00000000000..59ed3bbffcd --- /dev/null +++ b/test/e2e/network/traffic_distribution.go @@ -0,0 +1,280 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package network + +import ( + "context" + "fmt" + "slices" + "strings" + "time" + + v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2eservice "k8s.io/kubernetes/test/e2e/framework/service" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + "k8s.io/kubernetes/test/e2e/network/common" + "k8s.io/kubernetes/test/utils/format" + admissionapi "k8s.io/pod-security-admission/api" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" +) + +var _ = common.SIGDescribe("TrafficDistribution", func() { + f := framework.NewDefaultFramework("traffic-distribution") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + + var c clientset.Interface + + ginkgo.BeforeEach(func(ctx context.Context) { + c = f.ClientSet + e2eskipper.SkipUnlessMultizone(ctx, c) + }) + + //////////////////////////////////////////////////////////////////////////// + // Helper functions + //////////////////////////////////////////////////////////////////////////// + + // endpointSlicesForService returns a helper function to be used with + // gomega.Eventually(...). It fetches the EndpointSlices for the given + // serviceName. + endpointSlicesForService := func(serviceName string) any { + return func(ctx context.Context) ([]discoveryv1.EndpointSlice, error) { + slices, err := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) + if err != nil { + return nil, err + } + return slices.Items, nil + } + } + + // gomegaCustomError constructs a function that can be returned from a gomega + // matcher to report an error. + gomegaCustomError := func(format string, a ...any) func() string { + return func() string { + return fmt.Sprintf(format, a...) + } + } + + // endpointSlicesHaveSameZoneHints returns a matcher function to be used with + // gomega.Eventually().Should(...). It checks that the passed EndpointSlices + // have zone-hints which match the endpoint's zone. + endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) { + if len(slices) == 0 { + return nil, fmt.Errorf("no endpointslices found") + } + for _, slice := range slices { + for _, endpoint := range slice.Endpoints { + var ip string + if len(endpoint.Addresses) > 0 { + ip = endpoint.Addresses[0] + } + var zone string + if endpoint.Zone != nil { + zone = *endpoint.Zone + } + if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone { + return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil + } + } + } + return nil, nil + }) + + // requestsFromClient returns a helper function to be used with + // gomega.Eventually(...). It fetches the logs from the clientPod and returns + // them in reverse-chronological order. + requestsFromClient := func(clientPod *v1.Pod) any { + return func(ctx context.Context) (reverseChronologicalLogLines []string, err error) { + logs, err := e2epod.GetPodLogs(ctx, c, f.Namespace.Name, clientPod.Name, clientPod.Spec.Containers[0].Name) + if err != nil { + return nil, err + } + logLines := strings.Split(logs, "\n") + slices.Reverse(logLines) + return logLines, nil + } + } + + //////////////////////////////////////////////////////////////////////////// + // Main test specifications. + //////////////////////////////////////////////////////////////////////////// + + ginkgo.When("Service has trafficDistribution=PreferClose", func() { + ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) { + + ginkgo.By("finding 3 zones with schedulable nodes") + allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c) + framework.ExpectNoError(err) + if len(allZonesSet) < 3 { + framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet)) + } + zones := allZonesSet.UnsortedList()[:3] + + ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones)) + nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c) + framework.ExpectNoError(err) + nodeForZone := make(map[string]string) + for _, zone := range zones { + found := false + for _, node := range nodeList.Items { + if zone == node.Labels[v1.LabelTopologyZone] { + found = true + nodeForZone[zone] = node.GetName() + } + } + if !found { + framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */)) + } + } + + ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2])) + zoneForServingPod := make(map[string]string) + var servingPods []*v1.Pod + servingPodLabels := map[string]string{"app": f.UniqueName} + for _, zone := range zones[:2] { + pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname") + nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + pod.Labels = servingPodLabels + + servingPods = append(servingPods, pod) + zoneForServingPod[pod.Name] = zone + ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{}) + } + e2epod.NewPodClient(f).CreateBatch(ctx, servingPods) + + trafficDist := v1.ServiceTrafficDistributionPreferClose + svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "traffic-dist-test-service", + }, + Spec: v1.ServiceSpec{ + Selector: servingPodLabels, + TrafficDistribution: &trafficDist, + Ports: []v1.ServicePort{{ + Port: 80, + TargetPort: intstr.FromInt32(9376), + Protocol: v1.ProtocolTCP, + }}, + }, + }) + ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution)) + ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{}) + + ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints") + gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints) + + ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone") + + createClientPod := func(ctx context.Context, zone string) *v1.Pod { + pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil) + pod.Spec.NodeName = nodeForZone[zone] + nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name) + pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd} + pod.Spec.Containers[0].Name = pod.Name + + ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{}) + return e2epod.NewPodClient(f).CreateSync(ctx, pod) + } + + for _, clientZone := range zones[:2] { + framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone) + clientPod := createClientPod(ctx, clientZone) + + framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone) + + requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + logLines := reverseChronologicalLogLines + if len(logLines) < 20 { + return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil + } + consecutiveSameZone := 0 + + for _, logLine := range logLines { + if logLine == "" || strings.HasPrefix(logLine, "Date:") { + continue + } + destZone, ok := zoneForServingPod[logLine] + if !ok { + return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + } + if clientZone != destZone { + return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil + } + consecutiveSameZone++ + if consecutiveSameZone >= 10 { + return nil, nil // Pass condition. + } + } + // Ideally, the matcher would never reach this condition + return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil + }) + + gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone) + } + + ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client") + + clientZone := zones[2] + framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone) + clientPod := createClientPod(ctx, clientZone) + + framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone) + + requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) { + logLines := reverseChronologicalLogLines + if len(logLines) < 20 { + return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil + } + + // Requests are counted as successful when the response read from the log + // lines is the name of a recognizable serving pod. + consecutiveSuccessfulRequests := 0 + + for _, logLine := range logLines { + if logLine == "" || strings.HasPrefix(logLine, "Date:") { + continue + } + _, servingPodExists := zoneForServingPod[logLine] + if !servingPodExists { + return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil + } + consecutiveSuccessfulRequests++ + if consecutiveSuccessfulRequests >= 10 { + return nil, nil // Pass condition + } + } + // Ideally, the matcher would never reach this condition + return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil + }) + + gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod) + + }) + + }) +}) diff --git a/test/integration/service/service_test.go b/test/integration/service/service_test.go index 5d675609e2a..7f67233449b 100644 --- a/test/integration/service/service_test.go +++ b/test/integration/service/service_test.go @@ -17,14 +17,27 @@ limitations under the License. package service import ( + "bytes" "context" + "fmt" "testing" + "time" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + featuregatetesting "k8s.io/component-base/featuregate/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller/endpointslice" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils/format" + "k8s.io/kubernetes/test/utils/ktesting" ) // Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy tests that Services no longer default @@ -264,3 +277,283 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs") } } + +// Test transitions involving the `trafficDistribution` field in Service spec. +func Test_TransitionsForTrafficDistribution(t *testing.T) { + + //////////////////////////////////////////////////////////////////////////// + // Setup components, like kube-apiserver and EndpointSlice controller. + //////////////////////////////////////////////////////////////////////////// + + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTrafficDistribution, true)() + + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) + defer server.TearDownFn() + + client, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(client, resyncPeriod) + + ctx := ktesting.Init(t) + defer ctx.Cancel("test has completed") + epsController := endpointslice.NewController( + ctx, + informers.Core().V1().Pods(), + informers.Core().V1().Services(), + informers.Core().V1().Nodes(), + informers.Discovery().V1().EndpointSlices(), + int32(100), + client, + 1*time.Second, + ) + + informers.Start(ctx.Done()) + go epsController.Run(ctx, 1) + + //////////////////////////////////////////////////////////////////////////// + // Create a namespace, node, pod in the node, and a service exposing the pod. + //////////////////////////////////////////////////////////////////////////// + + ns := framework.CreateNamespaceOrDie(client, "test-service-traffic-distribution", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-node", + Labels: map[string]string{ + corev1.LabelTopologyZone: "fake-zone-1", + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: ns.GetName(), + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: corev1.PodSpec{ + NodeName: node.GetName(), + Containers: []corev1.Container{ + { + Name: "fake-name", + Image: "fake-image", + Ports: []corev1.ContainerPort{ + { + Name: "port-443", + ContainerPort: 443, + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "10.0.0.1", + PodIPs: []corev1.PodIP{ + { + IP: "10.0.0.1", + }, + }, + }, + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: ns.GetName(), + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "foo": "bar", + }, + Ports: []corev1.ServicePort{ + {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)}, + }, + }, + } + + _, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test node: %v", err) + } + _, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test ready pod: %v", err) + } + _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update status for test pod to Ready: %v", err) + } + _, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test service: %v", err) + } + + //////////////////////////////////////////////////////////////////////////// + // Assert that without the presence of `trafficDistribution` field and the + // service.kubernetes.io/topology-mode=Auto annotation, there are no zone + // hints in EndpointSlice. + //////////////////////////////////////////////////////////////////////////// + + // logsBuffer captures logs during assertions which multiple retires. These + // will only be printed if the assertion failed. + logsBuffer := &bytes.Buffer{} + + endpointSlicesHaveNoHints := func(ctx context.Context) (bool, error) { + slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())}) + if err != nil { + fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err) + return false, nil + } + if slices == nil || len(slices.Items) == 0 { + fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName()) + return false, nil + } + fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */)) + + for _, slice := range slices.Items { + for _, endpoint := range slice.Endpoints { + var ip string + if len(endpoint.Addresses) > 0 { + ip = endpoint.Addresses[0] + } + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) != 0 { + fmt.Fprintf(logsBuffer, "endpoint with ip %v has hint %+v, want no hint\n", ip, endpoint.Hints) + return false, nil + } + } + } + return true, nil + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints) + if err != nil { + t.Logf("logsBuffer=\n%v", logsBuffer) + t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err) + } + logsBuffer.Reset() + + //////////////////////////////////////////////////////////////////////////// + // Update the service by setting the `trafficDistribution: PreferLocal` field + // + // Assert that the respective EndpointSlices get the same-zone hints. + //////////////////////////////////////////////////////////////////////////// + + trafficDist := corev1.ServiceTrafficDistributionPreferClose + svc.Spec.TrafficDistribution = &trafficDist + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update test service with 'trafficDistribution: PreferLocal': %v", err) + } + + endpointSlicesHaveSameZoneHints := func(ctx context.Context) (bool, error) { + slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())}) + if err != nil { + fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err) + return false, nil + } + if slices == nil || len(slices.Items) == 0 { + fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName()) + return false, nil + } + fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */)) + + for _, slice := range slices.Items { + for _, endpoint := range slice.Endpoints { + var ip string + if len(endpoint.Addresses) > 0 { + ip = endpoint.Addresses[0] + } + var zone string + if endpoint.Zone != nil { + zone = *endpoint.Zone + } + if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone { + fmt.Fprintf(logsBuffer, "endpoint with ip %v does not have the correct hint, want hint for zone %q\n", ip, zone) + return false, nil + } + } + } + return true, nil + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints) + if err != nil { + t.Logf("logsBuffer=\n%v", logsBuffer) + t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err) + } + logsBuffer.Reset() + + //////////////////////////////////////////////////////////////////////////// + // Update the service with the service.kubernetes.io/topology-mode=Auto + // annotation. + // + // Assert that the EndpointSlice for service have no hints once + // service.kubernetes.io/topology-mode=Auto takes affect, since topology + // annotation would not work with only one service pod. + //////////////////////////////////////////////////////////////////////////// + svc.Annotations = map[string]string{corev1.AnnotationTopologyMode: "Auto"} + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update test service with 'service.kubernetes.io/topology-mode=Auto' annotation: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints) + if err != nil { + t.Logf("logsBuffer=\n%v", logsBuffer) + t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err) + } + logsBuffer.Reset() + + //////////////////////////////////////////////////////////////////////////// + // Remove the annotation service.kubernetes.io/topology-mode=Auto from the + // service. + // + // Assert that EndpointSlice for service again has the correct same-zone + // hints because of the `trafficDistribution: PreferLocal` field. + //////////////////////////////////////////////////////////////////////////// + svc.Annotations = map[string]string{} + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints) + if err != nil { + t.Logf("logsBuffer=\n%v", logsBuffer) + t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err) + } + logsBuffer.Reset() + + //////////////////////////////////////////////////////////////////////////// + // Remove the field `trafficDistribution: PreferLocal` from the service. + // + // Assert that EndpointSlice for service again has no zone hints. + //////////////////////////////////////////////////////////////////////////// + svc.Spec.TrafficDistribution = nil + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints) + if err != nil { + t.Logf("logsBuffer=\n%v", logsBuffer) + t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err) + } + logsBuffer.Reset() +}