Abstract the logic of the TrafficDistribution test

Split the logic of creating the clients and the servers apart from the
logic of checking which clients connect to which servers. Add some
extra complexity to support additional use cases (like multiple
endpoints on the same node).
Dan Winship 2025-03-24 08:46:02 -04:00
parent b1a0fea4c6
commit bc81a860b0
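
To make the shape of the new structure concrete, here is a rough standalone sketch of the split (illustrative only, not the test code: it uses plain strings in place of the *v1.Node and *v1.Pod fields, hypothetical node names, and none of the e2e framework helpers):

package main

import "fmt"

// Simplified stand-ins for the test's serverPod/clientPod structs.
type serverPod struct {
    node string // node (and therefore zone) the server runs on
    name string // filled in by the "creation" step
}

type clientPod struct {
    node      string       // node the client runs on
    endpoints []*serverPod // servers this client is expected to reach
    name      string
}

func main() {
    nodes := []string{"node-a", "node-b", "node-c"} // one node per zone (hypothetical names)

    // Step 1: describe the topology. Clients in all three zones...
    var clients []*clientPod
    for _, n := range nodes {
        clients = append(clients, &clientPod{node: n})
    }
    // ...and servers in the first two zones only.
    servers := []*serverPod{{node: clients[0].node}, {node: clients[1].node}}

    // Clients with a same-zone server should reach only that server; the
    // client with no same-zone server may reach both.
    clients[0].endpoints = []*serverPod{servers[0]}
    clients[1].endpoints = []*serverPod{servers[1]}
    clients[2].endpoints = servers

    // Step 2: "create" the pods.
    for i, sp := range servers {
        sp.name = fmt.Sprintf("server-%d-%s", i, sp.node)
    }
    for i, cp := range clients {
        cp.name = fmt.Sprintf("client-%d-%s", i, cp.node)
    }

    // Step 3: check expectations, driven entirely by the data structures.
    for _, cp := range clients {
        wanted := []string{}
        for _, sp := range cp.endpoints {
            wanted = append(wanted, sp.name)
        }
        fmt.Printf("%s should only reach %v\n", cp.name, wanted)
    }
}

In the real test the creation step fills in the pod field of each struct, and the checking step derives its expected endpoint set purely from clientPod.endpoints, so the two phases no longer need to know about each other.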

@@ -27,6 +27,7 @@ import (
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -92,6 +93,18 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
}
}
+// Data structures for tracking server and client pods
+type serverPod struct {
+node *v1.Node
+pod *v1.Pod
+}
+type clientPod struct {
+node *v1.Node
+endpoints []*serverPod
+pod *v1.Pod
+}
////////////////////////////////////////////////////////////////////////////
// Main test specifications.
////////////////////////////////////////////////////////////////////////////
@@ -109,13 +122,13 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
framework.ExpectNoError(err)
-nodeForZone := make(map[string]string)
+nodeForZone := make(map[string]*v1.Node)
for _, zone := range zones {
found := false
for _, node := range nodeList.Items {
if zone == node.Labels[v1.LabelTopologyZone] {
found = true
-nodeForZone[zone] = node.GetName()
+nodeForZone[zone] = &node
}
}
if !found {
@@ -123,20 +136,42 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
}
}
ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
zoneForServingPod := make(map[string]string)
var servingPods []*v1.Pod
var clientPods []*clientPod
var serverPods []*serverPod
// We want clients in all three zones
for _, node := range nodeForZone {
clientPods = append(clientPods, &clientPod{node: node})
}
// and endpoints in the first two zones
serverPods = []*serverPod{
{node: clientPods[0].node},
{node: clientPods[1].node},
}
// The clients with an endpoint in the same zone should only connect to
// that endpoint. The client with no endpoint in its zone should connect
// to both endpoints.
clientPods[0].endpoints = []*serverPod{serverPods[0]}
clientPods[1].endpoints = []*serverPod{serverPods[1]}
clientPods[2].endpoints = serverPods
var podsToCreate []*v1.Pod
servingPodLabels := map[string]string{"app": f.UniqueName}
-for _, zone := range zones[:2] {
-pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
-nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
+for i, sp := range serverPods {
+node := sp.node.Name
+zone := sp.node.Labels[v1.LabelTopologyZone]
+pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname")
+ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone))
+nodeSelection := e2epod.NodeSelection{Name: node}
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
pod.Labels = servingPodLabels
-servingPods = append(servingPods, pod)
-zoneForServingPod[pod.Name] = zone
+sp.pod = pod
+podsToCreate = append(podsToCreate, pod)
}
-e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
+e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
trafficDist := v1.ServiceTrafficDistributionPreferClose
svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
@@ -156,95 +191,63 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
ginkgo.By("waiting for EndpointSlices to be created")
-err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
+err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
framework.ExpectNoError(err)
slices := endpointSlicesForService(svc.Name)
framework.Logf("got slices:\n%v", format.Object(slices, 1))
ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
createClientPod := func(ctx context.Context, zone string) *v1.Pod {
pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
podsToCreate = nil
for i, cp := range clientPods {
node := cp.node.Name
zone := cp.node.Labels[v1.LabelTopologyZone]
pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil)
ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone))
nodeSelection := e2epod.NodeSelection{Name: node}
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
pod.Spec.Containers[0].Name = pod.Name
-return e2epod.NewPodClient(f).CreateSync(ctx, pod)
+cp.pod = pod
+podsToCreate = append(podsToCreate, pod)
}
+e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
-for _, clientZone := range zones[:2] {
-framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
-clientPod := createClientPod(ctx, clientZone)
+for _, cp := range clientPods {
+wantedEndpoints := sets.New[string]()
+for _, sp := range cp.endpoints {
+wantedEndpoints.Insert(sp.pod.Name)
+}
+unreachedEndpoints := wantedEndpoints.Clone()
framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList()))
requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
logLines := reverseChronologicalLogLines
if len(logLines) < 20 {
return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
}
-consecutiveSameZone := 0
-for _, logLine := range logLines {
-if logLine == "" || strings.HasPrefix(logLine, "Date:") {
-continue
-}
-destZone, ok := zoneForServingPod[logLine]
-if !ok {
-return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
-}
-if clientZone != destZone {
-return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
-}
-consecutiveSameZone++
-if consecutiveSameZone >= 10 {
-return nil, nil // Pass condition.
-}
-}
-// Ideally, the matcher would never reach this condition
-return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
-})
-gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
-}
-ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
-clientZone := zones[2]
-framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
-clientPod := createClientPod(ctx, clientZone)
-framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
-requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
-logLines := reverseChronologicalLogLines
-if len(logLines) < 20 {
-return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
-}
// Requests are counted as successful when the response read from the log
// lines is the name of a recognizable serving pod.
consecutiveSuccessfulRequests := 0
for _, logLine := range logLines {
if logLine == "" || strings.HasPrefix(logLine, "Date:") {
continue
}
-_, servingPodExists := zoneForServingPod[logLine]
-if !servingPodExists {
-return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
+destEndpoint := logLine
+if !wantedEndpoints.Has(destEndpoint) {
+return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil
}
consecutiveSuccessfulRequests++
-if consecutiveSuccessfulRequests >= 10 {
-return nil, nil // Pass condition
+unreachedEndpoints.Delete(destEndpoint)
+if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 {
+return nil, nil // Pass condition.
}
}
// Ideally, the matcher would never reach this condition
return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil
})
-gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
+gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed)
+}
})
})