tests: Add DaemonSet with LB rolling update test

Add a test case with a DaemonSet behind a simple load balancer whose
address is continuously hit with HTTP requests.

The test passes if the success rate of the HTTP requests made to the
load balancer address stays above a minimum threshold during DaemonSet
`RollingUpdate` operations.

Signed-off-by: Ionut Balutoiu <ibalutoiu@cloudbasesolutions.com>

@@ -19,22 +19,31 @@ package network
import (
"context"
"fmt"
"io"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
compute "google.golang.org/api/compute/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
e2eapps "k8s.io/kubernetes/test/e2e/apps"
"k8s.io/kubernetes/test/e2e/framework"
e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
e2ekubesystem "k8s.io/kubernetes/test/e2e/framework/kubesystem"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
@@ -1299,6 +1308,24 @@ var _ = common.SIGDescribe("LoadBalancers", func() {
framework.Failf("Failed to connect to backend 2")
}
})
ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster [Slow]", func() {
// We start with a low but reasonable threshold to analyze the results.
// The goal is to achieve 99% minimum success rate.
// TODO: We should do incremental steps toward the goal.
minSuccessRate := 0.95
testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate)
})
ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local [Slow]", func() {
// We start with a low but reasonable threshold to analyze the results.
// The goal is to achieve 99% minimum success rate.
// TODO: We should do incremental steps toward the goal.
minSuccessRate := 0.95
testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate)
})
})
var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
@@ -1718,3 +1745,177 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
}
})
})
func testRollingUpdateLBConnectivityDisruption(f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) {
cs := f.ClientSet
ns := f.Namespace.Name
name := "test-lb-rolling-update"
labels := map[string]string{"name": name}
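// The pods' termination grace period matches agnhost's --delay-shutdown flag below, so a terminating pod keeps serving for a while before it exits.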
gracePeriod := int64(60)
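// Roll out at most 10% of the daemon pods at a time, so the bulk of the load-balancer backends stay available during each update.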
maxUnavailable := intstr.FromString("10%")
ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil,
[]v1.ContainerPort{
{ContainerPort: 80},
},
"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod),
)
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
Type: appsv1.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateDaemonSet{
MaxUnavailable: &maxUnavailable,
},
}
ds.Spec.Template.Labels = labels
ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
nodeNames := e2edaemonset.SchedulableNodes(cs, ds)
e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet")
ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name))
ds, err := cs.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster")
creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs)
err = wait.PollImmediate(framework.Poll, creationTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames))
framework.ExpectNoError(err, "error waiting for daemon pods to start")
err = e2edaemonset.CheckDaemonStatus(f, name)
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns))
jig := e2eservice.NewTestJig(cs, ns, name)
jig.Labels = labels
service, err := jig.CreateLoadBalancerService(creationTimeout, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy
})
framework.ExpectNoError(err)
lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
svcPort := int(service.Spec.Ports[0].Port)
ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer")
timeout := e2eservice.LoadBalancerLagTimeoutDefault
if framework.ProviderIs("aws") {
timeout = e2eservice.LoadBalancerLagTimeoutAWS
}
e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer")
var totalRequests uint64 = 0
var networkErrors uint64 = 0
var httpErrors uint64 = 0
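// done is closed when the test returns, which stops the probing goroutine.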
done := make(chan struct{})
defer close(done)
go func() {
defer ginkgo.GinkgoRecover()
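// Hit the load balancer back-to-back (zero interval between requests) until done is closed, counting network and HTTP errors separately.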
wait.Until(func() {
atomic.AddUint64(&totalRequests, 1)
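// Disable keep-alives so every probe opens a fresh connection through the load balancer instead of reusing an established one.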
client := &http.Client{
Transport: utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true,
}),
Timeout: 5 * time.Second,
}
ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort))
msg := "hello"
url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg)
resp, err := client.Get(url)
if err != nil {
framework.Logf("Got error testing for reachability of %s: %v", url, err)
atomic.AddUint64(&networkErrors, 1)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
framework.Logf("Got bad status code: %d", resp.StatusCode)
atomic.AddUint64(&httpErrors, 1)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
framework.Logf("Got error reading HTTP body: %v", err)
atomic.AddUint64(&httpErrors, 1)
return
}
if string(body) != msg {
framework.Logf("The response body does not contain expected string %s", string(body))
atomic.AddUint64(&httpErrors, 1)
return
}
}, time.Duration(0), done)
}()
ginkgo.By("Triggering DaemonSet rolling update several times")
var previousTotalRequests uint64 = 0
var previousNetworkErrors uint64 = 0
var previousHttpErrors uint64 = 0
for i := 1; i <= 5; i++ {
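// Patch the pod template with a new VERSION env value to trigger another DaemonSet rollout.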
framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i)
patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i)
ds, err = cs.AppsV1().DaemonSets(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err)
framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.")
err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) {
podList, err := cs.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
pods := podList.Items
readyPods := 0
for _, pod := range pods {
if !metav1.IsControlledBy(&pod, ds) {
continue
}
if pod.DeletionTimestamp != nil {
continue
}
podVersion := ""
for _, env := range pod.Spec.Containers[0].Env {
if env.Name == "VERSION" {
podVersion = env.Value
break
}
}
if podVersion != fmt.Sprintf("%d", i) {
continue
}
podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
if !podReady {
continue
}
readyPods += 1
}
framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name)
return readyPods == len(nodeNames), nil
})
framework.ExpectNoError(err, "error waiting for daemon pods to be ready")
// assert that the HTTP requests success rate is above the acceptable threshold after this rolling update
currentTotalRequests := atomic.LoadUint64(&totalRequests)
currentNetworkErrors := atomic.LoadUint64(&networkErrors)
currentHttpErrors := atomic.LoadUint64(&httpErrors)
partialTotalRequests := currentTotalRequests - previousTotalRequests
partialNetworkErrors := currentNetworkErrors - previousNetworkErrors
partialHttpErrors := currentHttpErrors - previousHttpErrors
partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests)
framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests)
framework.Logf("Network errors: %d", partialNetworkErrors)
framework.Logf("HTTP errors: %d", partialHttpErrors)
framework.Logf("Success rate: %.2f%%", partialSuccessRate*100)
if partialSuccessRate < minSuccessRate {
framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100)
}
previousTotalRequests = currentTotalRequests
previousNetworkErrors = currentNetworkErrors
previousHttpErrors = currentHttpErrors
}
// assert that the load balancer address is still reachable after the rolling updates are finished
e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
}