diff --git a/test/integration/service/service_test.go b/test/integration/service/service_test.go index 296a52daf61..e7ec010a852 100644 --- a/test/integration/service/service_test.go +++ b/test/integration/service/service_test.go @@ -290,6 +290,71 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t } } +func assertEndpointSliceHints(t *testing.T, ctx context.Context, client *clientset.Clientset, namespace, serviceName string, expectZoneHints, expectNodeHints bool) { + t.Helper() + logsBuffer := &bytes.Buffer{} + + endpointSlicesHaveExpectedHints := func(ctx context.Context) (bool, error) { + slices, err := client.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) + if err != nil { + fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", serviceName, err) + return false, nil + } + if slices == nil || len(slices.Items) == 0 { + fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", serviceName) + return false, nil + } + fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */)) + + for _, slice := range slices.Items { + for _, endpoint := range slice.Endpoints { + var ip, zone, nodeName string + if len(endpoint.Addresses) > 0 { + ip = endpoint.Addresses[0] + } + if endpoint.Zone != nil { + zone = *endpoint.Zone + } + if endpoint.NodeName != nil { + nodeName = *endpoint.NodeName + } + if endpoint.Hints == nil { + if expectZoneHints || expectNodeHints { + fmt.Fprintf(logsBuffer, "endpoint with ip %v has no hints, expectZoneHints=%v expectNodeHints=%v\n", ip, expectZoneHints, expectNodeHints) + return false, nil + } + continue + } + if expectZoneHints { + if len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone { + fmt.Fprintf(logsBuffer, "endpoint with ip %v does not have the correct hint, want hint for zone %q\n", ip, zone) + return false, nil + } + } else if len(endpoint.Hints.ForZones) != 0 { + fmt.Fprintf(logsBuffer, "endpoint with ip %v has unexpected hint for zone %q\n", ip, endpoint.Hints.ForZones[0].Name) + return false, nil + } + if expectNodeHints { + if len(endpoint.Hints.ForNodes) != 1 || endpoint.Hints.ForNodes[0].Name != nodeName { + fmt.Fprintf(logsBuffer, "endpoint with ip %v does not have the correct hint, want hint for node %q\n", ip, nodeName) + return false, nil + } + } else if len(endpoint.Hints.ForNodes) != 0 { + fmt.Fprintf(logsBuffer, "endpoint with ip %v has unexpected hint for node %q\n", ip, endpoint.Hints.ForNodes[0].Name) + return false, nil + } + } + } + return true, nil + } + + err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveExpectedHints) + if err != nil { + t.Logf("logsBuffer=\n%v", logsBuffer) + t.Fatalf("Error waiting for EndpointSlices to have expected hints: %v", err) + } +} + // Test transitions involving the `trafficDistribution` field in Service spec. func Test_TransitionsForTrafficDistribution(t *testing.T) { @@ -421,46 +486,10 @@ func Test_TransitionsForTrafficDistribution(t *testing.T) { // hints in EndpointSlice. //////////////////////////////////////////////////////////////////////////// - // logsBuffer captures logs during assertions which multiple retires. These - // will only be printed if the assertion failed. - logsBuffer := &bytes.Buffer{} - - endpointSlicesHaveNoHints := func(ctx context.Context) (bool, error) { - slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())}) - if err != nil { - fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err) - return false, nil - } - if slices == nil || len(slices.Items) == 0 { - fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName()) - return false, nil - } - fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */)) - - for _, slice := range slices.Items { - for _, endpoint := range slice.Endpoints { - var ip string - if len(endpoint.Addresses) > 0 { - ip = endpoint.Addresses[0] - } - if endpoint.Hints != nil && len(endpoint.Hints.ForZones) != 0 { - fmt.Fprintf(logsBuffer, "endpoint with ip %v has hint %+v, want no hint\n", ip, endpoint.Hints) - return false, nil - } - } - } - return true, nil - } - - err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints) - if err != nil { - t.Logf("logsBuffer=\n%v", logsBuffer) - t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err) - } - logsBuffer.Reset() + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), false, false) //////////////////////////////////////////////////////////////////////////// - // Update the service by setting the `trafficDistribution: PreferLocal` field + // Update the service by setting the `trafficDistribution: PreferClose` field // // Assert that the respective EndpointSlices get the same-zone hints. //////////////////////////////////////////////////////////////////////////// @@ -469,46 +498,10 @@ func Test_TransitionsForTrafficDistribution(t *testing.T) { svc.Spec.TrafficDistribution = &trafficDist _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) if err != nil { - t.Fatalf("Failed to update test service with 'trafficDistribution: PreferLocal': %v", err) + t.Fatalf("Failed to update test service with 'trafficDistribution: PreferClose': %v", err) } - endpointSlicesHaveSameZoneHints := func(ctx context.Context) (bool, error) { - slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())}) - if err != nil { - fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err) - return false, nil - } - if slices == nil || len(slices.Items) == 0 { - fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName()) - return false, nil - } - fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */)) - - for _, slice := range slices.Items { - for _, endpoint := range slice.Endpoints { - var ip string - if len(endpoint.Addresses) > 0 { - ip = endpoint.Addresses[0] - } - var zone string - if endpoint.Zone != nil { - zone = *endpoint.Zone - } - if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone { - fmt.Fprintf(logsBuffer, "endpoint with ip %v does not have the correct hint, want hint for zone %q\n", ip, zone) - return false, nil - } - } - } - return true, nil - } - - err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints) - if err != nil { - t.Logf("logsBuffer=\n%v", logsBuffer) - t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err) - } - logsBuffer.Reset() + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), true, false) //////////////////////////////////////////////////////////////////////////// // Update the service with the service.kubernetes.io/topology-mode=Auto @@ -518,56 +511,247 @@ func Test_TransitionsForTrafficDistribution(t *testing.T) { // service.kubernetes.io/topology-mode=Auto takes affect, since topology // annotation would not work with only one service pod. //////////////////////////////////////////////////////////////////////////// + svc.Annotations = map[string]string{corev1.AnnotationTopologyMode: "Auto"} _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to update test service with 'service.kubernetes.io/topology-mode=Auto' annotation: %v", err) } - err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints) - if err != nil { - t.Logf("logsBuffer=\n%v", logsBuffer) - t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err) - } - logsBuffer.Reset() + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), false, false) //////////////////////////////////////////////////////////////////////////// // Remove the annotation service.kubernetes.io/topology-mode=Auto from the // service. // // Assert that EndpointSlice for service again has the correct same-zone - // hints because of the `trafficDistribution: PreferLocal` field. + // hints because of the `trafficDistribution: PreferClose` field. //////////////////////////////////////////////////////////////////////////// + svc.Annotations = map[string]string{} _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err) } - err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints) - if err != nil { - t.Logf("logsBuffer=\n%v", logsBuffer) - t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err) - } - logsBuffer.Reset() + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), true, false) //////////////////////////////////////////////////////////////////////////// - // Remove the field `trafficDistribution: PreferLocal` from the service. + // Remove the field `trafficDistribution: PreferClose` from the service. // // Assert that EndpointSlice for service again has no zone hints. //////////////////////////////////////////////////////////////////////////// + svc.Spec.TrafficDistribution = nil _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) if err != nil { t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err) } - err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints) + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), false, false) +} + +// Test transitions involving the `trafficDistribution` field with +// PreferSameTrafficDistribution enabled. +func Test_TransitionsForPreferSameTrafficDistribution(t *testing.T) { + + //////////////////////////////////////////////////////////////////////////// + // Setup components, like kube-apiserver and EndpointSlice controller. + //////////////////////////////////////////////////////////////////////////// + + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTrafficDistribution, true) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferSameTrafficDistribution, true) + + // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + defer server.TearDownFn() + + client, err := clientset.NewForConfig(server.ClientConfig) if err != nil { - t.Logf("logsBuffer=\n%v", logsBuffer) - t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err) + t.Fatalf("Error creating clientset: %v", err) } - logsBuffer.Reset() + + resyncPeriod := 12 * time.Hour + informers := informers.NewSharedInformerFactory(client, resyncPeriod) + + ctx := ktesting.Init(t) + defer ctx.Cancel("test has completed") + epsController := endpointslice.NewController( + ctx, + informers.Core().V1().Pods(), + informers.Core().V1().Services(), + informers.Core().V1().Nodes(), + informers.Discovery().V1().EndpointSlices(), + int32(100), + client, + 1*time.Second, + ) + + informers.Start(ctx.Done()) + go epsController.Run(ctx, 1) + + //////////////////////////////////////////////////////////////////////////// + // Create a namespace, node, pod in the node, and a service exposing the pod. + //////////////////////////////////////////////////////////////////////////// + + ns := framework.CreateNamespaceOrDie(client, "test-service-traffic-distribution", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-node", + Labels: map[string]string{ + corev1.LabelTopologyZone: "fake-zone-1", + }, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: ns.GetName(), + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: corev1.PodSpec{ + NodeName: node.GetName(), + Containers: []corev1.Container{ + { + Name: "fake-name", + Image: "fake-image", + Ports: []corev1.ContainerPort{ + { + Name: "port-443", + ContainerPort: 443, + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + PodIP: "10.0.0.1", + PodIPs: []corev1.PodIP{ + { + IP: "10.0.0.1", + }, + }, + }, + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + Namespace: ns.GetName(), + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "foo": "bar", + }, + Ports: []corev1.ServicePort{ + {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)}, + }, + }, + } + + _, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test node: %v", err) + } + _, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test ready pod: %v", err) + } + _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update status for test pod to Ready: %v", err) + } + _, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create test service: %v", err) + } + + //////////////////////////////////////////////////////////////////////////// + // Assert that without the presence of `trafficDistribution` field there are + // no zone hints in EndpointSlice. + //////////////////////////////////////////////////////////////////////////// + + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), false, false) + + //////////////////////////////////////////////////////////////////////////// + // Update the service by setting `trafficDistribution: PreferSameZone` + // + // Assert that the respective EndpointSlices get the same-zone hints. + //////////////////////////////////////////////////////////////////////////// + + trafficDist := corev1.ServiceTrafficDistributionPreferSameZone + svc.Spec.TrafficDistribution = &trafficDist + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update test service with 'trafficDistribution: PreferSameZone': %v", err) + } + + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), true, false) + + //////////////////////////////////////////////////////////////////////////// + // Change `trafficDistribution` to `PreferSameNode`. + // + // Assert that the respective EndpointSlices have both same-zone and + // same-node hints. + //////////////////////////////////////////////////////////////////////////// + + trafficDist = corev1.ServiceTrafficDistributionPreferSameNode + svc.Spec.TrafficDistribution = &trafficDist + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update test service with 'trafficDistribution: PreferSameNode': %v", err) + } + + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), true, true) + + //////////////////////////////////////////////////////////////////////////// + // Remove the field `trafficDistribution` from the service. + // + // Assert that EndpointSlice for service again has no zone hints. + //////////////////////////////////////////////////////////////////////////// + + svc.Spec.TrafficDistribution = nil + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err) + } + + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), false, false) + + //////////////////////////////////////////////////////////////////////////// + // Update the Node to no longer have a zone label, and re-enable + // `PreferSameNode`. + // + // Assert that the respective EndpointSlices have same-node hints but not + // same-zone. + //////////////////////////////////////////////////////////////////////////// + + delete(node.Labels, corev1.LabelTopologyZone) + _, err = client.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update test node with no zone label: %v", err) + } + + trafficDist = corev1.ServiceTrafficDistributionPreferSameNode + svc.Spec.TrafficDistribution = &trafficDist + _, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update test service with 'trafficDistribution: PreferSameNode': %v", err) + } + + assertEndpointSliceHints(t, ctx, client, ns.GetName(), svc.GetName(), false, true) } func Test_TrafficDistribution_FeatureGateEnableDisable(t *testing.T) { @@ -588,7 +772,7 @@ func Test_TrafficDistribution_FeatureGateEnableDisable(t *testing.T) { } //////////////////////////////////////////////////////////////////////////// - // Create a Service and set `trafficDistribution: PreferLocal` field. + // Create a Service and set `trafficDistribution: PreferClose` field. // // Assert that the field is present in the created Service. ////////////////////////////////////////////////////////////////////////////