From 3feea9db7ca2766c7992db7097e2a450d53ec366 Mon Sep 17 00:00:00 2001 From: Ionut Balutoiu Date: Mon, 21 Nov 2022 21:44:14 +0200 Subject: [PATCH] tests: Add DaemonSet with LB rolling update test Add a test case with a DaemonSet behind a simple load balancer whose address is being constantly hit via HTTP requests. The test passes if there are no errors when doing HTTP requests to the load balancer address, during DaemonSet `RollingUpdate` operations. Signed-off-by: Ionut Balutoiu --- test/e2e/network/loadbalancer.go | 201 +++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) diff --git a/test/e2e/network/loadbalancer.go b/test/e2e/network/loadbalancer.go index 869f9cdd4e7..c7fd45baba1 100644 --- a/test/e2e/network/loadbalancer.go +++ b/test/e2e/network/loadbalancer.go @@ -19,22 +19,31 @@ package network import ( "context" "fmt" + "io" "math/big" "net" + "net/http" "strconv" "strings" "sync" + "sync/atomic" "time" compute "google.golang.org/api/compute/v1" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + e2eapps "k8s.io/kubernetes/test/e2e/apps" "k8s.io/kubernetes/test/e2e/framework" + e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset" e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment" e2ekubesystem "k8s.io/kubernetes/test/e2e/framework/kubesystem" e2enetwork "k8s.io/kubernetes/test/e2e/framework/network" @@ -1299,6 +1308,24 @@ var _ = common.SIGDescribe("LoadBalancers", func() { framework.Failf("Failed to connect to backend 2") } }) + + ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster [Slow]", func() { + // We start with a low but reasonable threshold to analyze the results. + // The goal is to achieve 99% minimum success rate. + // TODO: We should do incremental steps toward the goal. + minSuccessRate := 0.95 + + testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate) + }) + + ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local [Slow]", func() { + // We start with a low but reasonable threshold to analyze the results. + // The goal is to achieve 99% minimum success rate. + // TODO: We should do incremental steps toward the goal. + minSuccessRate := 0.95 + + testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate) + }) }) var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() { @@ -1718,3 +1745,177 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() { } }) }) + +func testRollingUpdateLBConnectivityDisruption(f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) { + cs := f.ClientSet + ns := f.Namespace.Name + name := "test-lb-rolling-update" + labels := map[string]string{"name": name} + gracePeriod := int64(60) + maxUnavailable := intstr.FromString("10%") + ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil, + []v1.ContainerPort{ + {ContainerPort: 80}, + }, + "netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod), + ) + ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: &maxUnavailable, + }, + } + ds.Spec.Template.Labels = labels + ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) + + nodeNames := e2edaemonset.SchedulableNodes(cs, ds) + e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet") + + ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name)) + ds, err := cs.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster") + creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs) + err = wait.PollImmediate(framework.Poll, creationTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames)) + framework.ExpectNoError(err, "error waiting for daemon pods to start") + err = e2edaemonset.CheckDaemonStatus(f, name) + framework.ExpectNoError(err) + + ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns)) + jig := e2eservice.NewTestJig(cs, ns, name) + jig.Labels = labels + service, err := jig.CreateLoadBalancerService(creationTimeout, func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy + }) + framework.ExpectNoError(err) + + lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0]) + svcPort := int(service.Spec.Ports[0].Port) + + ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer") + timeout := e2eservice.LoadBalancerLagTimeoutDefault + if framework.ProviderIs("aws") { + timeout = e2eservice.LoadBalancerLagTimeoutAWS + } + e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout) + + ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer") + var totalRequests uint64 = 0 + var networkErrors uint64 = 0 + var httpErrors uint64 = 0 + done := make(chan struct{}) + defer close(done) + go func() { + defer ginkgo.GinkgoRecover() + + wait.Until(func() { + atomic.AddUint64(&totalRequests, 1) + client := &http.Client{ + Transport: utilnet.SetTransportDefaults(&http.Transport{ + DisableKeepAlives: true, + }), + Timeout: 5 * time.Second, + } + ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort)) + msg := "hello" + url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg) + resp, err := client.Get(url) + if err != nil { + framework.Logf("Got error testing for reachability of %s: %v", url, err) + atomic.AddUint64(&networkErrors, 1) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + framework.Logf("Got bad status code: %d", resp.StatusCode) + atomic.AddUint64(&httpErrors, 1) + return + } + body, err := io.ReadAll(resp.Body) + if err != nil { + framework.Logf("Got error reading HTTP body: %v", err) + atomic.AddUint64(&httpErrors, 1) + return + } + if string(body) != msg { + framework.Logf("The response body does not contain expected string %s", string(body)) + atomic.AddUint64(&httpErrors, 1) + return + } + }, time.Duration(0), done) + }() + + ginkgo.By("Triggering DaemonSet rolling update several times") + var previousTotalRequests uint64 = 0 + var previousNetworkErrors uint64 = 0 + var previousHttpErrors uint64 = 0 + for i := 1; i <= 5; i++ { + framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i) + patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i) + ds, err = cs.AppsV1().DaemonSets(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) + framework.ExpectNoError(err) + + framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.") + err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) { + podList, err := cs.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + pods := podList.Items + + readyPods := 0 + for _, pod := range pods { + if !metav1.IsControlledBy(&pod, ds) { + continue + } + if pod.DeletionTimestamp != nil { + continue + } + podVersion := "" + for _, env := range pod.Spec.Containers[0].Env { + if env.Name == "VERSION" { + podVersion = env.Value + break + } + } + if podVersion != fmt.Sprintf("%d", i) { + continue + } + podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) + if !podReady { + continue + } + readyPods += 1 + } + framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name) + return readyPods == len(nodeNames), nil + }) + framework.ExpectNoError(err, "error waiting for daemon pods to be ready") + + // assert that the HTTP requests success rate is above the acceptable threshold after this rolling update + currentTotalRequests := atomic.LoadUint64(&totalRequests) + currentNetworkErrors := atomic.LoadUint64(&networkErrors) + currentHttpErrors := atomic.LoadUint64(&httpErrors) + + partialTotalRequests := currentTotalRequests - previousTotalRequests + partialNetworkErrors := currentNetworkErrors - previousNetworkErrors + partialHttpErrors := currentHttpErrors - previousHttpErrors + partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests) + + framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests) + framework.Logf("Network errors: %d", partialNetworkErrors) + framework.Logf("HTTP errors: %d", partialHttpErrors) + framework.Logf("Success rate: %.2f%%", partialSuccessRate*100) + if partialSuccessRate < minSuccessRate { + framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100) + } + + previousTotalRequests = currentTotalRequests + previousNetworkErrors = currentNetworkErrors + previousHttpErrors = currentHttpErrors + } + + // assert that the load balancer address is still reachable after the rolling updates are finished + e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout) +}