diff --git a/test/e2e/network/loadbalancer.go b/test/e2e/network/loadbalancer.go
index a1041ac8ad5..52984109eb1 100644
--- a/test/e2e/network/loadbalancer.go
+++ b/test/e2e/network/loadbalancer.go
@@ -19,22 +19,31 @@ package network
 import (
 	"context"
 	"fmt"
+	"io"
 	"math/big"
 	"net"
+	"net/http"
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	compute "google.golang.org/api/compute/v1"
 
+	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	e2eapps "k8s.io/kubernetes/test/e2e/apps"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
 	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
 	e2ekubesystem "k8s.io/kubernetes/test/e2e/framework/kubesystem"
 	e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
@@ -1299,6 +1308,24 @@ var _ = common.SIGDescribe("LoadBalancers", func() {
 			framework.Failf("Failed to connect to backend 2")
 		}
 	})
+
+	ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster [Slow]", func() {
+		// We start with a low but reasonable threshold to analyze the results.
+		// The goal is to achieve 99% minimum success rate.
+		// TODO: We should do incremental steps toward the goal.
+		minSuccessRate := 0.95
+
+		testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate)
+	})
+
+	ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local [Slow]", func() {
+		// We start with a low but reasonable threshold to analyze the results.
+		// The goal is to achieve 99% minimum success rate.
+		// TODO: We should do incremental steps toward the goal.
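+		// Note: with externalTrafficPolicy=Local the cloud load balancer
+		// health-checks each node's healthCheckNodePort, so a node whose only
+		// local endpoint is terminating drops out of rotation; that makes the
+		// rolling update the riskiest window for this policy.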
+		minSuccessRate := 0.95
+
+		testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate)
+	})
 })
 
 var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
@@ -1718,3 +1745,177 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
 		}
 	})
 })
+
+func testRollingUpdateLBConnectivityDisruption(f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) {
+	cs := f.ClientSet
+	ns := f.Namespace.Name
+	name := "test-lb-rolling-update"
+	labels := map[string]string{"name": name}
+	gracePeriod := int64(60)
+	maxUnavailable := intstr.FromString("10%")
+	ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil,
+		[]v1.ContainerPort{
+			{ContainerPort: 80},
+		},
+		"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod),
+	)
+	ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
+		Type: appsv1.RollingUpdateDaemonSetStrategyType,
+		RollingUpdate: &appsv1.RollingUpdateDaemonSet{
+			MaxUnavailable: &maxUnavailable,
+		},
+	}
+	ds.Spec.Template.Labels = labels
+	ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
+
+	nodeNames := e2edaemonset.SchedulableNodes(cs, ds)
+	e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet")
+
+	ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name))
+	ds, err := cs.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
+	framework.ExpectNoError(err)
+
+	ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster")
+	creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs)
+	err = wait.PollImmediate(framework.Poll, creationTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames))
+	framework.ExpectNoError(err, "error waiting for daemon pods to start")
+	err = e2edaemonset.CheckDaemonStatus(f, name)
+	framework.ExpectNoError(err)
+
+	ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns))
+	jig := e2eservice.NewTestJig(cs, ns, name)
+	jig.Labels = labels
+	service, err := jig.CreateLoadBalancerService(creationTimeout, func(svc *v1.Service) {
+		svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy
+	})
+	framework.ExpectNoError(err)
+
+	lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
+	svcPort := int(service.Spec.Ports[0].Port)
+
+	ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer")
+	timeout := e2eservice.LoadBalancerLagTimeoutDefault
+	if framework.ProviderIs("aws") {
+		timeout = e2eservice.LoadBalancerLagTimeoutAWS
+	}
+	e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
+
+	ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer")
+	var totalRequests uint64 = 0
+	var networkErrors uint64 = 0
+	var httpErrors uint64 = 0
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		defer ginkgo.GinkgoRecover()
+
+		wait.Until(func() {
+			atomic.AddUint64(&totalRequests, 1)
+			client := &http.Client{
+				Transport: utilnet.SetTransportDefaults(&http.Transport{
+					DisableKeepAlives: true,
+				}),
+				Timeout: 5 * time.Second,
+			}
+			ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort))
+			msg := "hello"
+			url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg)
+			resp, err := client.Get(url)
+			if err != nil {
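+				// A transport-level failure (connection refused/reset, timeout)
+				// means the load balancer forwarded us to a node that could not
+				// serve the request.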
framework.Logf("Got error testing for reachability of %s: %v", url, err) + atomic.AddUint64(&networkErrors, 1) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + framework.Logf("Got bad status code: %d", resp.StatusCode) + atomic.AddUint64(&httpErrors, 1) + return + } + body, err := io.ReadAll(resp.Body) + if err != nil { + framework.Logf("Got error reading HTTP body: %v", err) + atomic.AddUint64(&httpErrors, 1) + return + } + if string(body) != msg { + framework.Logf("The response body does not contain expected string %s", string(body)) + atomic.AddUint64(&httpErrors, 1) + return + } + }, time.Duration(0), done) + }() + + ginkgo.By("Triggering DaemonSet rolling update several times") + var previousTotalRequests uint64 = 0 + var previousNetworkErrors uint64 = 0 + var previousHttpErrors uint64 = 0 + for i := 1; i <= 5; i++ { + framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i) + patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i) + ds, err = cs.AppsV1().DaemonSets(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) + framework.ExpectNoError(err) + + framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.") + err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) { + podList, err := cs.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + pods := podList.Items + + readyPods := 0 + for _, pod := range pods { + if !metav1.IsControlledBy(&pod, ds) { + continue + } + if pod.DeletionTimestamp != nil { + continue + } + podVersion := "" + for _, env := range pod.Spec.Containers[0].Env { + if env.Name == "VERSION" { + podVersion = env.Value + break + } + } + if podVersion != fmt.Sprintf("%d", i) { + continue + } + podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) + if !podReady { + continue + } + readyPods += 1 + } + framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name) + return readyPods == len(nodeNames), nil + }) + framework.ExpectNoError(err, "error waiting for daemon pods to be ready") + + // assert that the HTTP requests success rate is above the acceptable threshold after this rolling update + currentTotalRequests := atomic.LoadUint64(&totalRequests) + currentNetworkErrors := atomic.LoadUint64(&networkErrors) + currentHttpErrors := atomic.LoadUint64(&httpErrors) + + partialTotalRequests := currentTotalRequests - previousTotalRequests + partialNetworkErrors := currentNetworkErrors - previousNetworkErrors + partialHttpErrors := currentHttpErrors - previousHttpErrors + partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests) + + framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests) + framework.Logf("Network errors: %d", partialNetworkErrors) + framework.Logf("HTTP errors: %d", partialHttpErrors) + framework.Logf("Success rate: %.2f%%", partialSuccessRate*100) + if partialSuccessRate < minSuccessRate { + framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. 
+			framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100)
+		}
+
+		previousTotalRequests = currentTotalRequests
+		previousNetworkErrors = currentNetworkErrors
+		previousHttpErrors = currentHttpErrors
+	}
+
+	// assert that the load balancer address is still reachable after the rolling updates are finished
+	e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
+}
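
For readers outside the e2e framework, the probe half of this test reduces to roughly the sketch below. Assumptions to note: it uses plain net/http instead of the framework's utilnet.SetTransportDefaults, and probeLoop, successRate, and the TEST-NET-3 example address are illustrative names, not part of the patch.

package main

import (
	"fmt"
	"io"
	"net/http"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

type counters struct {
	total, netErrs, httpErrs uint64
}

// probeLoop hammers url until done is closed, opening a fresh TCP connection
// per request (DisableKeepAlives) so every probe exercises the load
// balancer's forwarding decision rather than reusing a cached connection.
func probeLoop(url string, c *counters, done <-chan struct{}) {
	client := &http.Client{
		Transport: &http.Transport{DisableKeepAlives: true},
		Timeout:   5 * time.Second,
	}
	wait.Until(func() {
		atomic.AddUint64(&c.total, 1)
		resp, err := client.Get(url)
		if err != nil {
			atomic.AddUint64(&c.netErrs, 1) // refused/reset/timeout
			return
		}
		defer resp.Body.Close()
		if _, err := io.ReadAll(resp.Body); err != nil || resp.StatusCode != http.StatusOK {
			atomic.AddUint64(&c.httpErrs, 1) // reached a backend, bad reply
		}
	}, 0, done)
}

// successRate reports the fraction of probes so far that succeeded.
func successRate(c *counters) float64 {
	t := atomic.LoadUint64(&c.total)
	if t == 0 {
		return 1
	}
	bad := atomic.LoadUint64(&c.netErrs) + atomic.LoadUint64(&c.httpErrs)
	return float64(t-bad) / float64(t)
}

func main() {
	var c counters
	done := make(chan struct{})
	go probeLoop("http://203.0.113.10/echo?msg=hello", &c, done) // illustrative address
	time.Sleep(10 * time.Second)                                 // stand-in for one rolling-update window
	close(done)
	fmt.Printf("success rate: %.2f%%\n", successRate(&c)*100)
}

Disabling keep-alives is the key design choice here: with connection reuse, a single healthy tunnel established before the update would mask forwarding disruptions for the lifetime of the connection.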