mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #105949 from robscott/topology-e2e
Initial Topology Hints e2e Tests
This commit is contained in:
commit
8ce440c45c
207
test/e2e/network/topology_hints.go
Normal file
207
test/e2e/network/topology_hints.go
Normal file
@ -0,0 +1,207 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package network
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
|
||||
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
|
||||
"k8s.io/kubernetes/test/e2e/network/common"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
)
|
||||
|
||||
var _ = common.SIGDescribe("Feature:Topology Hints", func() {
|
||||
f := framework.NewDefaultFramework("topology-hints")
|
||||
|
||||
// filled in BeforeEach
|
||||
var c clientset.Interface
|
||||
|
||||
ginkgo.BeforeEach(func() {
|
||||
c = f.ClientSet
|
||||
e2eskipper.SkipUnlessMultizone(c)
|
||||
})
|
||||
|
||||
ginkgo.It("should distribute endpoints evenly", func() {
|
||||
portNum := 9376
|
||||
thLabels := map[string]string{labelKey: clientLabelValue}
|
||||
img := imageutils.GetE2EImage(imageutils.Agnhost)
|
||||
ports := []v1.ContainerPort{{ContainerPort: int32(portNum)}}
|
||||
dsConf := e2edaemonset.NewDaemonSet("topology-serve-hostname", img, thLabels, nil, nil, ports, "serve-hostname")
|
||||
ds, err := c.AppsV1().DaemonSets(f.Namespace.Name).Create(context.TODO(), dsConf, metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err, "error creating DaemonSet")
|
||||
|
||||
svc := createServiceReportErr(c, f.Namespace.Name, &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "topology-hints",
|
||||
Annotations: map[string]string{
|
||||
v1.AnnotationTopologyAwareHints: "Auto",
|
||||
},
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: thLabels,
|
||||
PublishNotReadyAddresses: true,
|
||||
Ports: []v1.ServicePort{{
|
||||
Name: "example",
|
||||
Port: 80,
|
||||
TargetPort: intstr.FromInt(portNum),
|
||||
Protocol: v1.ProtocolTCP,
|
||||
}},
|
||||
},
|
||||
})
|
||||
|
||||
err = wait.Poll(5*time.Second, framework.PodStartTimeout, func() (bool, error) {
|
||||
return e2edaemonset.CheckRunningOnAllNodes(f, ds)
|
||||
})
|
||||
framework.ExpectNoError(err, "timed out waiting for DaemonSets to be ready")
|
||||
|
||||
nodeNames := e2edaemonset.SchedulableNodes(c, ds)
|
||||
framework.Logf("Waiting for %d endpoints to be tracked in EndpointSlices", len(nodeNames))
|
||||
|
||||
var finalSlices []discoveryv1.EndpointSlice
|
||||
err = wait.Poll(5*time.Second, 3*time.Minute, func() (bool, error) {
|
||||
slices, listErr := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.Name)})
|
||||
if listErr != nil {
|
||||
return false, listErr
|
||||
}
|
||||
|
||||
numEndpoints := 0
|
||||
for _, slice := range slices.Items {
|
||||
numEndpoints += len(slice.Endpoints)
|
||||
}
|
||||
if len(nodeNames) > numEndpoints {
|
||||
framework.Logf("Expected %d endpoints, got %d", len(nodeNames), numEndpoints)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
finalSlices = slices.Items
|
||||
return true, nil
|
||||
})
|
||||
framework.ExpectNoError(err, "timed out waiting for EndpointSlices to be ready")
|
||||
|
||||
ginkgo.By("having hints set for each endpoint")
|
||||
for _, slice := range finalSlices {
|
||||
for _, ep := range slice.Endpoints {
|
||||
if ep.Zone == nil {
|
||||
framework.Failf("Expected endpoint in %s to have zone: %v", slice.Name, ep)
|
||||
}
|
||||
if ep.Hints == nil || len(ep.Hints.ForZones) == 0 {
|
||||
framework.Failf("Expected endpoint in %s to have hints: %v", slice.Name, ep)
|
||||
}
|
||||
if len(ep.Hints.ForZones) > 1 {
|
||||
framework.Failf("Expected endpoint in %s to have exactly 1 zone hint, got %d: %v", slice.Name, len(ep.Hints.ForZones), ep)
|
||||
}
|
||||
if *ep.Zone != ep.Hints.ForZones[0].Name {
|
||||
framework.Failf("Expected endpoint in %s to have same zone hint, got %s: %v", slice.Name, *ep.Zone, ep)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
nodesByZone := map[string]string{}
|
||||
for _, node := range nodeList.Items {
|
||||
if zone, ok := node.Labels[v1.LabelTopologyZone]; ok {
|
||||
nodesByZone[node.Name] = zone
|
||||
}
|
||||
}
|
||||
|
||||
podList, err := c.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), metav1.ListOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
podsByZone := map[string]string{}
|
||||
for _, pod := range podList.Items {
|
||||
if zone, ok := nodesByZone[pod.Spec.NodeName]; ok {
|
||||
podsByZone[pod.Name] = zone
|
||||
}
|
||||
}
|
||||
|
||||
ginkgo.By("keeping requests in the same zone")
|
||||
for i, nodeName := range nodeNames {
|
||||
// Iterate through max of 3 nodes
|
||||
if i > 2 {
|
||||
break
|
||||
}
|
||||
fromZone, ok := nodesByZone[nodeName]
|
||||
if !ok {
|
||||
framework.Failf("Expected zone to be specified for %s node", nodeName)
|
||||
}
|
||||
ginkgo.By("creating a client pod for probing the service from " + fromZone)
|
||||
podName := "curl-from-" + fromZone
|
||||
clientPod := e2epod.NewAgnhostPod(f.Namespace.Name, podName, nil, nil, nil, "serve-hostname")
|
||||
nodeSelection := e2epod.NodeSelection{Name: nodeName}
|
||||
e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
|
||||
cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
|
||||
clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
|
||||
clientPod.Spec.Containers[0].Name = clientPod.Name
|
||||
f.PodClient().CreateSync(clientPod)
|
||||
|
||||
framework.Logf("Ensuring that requests from %s pod on %s node stay in %s zone", clientPod.Name, nodeName, fromZone)
|
||||
|
||||
var logs string
|
||||
if pollErr := wait.Poll(5*time.Second, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
|
||||
var err error
|
||||
logs, err = e2epod.GetPodLogs(c, f.Namespace.Name, clientPod.Name, clientPod.Name)
|
||||
framework.ExpectNoError(err)
|
||||
framework.Logf("Pod client logs: %s", logs)
|
||||
|
||||
logLines := strings.Split(logs, "\n")
|
||||
if len(logLines) < 6 {
|
||||
framework.Logf("only %d log lines, waiting for at least 6", len(logLines))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
consecutiveSameZone := 0
|
||||
|
||||
for i := len(logLines) - 1; i > 0; i-- {
|
||||
if logLines[i] == "" || strings.HasPrefix(logLines[i], "Date:") {
|
||||
continue
|
||||
}
|
||||
destZone, ok := podsByZone[logLines[i]]
|
||||
if !ok {
|
||||
framework.Logf("could not determine dest zone from log line: %s", logLines[i])
|
||||
return false, nil
|
||||
}
|
||||
if fromZone != destZone {
|
||||
framework.Logf("expected request from %s to stay in %s zone, delivered to %s zone", clientPod.Name, fromZone, destZone)
|
||||
return false, nil
|
||||
}
|
||||
consecutiveSameZone++
|
||||
if consecutiveSameZone >= 5 {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}); pollErr != nil {
|
||||
framework.Failf("expected 5 consecutive requests from %s to stay in zone %s within %v, stdout: %v", clientPod.Name, fromZone, e2eservice.KubeProxyLagTimeout, logs)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
Loading…
Reference in New Issue
Block a user