Remove a level of indentation in the TrafficDistribution e2e

(No other changes to the code)
Dan Winship 2025-03-19 13:39:36 -04:00
parent 8f7bb964de
commit b1a0fea4c6
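In Ginkgo terms, the commit drops a `ginkgo.When` container that held a single `ginkgo.It` and hoists that spec up one level, which is what removes the extra indentation. Schematically (names taken from the diff below, test body elided):

	// Before: a ginkgo.When container wrapping one ginkgo.It.
	ginkgo.When("Service has trafficDistribution=PreferClose", func() {
		ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
			// ... test body ...
		})
	})

	// After: the ginkgo.It stands on its own, one level shallower.
	ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) {
		// ... same test body ...
	})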


@@ -96,159 +96,155 @@ var _ = common.SIGDescribe("Traffic Distribution", func() {
	////////////////////////////////////////////////////////////////////////////
	// Main test specifications.
	////////////////////////////////////////////////////////////////////////////

-	ginkgo.When("Service has trafficDistribution=PreferClose", func() {
-		ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
+	ginkgo.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) {
		ginkgo.By("finding 3 zones with schedulable nodes")
		allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
		framework.ExpectNoError(err)
		if len(allZonesSet) < 3 {
			framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
		}
		zones := allZonesSet.UnsortedList()[:3]

		ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
		framework.ExpectNoError(err)
		nodeForZone := make(map[string]string)
		for _, zone := range zones {
			found := false
			for _, node := range nodeList.Items {
				if zone == node.Labels[v1.LabelTopologyZone] {
					found = true
					nodeForZone[zone] = node.GetName()
				}
			}
			if !found {
				framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
			}
		}

		ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
		zoneForServingPod := make(map[string]string)
		var servingPods []*v1.Pod
		servingPodLabels := map[string]string{"app": f.UniqueName}
		for _, zone := range zones[:2] {
			pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
			nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
			pod.Labels = servingPodLabels

			servingPods = append(servingPods, pod)
			zoneForServingPod[pod.Name] = zone
		}
		e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)

		trafficDist := v1.ServiceTrafficDistributionPreferClose
		svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name: "traffic-dist-test-service",
			},
			Spec: v1.ServiceSpec{
				Selector:            servingPodLabels,
				TrafficDistribution: &trafficDist,
				Ports: []v1.ServicePort{{
					Port:       80,
					TargetPort: intstr.FromInt32(9376),
					Protocol:   v1.ProtocolTCP,
				}},
			},
		})
		ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))

		ginkgo.By("waiting for EndpointSlices to be created")
		err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(servingPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
		framework.ExpectNoError(err)
		slices := endpointSlicesForService(svc.Name)
		framework.Logf("got slices:\n%v", format.Object(slices, 1))

		ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")

		createClientPod := func(ctx context.Context, zone string) *v1.Pod {
			pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
			nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
			cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
			pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
			pod.Spec.Containers[0].Name = pod.Name

			return e2epod.NewPodClient(f).CreateSync(ctx, pod)
		}

		for _, clientZone := range zones[:2] {
			framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
			clientPod := createClientPod(ctx, clientZone)

			framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)

			requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
				logLines := reverseChronologicalLogLines
				if len(logLines) < 20 {
					return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
				}
				consecutiveSameZone := 0

				for _, logLine := range logLines {
					if logLine == "" || strings.HasPrefix(logLine, "Date:") {
						continue
					}
					destZone, ok := zoneForServingPod[logLine]
					if !ok {
						return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
					}
					if clientZone != destZone {
						return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
					}
					consecutiveSameZone++
					if consecutiveSameZone >= 10 {
						return nil, nil // Pass condition.
					}
				}
				// Ideally, the matcher would never reach this condition
				return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
			})

			gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
		}

		ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")

		clientZone := zones[2]
		framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
		clientPod := createClientPod(ctx, clientZone)

		framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)

		requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
			logLines := reverseChronologicalLogLines
			if len(logLines) < 20 {
				return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
			}

			// Requests are counted as successful when the response read from the log
			// lines is the name of a recognizable serving pod.
			consecutiveSuccessfulRequests := 0

			for _, logLine := range logLines {
				if logLine == "" || strings.HasPrefix(logLine, "Date:") {
					continue
				}
				_, servingPodExists := zoneForServingPod[logLine]
				if !servingPodExists {
					return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
				}
				consecutiveSuccessfulRequests++
				if consecutiveSuccessfulRequests >= 10 {
					return nil, nil // Pass condition
				}
			}

			// Ideally, the matcher would never reach this condition
			return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
		})

		gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
	})
-	})
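Both matchers in this hunk return the `(func() string, error)` pair that `framework.MakeMatcher` consumes: a nil message function signals a match, while a non-nil one supplies the failure text that `gomega.Eventually` reports if the polling deadline expires. The probed log lines come from agnhost's `serve-hostname` handler, so every non-empty line that does not start with "Date:" should be the name of a serving pod, which `zoneForServingPod` maps back to a zone. The `gomegaCustomError` helper is defined elsewhere in this file and is not part of this hunk; a minimal sketch of such a helper (an assumption, not the committed code) might look like:

	// gomegaCustomError: hypothetical sketch of the failure-message builder used
	// above; the real helper lives outside this hunk and may differ.
	func gomegaCustomError(format string, args ...interface{}) func() string {
		return func() string {
			return fmt.Sprintf(format, args...)
		}
	}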