mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-27 12:31:23 +00:00
Add e2e and integration tests for Service.spec.trafficDistribution (#123812)
* Add e2e tests for Service.spec.trafficDistribution * Fix linting issue * Fix spelling * Add integration tests for trafficDistribution * Use nodeSelection instead of nodeName to schedule pods on a specific zonal node * Fix import alias corev1 -> v1 in e2e test * Address comments * Add a way to only print log lines in case of errors. This is deemed to be good behaviour by e2e tests guidelines
This commit is contained in:
parent
3a75a8c8d9
commit
6680700b5d
280
test/e2e/network/traffic_distribution.go
Normal file
280
test/e2e/network/traffic_distribution.go
Normal file
@ -0,0 +1,280 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package network
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
|
||||
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
|
||||
"k8s.io/kubernetes/test/e2e/network/common"
|
||||
"k8s.io/kubernetes/test/utils/format"
|
||||
admissionapi "k8s.io/pod-security-admission/api"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
"github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = common.SIGDescribe("TrafficDistribution", func() {
|
||||
f := framework.NewDefaultFramework("traffic-distribution")
|
||||
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
|
||||
|
||||
var c clientset.Interface
|
||||
|
||||
ginkgo.BeforeEach(func(ctx context.Context) {
|
||||
c = f.ClientSet
|
||||
e2eskipper.SkipUnlessMultizone(ctx, c)
|
||||
})
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Helper functions
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// endpointSlicesForService returns a helper function to be used with
|
||||
// gomega.Eventually(...). It fetches the EndpointSlices for the given
|
||||
// serviceName.
|
||||
endpointSlicesForService := func(serviceName string) any {
|
||||
return func(ctx context.Context) ([]discoveryv1.EndpointSlice, error) {
|
||||
slices, err := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return slices.Items, nil
|
||||
}
|
||||
}
|
||||
|
||||
// gomegaCustomError constructs a function that can be returned from a gomega
|
||||
// matcher to report an error.
|
||||
gomegaCustomError := func(format string, a ...any) func() string {
|
||||
return func() string {
|
||||
return fmt.Sprintf(format, a...)
|
||||
}
|
||||
}
|
||||
|
||||
// endpointSlicesHaveSameZoneHints returns a matcher function to be used with
|
||||
// gomega.Eventually().Should(...). It checks that the passed EndpointSlices
|
||||
// have zone-hints which match the endpoint's zone.
|
||||
endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) {
|
||||
if len(slices) == 0 {
|
||||
return nil, fmt.Errorf("no endpointslices found")
|
||||
}
|
||||
for _, slice := range slices {
|
||||
for _, endpoint := range slice.Endpoints {
|
||||
var ip string
|
||||
if len(endpoint.Addresses) > 0 {
|
||||
ip = endpoint.Addresses[0]
|
||||
}
|
||||
var zone string
|
||||
if endpoint.Zone != nil {
|
||||
zone = *endpoint.Zone
|
||||
}
|
||||
if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
|
||||
return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
// requestsFromClient returns a helper function to be used with
|
||||
// gomega.Eventually(...). It fetches the logs from the clientPod and returns
|
||||
// them in reverse-chronological order.
|
||||
requestsFromClient := func(clientPod *v1.Pod) any {
|
||||
return func(ctx context.Context) (reverseChronologicalLogLines []string, err error) {
|
||||
logs, err := e2epod.GetPodLogs(ctx, c, f.Namespace.Name, clientPod.Name, clientPod.Spec.Containers[0].Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logLines := strings.Split(logs, "\n")
|
||||
slices.Reverse(logLines)
|
||||
return logLines, nil
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Main test specifications.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ginkgo.When("Service has trafficDistribution=PreferClose", func() {
|
||||
ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
|
||||
|
||||
ginkgo.By("finding 3 zones with schedulable nodes")
|
||||
allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
if len(allZonesSet) < 3 {
|
||||
framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
|
||||
}
|
||||
zones := allZonesSet.UnsortedList()[:3]
|
||||
|
||||
ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
|
||||
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
nodeForZone := make(map[string]string)
|
||||
for _, zone := range zones {
|
||||
found := false
|
||||
for _, node := range nodeList.Items {
|
||||
if zone == node.Labels[v1.LabelTopologyZone] {
|
||||
found = true
|
||||
nodeForZone[zone] = node.GetName()
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
|
||||
}
|
||||
}
|
||||
|
||||
ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
|
||||
zoneForServingPod := make(map[string]string)
|
||||
var servingPods []*v1.Pod
|
||||
servingPodLabels := map[string]string{"app": f.UniqueName}
|
||||
for _, zone := range zones[:2] {
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
|
||||
nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
pod.Labels = servingPodLabels
|
||||
|
||||
servingPods = append(servingPods, pod)
|
||||
zoneForServingPod[pod.Name] = zone
|
||||
ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
|
||||
}
|
||||
e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
|
||||
|
||||
trafficDist := v1.ServiceTrafficDistributionPreferClose
|
||||
svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "traffic-dist-test-service",
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: servingPodLabels,
|
||||
TrafficDistribution: &trafficDist,
|
||||
Ports: []v1.ServicePort{{
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt32(9376),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
},
|
||||
})
|
||||
ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
|
||||
ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{})
|
||||
|
||||
ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints")
|
||||
gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints)
|
||||
|
||||
ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
|
||||
|
||||
createClientPod := func(ctx context.Context, zone string) *v1.Pod {
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
|
||||
pod.Spec.NodeName = nodeForZone[zone]
|
||||
nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
|
||||
pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
|
||||
pod.Spec.Containers[0].Name = pod.Name
|
||||
|
||||
ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
|
||||
return e2epod.NewPodClient(f).CreateSync(ctx, pod)
|
||||
}
|
||||
|
||||
for _, clientZone := range zones[:2] {
|
||||
framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
|
||||
clientPod := createClientPod(ctx, clientZone)
|
||||
|
||||
framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
|
||||
|
||||
requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
|
||||
logLines := reverseChronologicalLogLines
|
||||
if len(logLines) < 20 {
|
||||
return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
consecutiveSameZone := 0
|
||||
|
||||
for _, logLine := range logLines {
|
||||
if logLine == "" || strings.HasPrefix(logLine, "Date:") {
|
||||
continue
|
||||
}
|
||||
destZone, ok := zoneForServingPod[logLine]
|
||||
if !ok {
|
||||
return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
if clientZone != destZone {
|
||||
return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
consecutiveSameZone++
|
||||
if consecutiveSameZone >= 10 {
|
||||
return nil, nil // Pass condition.
|
||||
}
|
||||
}
|
||||
// Ideally, the matcher would never reach this condition
|
||||
return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
})
|
||||
|
||||
gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
|
||||
}
|
||||
|
||||
ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
|
||||
|
||||
clientZone := zones[2]
|
||||
framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
|
||||
clientPod := createClientPod(ctx, clientZone)
|
||||
|
||||
framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
|
||||
|
||||
requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
|
||||
logLines := reverseChronologicalLogLines
|
||||
if len(logLines) < 20 {
|
||||
return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
|
||||
// Requests are counted as successful when the response read from the log
|
||||
// lines is the name of a recognizable serving pod.
|
||||
consecutiveSuccessfulRequests := 0
|
||||
|
||||
for _, logLine := range logLines {
|
||||
if logLine == "" || strings.HasPrefix(logLine, "Date:") {
|
||||
continue
|
||||
}
|
||||
_, servingPodExists := zoneForServingPod[logLine]
|
||||
if !servingPodExists {
|
||||
return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
consecutiveSuccessfulRequests++
|
||||
if consecutiveSuccessfulRequests >= 10 {
|
||||
return nil, nil // Pass condition
|
||||
}
|
||||
}
|
||||
// Ideally, the matcher would never reach this condition
|
||||
return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
})
|
||||
|
||||
gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
})
|
@ -17,14 +17,27 @@ limitations under the License.
|
||||
package service
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
"k8s.io/kubernetes/pkg/controller/endpointslice"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
"k8s.io/kubernetes/test/utils/format"
|
||||
"k8s.io/kubernetes/test/utils/ktesting"
|
||||
)
|
||||
|
||||
// Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy tests that Services no longer default
|
||||
@ -264,3 +277,283 @@ func Test_RemovingExternalIPsFromClusterIPServiceDropsExternalTrafficPolicy(t *t
|
||||
t.Error("service externalTrafficPolicy was not set for clusterIP Service with externalIPs")
|
||||
}
|
||||
}
|
||||
|
||||
// Test transitions involving the `trafficDistribution` field in Service spec.
|
||||
func Test_TransitionsForTrafficDistribution(t *testing.T) {
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Setup components, like kube-apiserver and EndpointSlice controller.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTrafficDistribution, true)()
|
||||
|
||||
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
|
||||
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
|
||||
defer server.TearDownFn()
|
||||
|
||||
client, err := clientset.NewForConfig(server.ClientConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating clientset: %v", err)
|
||||
}
|
||||
|
||||
resyncPeriod := 12 * time.Hour
|
||||
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
|
||||
|
||||
ctx := ktesting.Init(t)
|
||||
defer ctx.Cancel("test has completed")
|
||||
epsController := endpointslice.NewController(
|
||||
ctx,
|
||||
informers.Core().V1().Pods(),
|
||||
informers.Core().V1().Services(),
|
||||
informers.Core().V1().Nodes(),
|
||||
informers.Discovery().V1().EndpointSlices(),
|
||||
int32(100),
|
||||
client,
|
||||
1*time.Second,
|
||||
)
|
||||
|
||||
informers.Start(ctx.Done())
|
||||
go epsController.Run(ctx, 1)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Create a namespace, node, pod in the node, and a service exposing the pod.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ns := framework.CreateNamespaceOrDie(client, "test-service-traffic-distribution", t)
|
||||
defer framework.DeleteNamespaceOrDie(client, ns, t)
|
||||
|
||||
node := &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "fake-node",
|
||||
Labels: map[string]string{
|
||||
corev1.LabelTopologyZone: "fake-zone-1",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod := &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-pod",
|
||||
Namespace: ns.GetName(),
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
Spec: corev1.PodSpec{
|
||||
NodeName: node.GetName(),
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: "fake-name",
|
||||
Image: "fake-image",
|
||||
Ports: []corev1.ContainerPort{
|
||||
{
|
||||
Name: "port-443",
|
||||
ContainerPort: 443,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: corev1.PodStatus{
|
||||
Phase: corev1.PodRunning,
|
||||
Conditions: []corev1.PodCondition{
|
||||
{
|
||||
Type: corev1.PodReady,
|
||||
Status: corev1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
PodIP: "10.0.0.1",
|
||||
PodIPs: []corev1.PodIP{
|
||||
{
|
||||
IP: "10.0.0.1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
svc := &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-service",
|
||||
Namespace: ns.GetName(),
|
||||
},
|
||||
Spec: corev1.ServiceSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
Ports: []corev1.ServicePort{
|
||||
{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test node: %v", err)
|
||||
}
|
||||
_, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test ready pod: %v", err)
|
||||
}
|
||||
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update status for test pod to Ready: %v", err)
|
||||
}
|
||||
_, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create test service: %v", err)
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Assert that without the presence of `trafficDistribution` field and the
|
||||
// service.kubernetes.io/topology-mode=Auto annotation, there are no zone
|
||||
// hints in EndpointSlice.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// logsBuffer captures logs during assertions which multiple retires. These
|
||||
// will only be printed if the assertion failed.
|
||||
logsBuffer := &bytes.Buffer{}
|
||||
|
||||
endpointSlicesHaveNoHints := func(ctx context.Context) (bool, error) {
|
||||
slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())})
|
||||
if err != nil {
|
||||
fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err)
|
||||
return false, nil
|
||||
}
|
||||
if slices == nil || len(slices.Items) == 0 {
|
||||
fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName())
|
||||
return false, nil
|
||||
}
|
||||
fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */))
|
||||
|
||||
for _, slice := range slices.Items {
|
||||
for _, endpoint := range slice.Endpoints {
|
||||
var ip string
|
||||
if len(endpoint.Addresses) > 0 {
|
||||
ip = endpoint.Addresses[0]
|
||||
}
|
||||
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) != 0 {
|
||||
fmt.Fprintf(logsBuffer, "endpoint with ip %v has hint %+v, want no hint\n", ip, endpoint.Hints)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints)
|
||||
if err != nil {
|
||||
t.Logf("logsBuffer=\n%v", logsBuffer)
|
||||
t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err)
|
||||
}
|
||||
logsBuffer.Reset()
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Update the service by setting the `trafficDistribution: PreferLocal` field
|
||||
//
|
||||
// Assert that the respective EndpointSlices get the same-zone hints.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
trafficDist := corev1.ServiceTrafficDistributionPreferClose
|
||||
svc.Spec.TrafficDistribution = &trafficDist
|
||||
_, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update test service with 'trafficDistribution: PreferLocal': %v", err)
|
||||
}
|
||||
|
||||
endpointSlicesHaveSameZoneHints := func(ctx context.Context) (bool, error) {
|
||||
slices, err := client.DiscoveryV1().EndpointSlices(ns.GetName()).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.GetName())})
|
||||
if err != nil {
|
||||
fmt.Fprintf(logsBuffer, "failed to list EndpointSlices for service %q: %v\n", svc.GetName(), err)
|
||||
return false, nil
|
||||
}
|
||||
if slices == nil || len(slices.Items) == 0 {
|
||||
fmt.Fprintf(logsBuffer, "no EndpointSlices returned for service %q\n", svc.GetName())
|
||||
return false, nil
|
||||
}
|
||||
fmt.Fprintf(logsBuffer, "EndpointSlices=\n%v\n", format.Object(slices, 1 /* indent one level */))
|
||||
|
||||
for _, slice := range slices.Items {
|
||||
for _, endpoint := range slice.Endpoints {
|
||||
var ip string
|
||||
if len(endpoint.Addresses) > 0 {
|
||||
ip = endpoint.Addresses[0]
|
||||
}
|
||||
var zone string
|
||||
if endpoint.Zone != nil {
|
||||
zone = *endpoint.Zone
|
||||
}
|
||||
if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
|
||||
fmt.Fprintf(logsBuffer, "endpoint with ip %v does not have the correct hint, want hint for zone %q\n", ip, zone)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints)
|
||||
if err != nil {
|
||||
t.Logf("logsBuffer=\n%v", logsBuffer)
|
||||
t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err)
|
||||
}
|
||||
logsBuffer.Reset()
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Update the service with the service.kubernetes.io/topology-mode=Auto
|
||||
// annotation.
|
||||
//
|
||||
// Assert that the EndpointSlice for service have no hints once
|
||||
// service.kubernetes.io/topology-mode=Auto takes affect, since topology
|
||||
// annotation would not work with only one service pod.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
svc.Annotations = map[string]string{corev1.AnnotationTopologyMode: "Auto"}
|
||||
_, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update test service with 'service.kubernetes.io/topology-mode=Auto' annotation: %v", err)
|
||||
}
|
||||
|
||||
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints)
|
||||
if err != nil {
|
||||
t.Logf("logsBuffer=\n%v", logsBuffer)
|
||||
t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err)
|
||||
}
|
||||
logsBuffer.Reset()
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Remove the annotation service.kubernetes.io/topology-mode=Auto from the
|
||||
// service.
|
||||
//
|
||||
// Assert that EndpointSlice for service again has the correct same-zone
|
||||
// hints because of the `trafficDistribution: PreferLocal` field.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
svc.Annotations = map[string]string{}
|
||||
_, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err)
|
||||
}
|
||||
|
||||
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveSameZoneHints)
|
||||
if err != nil {
|
||||
t.Logf("logsBuffer=\n%v", logsBuffer)
|
||||
t.Fatalf("Error waiting for EndpointSlices to have same zone hints: %v", err)
|
||||
}
|
||||
logsBuffer.Reset()
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Remove the field `trafficDistribution: PreferLocal` from the service.
|
||||
//
|
||||
// Assert that EndpointSlice for service again has no zone hints.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
svc.Spec.TrafficDistribution = nil
|
||||
_, err = client.CoreV1().Services(ns.Name).Update(ctx, svc, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to remove annotation 'service.kubernetes.io/topology-mode=Auto' from service: %v", err)
|
||||
}
|
||||
|
||||
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, endpointSlicesHaveNoHints)
|
||||
if err != nil {
|
||||
t.Logf("logsBuffer=\n%v", logsBuffer)
|
||||
t.Fatalf("Error waiting for EndpointSlices to have no hints: %v", err)
|
||||
}
|
||||
logsBuffer.Reset()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user