mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #130945 from danwinship/prefersame-e2e-test
e2e testing for PreferSameZone/PreferSameNode
This commit is contained in:
commit
c90a4b16b6
@ -523,14 +523,6 @@ var (
|
||||
// - ci-kubernetes-node-e2e-cri-proxy-serial
|
||||
CriProxy = framework.WithFeature(framework.ValidFeatures.Add("CriProxy"))
|
||||
|
||||
// Owner: sig-network
|
||||
// Marks tests that require a cluster with Topology Hints enabled.
|
||||
TopologyHints = framework.WithFeature(framework.ValidFeatures.Add("Topology Hints"))
|
||||
|
||||
// Owner: sig-network
|
||||
// Marks tests that require a cluster with Traffic Distribution enabled.
|
||||
TrafficDistribution = framework.WithFeature(framework.ValidFeatures.Add("Traffic Distribution"))
|
||||
|
||||
// TODO: document the feature (owning SIG, when to use this feature for a test)
|
||||
TopologyManager = framework.WithFeature(framework.ValidFeatures.Add("TopologyManager"))
|
||||
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/test/e2e/feature"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
@ -41,7 +40,7 @@ import (
|
||||
admissionapi "k8s.io/pod-security-admission/api"
|
||||
)
|
||||
|
||||
var _ = common.SIGDescribe(feature.TopologyHints, func() {
|
||||
var _ = common.SIGDescribe("Topology Hints", func() {
|
||||
f := framework.NewDefaultFramework("topology-hints")
|
||||
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
|
||||
|
||||
|
@ -27,9 +27,9 @@ import (
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/e2e/feature"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
@ -43,7 +43,7 @@ import (
|
||||
"github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGate(features.ServiceTrafficDistribution), func() {
|
||||
var _ = common.SIGDescribe("Traffic Distribution", func() {
|
||||
f := framework.NewDefaultFramework("traffic-distribution")
|
||||
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
|
||||
|
||||
@ -79,31 +79,6 @@ var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGat
|
||||
}
|
||||
}
|
||||
|
||||
// endpointSlicesHaveSameZoneHints returns a matcher function to be used with
|
||||
// gomega.Eventually().Should(...). It checks that the passed EndpointSlices
|
||||
// have zone-hints which match the endpoint's zone.
|
||||
endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) {
|
||||
if len(slices) == 0 {
|
||||
return nil, fmt.Errorf("no endpointslices found")
|
||||
}
|
||||
for _, slice := range slices {
|
||||
for _, endpoint := range slice.Endpoints {
|
||||
var ip string
|
||||
if len(endpoint.Addresses) > 0 {
|
||||
ip = endpoint.Addresses[0]
|
||||
}
|
||||
var zone string
|
||||
if endpoint.Zone != nil {
|
||||
zone = *endpoint.Zone
|
||||
}
|
||||
if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
|
||||
return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
// requestsFromClient returns a helper function to be used with
|
||||
// gomega.Eventually(...). It fetches the logs from the clientPod and returns
|
||||
// them in reverse-chronological order.
|
||||
@ -119,164 +94,392 @@ var _ = common.SIGDescribe(feature.TrafficDistribution, framework.WithFeatureGat
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Main test specifications.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// getNodesForMultiNode returns a set of nodes for a test case with 3 zones with 2
|
||||
// nodes each. If there are not suitable nodes/zones, the test is skipped.
|
||||
getNodesForMultiNode := func(ctx context.Context) ([]*v1.Node, []*v1.Node, []*v1.Node) {
|
||||
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
nodesForZone := make(map[string][]*v1.Node)
|
||||
for _, node := range nodeList.Items {
|
||||
zone := node.Labels[v1.LabelTopologyZone]
|
||||
nodesForZone[zone] = append(nodesForZone[zone], &node)
|
||||
}
|
||||
if len(nodesForZone) < 3 {
|
||||
e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each")
|
||||
}
|
||||
|
||||
ginkgo.When("Service has trafficDistribution=PreferClose", func() {
|
||||
ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
|
||||
|
||||
ginkgo.By("finding 3 zones with schedulable nodes")
|
||||
allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
if len(allZonesSet) < 3 {
|
||||
framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
|
||||
var multiNodeZones [][]*v1.Node
|
||||
for _, nodes := range nodesForZone {
|
||||
if len(nodes) > 1 {
|
||||
multiNodeZones = append(multiNodeZones, nodes)
|
||||
}
|
||||
zones := allZonesSet.UnsortedList()[:3]
|
||||
|
||||
ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
|
||||
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
nodeForZone := make(map[string]string)
|
||||
for _, zone := range zones {
|
||||
found := false
|
||||
for _, node := range nodeList.Items {
|
||||
if zone == node.Labels[v1.LabelTopologyZone] {
|
||||
found = true
|
||||
nodeForZone[zone] = node.GetName()
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
|
||||
}
|
||||
if len(multiNodeZones) == 3 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(multiNodeZones) < 3 {
|
||||
e2eskipper.Skipf("need at least 3 zones, with at least 2 schedulable nodes each")
|
||||
}
|
||||
|
||||
ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
|
||||
zoneForServingPod := make(map[string]string)
|
||||
var servingPods []*v1.Pod
|
||||
servingPodLabels := map[string]string{"app": f.UniqueName}
|
||||
for _, zone := range zones[:2] {
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
|
||||
nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
pod.Labels = servingPodLabels
|
||||
return multiNodeZones[0], multiNodeZones[1], multiNodeZones[2]
|
||||
}
|
||||
|
||||
servingPods = append(servingPods, pod)
|
||||
zoneForServingPod[pod.Name] = zone
|
||||
ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
|
||||
// Data structures for tracking server and client pods
|
||||
type serverPod struct {
|
||||
node *v1.Node
|
||||
pod *v1.Pod
|
||||
}
|
||||
|
||||
type clientPod struct {
|
||||
node *v1.Node
|
||||
endpoints []*serverPod
|
||||
pod *v1.Pod
|
||||
}
|
||||
|
||||
// allocateClientsAndServers figures out where to put clients and servers for
|
||||
// a simple "same-zone" traffic distribution test.
|
||||
allocateClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) {
|
||||
ginkgo.By("finding 3 zones with schedulable nodes")
|
||||
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
nodeForZone := make(map[string]*v1.Node)
|
||||
for _, node := range nodeList.Items {
|
||||
zone := node.Labels[v1.LabelTopologyZone]
|
||||
if nodeForZone[zone] != nil {
|
||||
continue
|
||||
}
|
||||
e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
|
||||
nodeForZone[zone] = &node
|
||||
if len(nodeForZone) == 3 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(nodeForZone) < 3 {
|
||||
e2eskipper.Skipf("got %d zones with schedulable nodes, need at least 3", len(nodeForZone))
|
||||
}
|
||||
|
||||
trafficDist := v1.ServiceTrafficDistributionPreferClose
|
||||
svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "traffic-dist-test-service",
|
||||
var clientPods []*clientPod
|
||||
var serverPods []*serverPod
|
||||
|
||||
// We want clients in all three zones
|
||||
for _, node := range nodeForZone {
|
||||
clientPods = append(clientPods, &clientPod{node: node})
|
||||
}
|
||||
|
||||
// and endpoints in the first two zones
|
||||
serverPods = []*serverPod{
|
||||
{node: clientPods[0].node},
|
||||
{node: clientPods[1].node},
|
||||
}
|
||||
|
||||
// The clients with an endpoint in the same zone should only connect to
|
||||
// that endpoint. The client with no endpoint in its zone should connect
|
||||
// to both endpoints.
|
||||
clientPods[0].endpoints = []*serverPod{serverPods[0]}
|
||||
clientPods[1].endpoints = []*serverPod{serverPods[1]}
|
||||
clientPods[2].endpoints = serverPods
|
||||
|
||||
return clientPods, serverPods
|
||||
}
|
||||
|
||||
// allocateMultiNodeClientsAndServers figures out where to put clients and servers
|
||||
// for a "same-zone" traffic distribution test with multiple nodes in each zone.
|
||||
allocateMultiNodeClientsAndServers := func(ctx context.Context) ([]*clientPod, []*serverPod) {
|
||||
ginkgo.By("finding a set of zones and nodes for the test")
|
||||
zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx)
|
||||
|
||||
var clientPods []*clientPod
|
||||
var serverPods []*serverPod
|
||||
|
||||
// First zone: a client and an endpoint on each node, and both clients
|
||||
// should talk to both endpoints.
|
||||
endpointsForZone := []*serverPod{
|
||||
{node: zone1Nodes[0]},
|
||||
{node: zone1Nodes[1]},
|
||||
}
|
||||
|
||||
clientPods = append(clientPods,
|
||||
&clientPod{
|
||||
node: zone1Nodes[0],
|
||||
endpoints: endpointsForZone,
|
||||
},
|
||||
&clientPod{
|
||||
node: zone1Nodes[1],
|
||||
endpoints: endpointsForZone,
|
||||
},
|
||||
)
|
||||
serverPods = append(serverPods, endpointsForZone...)
|
||||
|
||||
// Second zone: a client on one node and a server on the other.
|
||||
endpointsForZone = []*serverPod{
|
||||
{node: zone2Nodes[1]},
|
||||
}
|
||||
|
||||
clientPods = append(clientPods,
|
||||
&clientPod{
|
||||
node: zone2Nodes[0],
|
||||
endpoints: endpointsForZone,
|
||||
},
|
||||
)
|
||||
serverPods = append(serverPods, endpointsForZone...)
|
||||
|
||||
// Third zone: just a client, which should connect to the servers in the
|
||||
// other two zones.
|
||||
clientPods = append(clientPods,
|
||||
&clientPod{
|
||||
node: zone3Nodes[0],
|
||||
endpoints: serverPods,
|
||||
},
|
||||
)
|
||||
|
||||
return clientPods, serverPods
|
||||
}
|
||||
|
||||
// createService creates the service for a traffic distribution test
|
||||
createService := func(ctx context.Context, trafficDist string) *v1.Service {
|
||||
serviceName := "traffic-dist-test-service"
|
||||
ginkgo.By(fmt.Sprintf("creating a service %q with trafficDistribution %q", serviceName, trafficDist))
|
||||
return createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: serviceName,
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{
|
||||
"app": f.UniqueName,
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: servingPodLabels,
|
||||
TrafficDistribution: &trafficDist,
|
||||
Ports: []v1.ServicePort{{
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt32(9376),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
},
|
||||
})
|
||||
ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
|
||||
ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{})
|
||||
TrafficDistribution: &trafficDist,
|
||||
Ports: []v1.ServicePort{{
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt32(9376),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints")
|
||||
gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints)
|
||||
// createPods creates endpoint pods for svc as described by serverPods, waits for
|
||||
// the EndpointSlices to be updated, and creates clientPods as described by
|
||||
// clientPods.
|
||||
createPods := func(ctx context.Context, svc *v1.Service, clientPods []*clientPod, serverPods []*serverPod) {
|
||||
var podsToCreate []*v1.Pod
|
||||
for i, sp := range serverPods {
|
||||
node := sp.node.Name
|
||||
zone := sp.node.Labels[v1.LabelTopologyZone]
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("server-%d-%s", i, node), nil, nil, nil, "serve-hostname")
|
||||
ginkgo.By(fmt.Sprintf("creating a server pod %q on node %q in zone %q", pod.Name, node, zone))
|
||||
nodeSelection := e2epod.NodeSelection{Name: node}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
pod.Labels = svc.Spec.Selector
|
||||
|
||||
ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
|
||||
sp.pod = pod
|
||||
podsToCreate = append(podsToCreate, pod)
|
||||
}
|
||||
e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
|
||||
|
||||
createClientPod := func(ctx context.Context, zone string) *v1.Pod {
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
|
||||
pod.Spec.NodeName = nodeForZone[zone]
|
||||
nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
|
||||
pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
|
||||
pod.Spec.Containers[0].Name = pod.Name
|
||||
ginkgo.By("waiting for EndpointSlices to be created")
|
||||
err := framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, len(serverPods), 1*time.Second, e2eservice.ServiceEndpointsTimeout)
|
||||
framework.ExpectNoError(err)
|
||||
slices := endpointSlicesForService(svc.Name)
|
||||
framework.Logf("got slices:\n%v", format.Object(slices, 1))
|
||||
|
||||
ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
|
||||
return e2epod.NewPodClient(f).CreateSync(ctx, pod)
|
||||
podsToCreate = nil
|
||||
for i, cp := range clientPods {
|
||||
node := cp.node.Name
|
||||
zone := cp.node.Labels[v1.LabelTopologyZone]
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, fmt.Sprintf("client-%d-%s", i, node), nil, nil, nil)
|
||||
ginkgo.By(fmt.Sprintf("creating a client pod %q on node %q in zone %q", pod.Name, node, zone))
|
||||
nodeSelection := e2epod.NodeSelection{Name: node}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
|
||||
pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
|
||||
pod.Spec.Containers[0].Name = pod.Name
|
||||
|
||||
cp.pod = pod
|
||||
podsToCreate = append(podsToCreate, pod)
|
||||
}
|
||||
e2epod.NewPodClient(f).CreateBatch(ctx, podsToCreate)
|
||||
}
|
||||
|
||||
// checkTrafficDistribution checks that traffic from clientPods is distributed in
|
||||
// the expected way.
|
||||
checkTrafficDistribution := func(ctx context.Context, clientPods []*clientPod) {
|
||||
for _, cp := range clientPods {
|
||||
wantedEndpoints := sets.New[string]()
|
||||
for _, sp := range cp.endpoints {
|
||||
wantedEndpoints.Insert(sp.pod.Name)
|
||||
}
|
||||
unreachedEndpoints := wantedEndpoints.Clone()
|
||||
|
||||
for _, clientZone := range zones[:2] {
|
||||
framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
|
||||
clientPod := createClientPod(ctx, clientZone)
|
||||
ginkgo.By(fmt.Sprintf("ensuring that requests from %s on %s go to the endpoint(s) %v", cp.pod.Name, cp.node.Name, wantedEndpoints.UnsortedList()))
|
||||
|
||||
framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
|
||||
|
||||
requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
|
||||
logLines := reverseChronologicalLogLines
|
||||
if len(logLines) < 20 {
|
||||
return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
consecutiveSameZone := 0
|
||||
|
||||
for _, logLine := range logLines {
|
||||
if logLine == "" || strings.HasPrefix(logLine, "Date:") {
|
||||
continue
|
||||
}
|
||||
destZone, ok := zoneForServingPod[logLine]
|
||||
if !ok {
|
||||
return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
if clientZone != destZone {
|
||||
return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
consecutiveSameZone++
|
||||
if consecutiveSameZone >= 10 {
|
||||
return nil, nil // Pass condition.
|
||||
}
|
||||
}
|
||||
// Ideally, the matcher would never reach this condition
|
||||
return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
})
|
||||
|
||||
gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
|
||||
}
|
||||
|
||||
ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
|
||||
|
||||
clientZone := zones[2]
|
||||
framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
|
||||
clientPod := createClientPod(ctx, clientZone)
|
||||
|
||||
framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
|
||||
|
||||
requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
|
||||
requestsSucceed := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
|
||||
logLines := reverseChronologicalLogLines
|
||||
if len(logLines) < 20 {
|
||||
return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
|
||||
// Requests are counted as successful when the response read from the log
|
||||
// lines is the name of a recognizable serving pod.
|
||||
consecutiveSuccessfulRequests := 0
|
||||
|
||||
for _, logLine := range logLines {
|
||||
if logLine == "" || strings.HasPrefix(logLine, "Date:") {
|
||||
continue
|
||||
}
|
||||
_, servingPodExists := zoneForServingPod[logLine]
|
||||
if !servingPodExists {
|
||||
return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
destEndpoint := logLine
|
||||
if !wantedEndpoints.Has(destEndpoint) {
|
||||
return gomegaCustomError("request from %s should not have reached %s\nreverseChronologicalLogLines=\n%v", cp.pod.Name, destEndpoint, strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
}
|
||||
consecutiveSuccessfulRequests++
|
||||
if consecutiveSuccessfulRequests >= 10 {
|
||||
return nil, nil // Pass condition
|
||||
unreachedEndpoints.Delete(destEndpoint)
|
||||
if consecutiveSuccessfulRequests >= 10 && len(unreachedEndpoints) == 0 {
|
||||
return nil, nil // Pass condition.
|
||||
}
|
||||
}
|
||||
// Ideally, the matcher would never reach this condition
|
||||
return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
return gomegaCustomError("requests didn't meet the required criteria to reach all endpoints %v\nreverseChronologicalLogLines=\n%v", wantedEndpoints.UnsortedList(), strings.Join(reverseChronologicalLogLines, "\n")), nil
|
||||
})
|
||||
|
||||
gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
|
||||
gomega.Eventually(ctx, requestsFromClient(cp.pod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceed)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// Main test specifications.
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
framework.It("should route traffic to an endpoint in the same zone when using PreferClose", func(ctx context.Context) {
|
||||
clientPods, serverPods := allocateClientsAndServers(ctx)
|
||||
svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose)
|
||||
createPods(ctx, svc, clientPods, serverPods)
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
})
|
||||
|
||||
framework.It("should route traffic correctly between pods on multiple nodes when using PreferClose", func(ctx context.Context) {
|
||||
clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx)
|
||||
svc := createService(ctx, v1.ServiceTrafficDistributionPreferClose)
|
||||
createPods(ctx, svc, clientPods, serverPods)
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
})
|
||||
|
||||
framework.It("should route traffic to an endpoint in the same zone when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
|
||||
clientPods, serverPods := allocateClientsAndServers(ctx)
|
||||
svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone)
|
||||
createPods(ctx, svc, clientPods, serverPods)
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
})
|
||||
|
||||
framework.It("should route traffic correctly between pods on multiple nodes when using PreferSameZone", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
|
||||
clientPods, serverPods := allocateMultiNodeClientsAndServers(ctx)
|
||||
svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameZone)
|
||||
createPods(ctx, svc, clientPods, serverPods)
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
})
|
||||
|
||||
framework.It("should route traffic to an endpoint on the same node or fall back to same zone when using PreferSameNode", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
|
||||
ginkgo.By("finding a set of nodes for the test")
|
||||
zone1Nodes, zone2Nodes, zone3Nodes := getNodesForMultiNode(ctx)
|
||||
|
||||
var clientPods []*clientPod
|
||||
var serverPods []*serverPod
|
||||
|
||||
// The first zone: a client and a server on each node. Each client only
|
||||
// talks to the server on the same node.
|
||||
endpointsForZone := []*serverPod{
|
||||
{node: zone1Nodes[0]},
|
||||
{node: zone1Nodes[1]},
|
||||
}
|
||||
clientPods = append(clientPods,
|
||||
&clientPod{
|
||||
node: zone1Nodes[0],
|
||||
endpoints: []*serverPod{endpointsForZone[0]},
|
||||
},
|
||||
&clientPod{
|
||||
node: zone1Nodes[1],
|
||||
endpoints: []*serverPod{endpointsForZone[1]},
|
||||
},
|
||||
)
|
||||
serverPods = append(serverPods, endpointsForZone...)
|
||||
|
||||
// The second zone: a client on one node and a server on the other. The
|
||||
// client should fall back to connecting (only) to its same-zone endpoint.
|
||||
endpointsForZone = []*serverPod{
|
||||
{node: zone2Nodes[1]},
|
||||
}
|
||||
clientPods = append(clientPods,
|
||||
&clientPod{
|
||||
node: zone2Nodes[0],
|
||||
endpoints: endpointsForZone,
|
||||
},
|
||||
)
|
||||
serverPods = append(serverPods, endpointsForZone...)
|
||||
|
||||
// The third zone: just a client. Since it has neither a same-node nor a
|
||||
// same-zone endpoint, it should connect to all endpoints.
|
||||
clientPods = append(clientPods,
|
||||
&clientPod{
|
||||
node: zone3Nodes[0],
|
||||
endpoints: serverPods,
|
||||
},
|
||||
)
|
||||
|
||||
svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode)
|
||||
createPods(ctx, svc, clientPods, serverPods)
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
})
|
||||
|
||||
framework.It("should route traffic to an endpoint on the same node when using PreferSameNode and fall back when the endpoint becomes unavailable", framework.WithFeatureGate(features.PreferSameTrafficDistribution), func(ctx context.Context) {
|
||||
ginkgo.By("finding a set of nodes for the test")
|
||||
nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
|
||||
framework.ExpectNoError(err)
|
||||
if len(nodeList.Items) < 2 {
|
||||
e2eskipper.Skipf("have %d schedulable nodes, need at least 2", len(nodeList.Items))
|
||||
}
|
||||
nodes := nodeList.Items[:2]
|
||||
|
||||
// One client and one server on each node
|
||||
serverPods := []*serverPod{
|
||||
{node: &nodes[0]},
|
||||
{node: &nodes[1]},
|
||||
}
|
||||
clientPods := []*clientPod{
|
||||
{
|
||||
node: &nodes[0],
|
||||
endpoints: []*serverPod{serverPods[0]},
|
||||
},
|
||||
{
|
||||
node: &nodes[1],
|
||||
endpoints: []*serverPod{serverPods[1]},
|
||||
},
|
||||
}
|
||||
|
||||
svc := createService(ctx, v1.ServiceTrafficDistributionPreferSameNode)
|
||||
createPods(ctx, svc, clientPods, serverPods)
|
||||
|
||||
ginkgo.By("ensuring that each client talks to its same-node endpoint when both endpoints exist")
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
|
||||
ginkgo.By("killing the server pod on the first node and waiting for the EndpointSlices to be updated")
|
||||
err = c.CoreV1().Pods(f.Namespace.Name).Delete(ctx, serverPods[0].pod.Name, metav1.DeleteOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 1, 1*time.Second, e2eservice.ServiceEndpointsTimeout)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("ensuring that both clients talk to the remaining endpoint when only one endpoint exists")
|
||||
serverPods[0].pod = nil
|
||||
clientPods[0].endpoints = []*serverPod{serverPods[1]}
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
|
||||
ginkgo.By("recreating the missing server pod and waiting for the EndpointSlices to be updated")
|
||||
// We can't use createPods() here because if we only tell it about
|
||||
// serverPods[0] and not serverPods[1] it will expect there to be only one
|
||||
// endpoint.
|
||||
pod := e2epod.NewAgnhostPod(f.Namespace.Name, "server-0-new", nil, nil, nil, "serve-hostname")
|
||||
nodeSelection := e2epod.NodeSelection{Name: serverPods[0].node.Name}
|
||||
e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
|
||||
pod.Labels = svc.Spec.Selector
|
||||
serverPods[0].pod = e2epod.NewPodClient(f).CreateSync(ctx, pod)
|
||||
err = framework.WaitForServiceEndpointsNum(ctx, c, svc.Namespace, svc.Name, 2, 1*time.Second, e2eservice.ServiceEndpointsTimeout)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("ensuring that each client talks only to its same-node endpoint again")
|
||||
clientPods[0].endpoints = []*serverPod{serverPods[0]}
|
||||
checkTrafficDistribution(ctx, clientPods)
|
||||
})
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user