Merge pull request #114052 from ionutbalutoiu/tests/lb-rolling-update

tests: Add DaemonSet with LB rolling update test
Kubernetes Prow Robot 2022-12-12 18:47:52 -08:00 committed by GitHub
commit 73ed9e70ed
@@ -19,22 +19,31 @@ package network
import (
"context"
"fmt"
"io"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
compute "google.golang.org/api/compute/v1"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
e2eapps "k8s.io/kubernetes/test/e2e/apps"
"k8s.io/kubernetes/test/e2e/framework"
e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
e2ekubesystem "k8s.io/kubernetes/test/e2e/framework/kubesystem"
e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
@@ -1299,6 +1308,24 @@ var _ = common.SIGDescribe("LoadBalancers", func() {
framework.Failf("Failed to connect to backend 2")
}
})
ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Cluster [Slow]", func() {
// We start with a lower but reasonable threshold (95%) while we gather data.
// The long-term goal is a 99% minimum success rate.
// TODO: Raise the threshold incrementally toward that goal.
minSuccessRate := 0.95
testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeCluster, minSuccessRate)
})
ginkgo.It("should not have connectivity disruption during rolling update with externalTrafficPolicy=Local [Slow]", func() {
// We start with a lower but reasonable threshold (95%) while we gather data.
// The long-term goal is a 99% minimum success rate.
// TODO: Raise the threshold incrementally toward that goal.
minSuccessRate := 0.95
testRollingUpdateLBConnectivityDisruption(f, v1.ServiceExternalTrafficPolicyTypeLocal, minSuccessRate)
})
})
var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
@@ -1718,3 +1745,177 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
}
})
})
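// testRollingUpdateLBConnectivityDisruption deploys a DaemonSet behind a type=LoadBalancer
// Service, continuously sends HTTP requests to the load balancer address, triggers several
// DaemonSet rolling updates, and fails if the per-update success rate drops below minSuccessRate.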
func testRollingUpdateLBConnectivityDisruption(f *framework.Framework, externalTrafficPolicy v1.ServiceExternalTrafficPolicyType, minSuccessRate float64) {
cs := f.ClientSet
ns := f.Namespace.Name
name := "test-lb-rolling-update"
labels := map[string]string{"name": name}
gracePeriod := int64(60)
maxUnavailable := intstr.FromString("10%")
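// Run agnhost netexec on port 80 on every schedulable node; --delay-shutdown keeps the
// server responding for the termination grace period so in-flight connections can drain.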
ds := e2edaemonset.NewDaemonSet(name, e2eapps.AgnhostImage, labels, nil, nil,
[]v1.ContainerPort{
{ContainerPort: 80},
},
"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod),
)
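// Limit the rolling update to 10% of daemon pods at a time so most backends stay in service.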
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
Type: appsv1.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateDaemonSet{
MaxUnavailable: &maxUnavailable,
},
}
ds.Spec.Template.Labels = labels
ds.Spec.Template.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
nodeNames := e2edaemonset.SchedulableNodes(cs, ds)
e2eskipper.SkipUnlessAtLeast(len(nodeNames), 2, "load-balancer rolling update test requires at least 2 schedulable nodes for the DaemonSet")
ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", name))
ds, err := cs.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Checking that daemon pods launch on every schedulable node of the cluster")
creationTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs)
err = wait.PollImmediate(framework.Poll, creationTimeout, e2edaemonset.CheckDaemonPodOnNodes(f, ds, nodeNames))
framework.ExpectNoError(err, "error waiting for daemon pods to start")
err = e2edaemonset.CheckDaemonStatus(f, name)
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Creating a service %s with type=LoadBalancer externalTrafficPolicy=%s in namespace %s", name, externalTrafficPolicy, ns))
jig := e2eservice.NewTestJig(cs, ns, name)
jig.Labels = labels
service, err := jig.CreateLoadBalancerService(creationTimeout, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = externalTrafficPolicy
})
framework.ExpectNoError(err)
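// The ingress point may be an IP address or a hostname, depending on the cloud provider.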
lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0])
svcPort := int(service.Spec.Ports[0].Port)
ginkgo.By("Hitting the DaemonSet's pods through the service's load balancer")
timeout := e2eservice.LoadBalancerLagTimeoutDefault
if framework.ProviderIs("aws") {
timeout = e2eservice.LoadBalancerLagTimeoutAWS
}
e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
ginkgo.By("Starting a goroutine to continuously hit the DaemonSet's pods through the service's load balancer")
var totalRequests uint64 = 0
var networkErrors uint64 = 0
var httpErrors uint64 = 0
done := make(chan struct{})
defer close(done)
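// Continuously probe the load balancer until the test ends. Keep-alives are disabled so
// every request opens a fresh connection and is routed against the current backend set.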
go func() {
defer ginkgo.GinkgoRecover()
wait.Until(func() {
atomic.AddUint64(&totalRequests, 1)
client := &http.Client{
Transport: utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true,
}),
Timeout: 5 * time.Second,
}
ipPort := net.JoinHostPort(lbNameOrAddress, strconv.Itoa(svcPort))
msg := "hello"
url := fmt.Sprintf("http://%s/echo?msg=%s", ipPort, msg)
resp, err := client.Get(url)
if err != nil {
framework.Logf("Got error testing for reachability of %s: %v", url, err)
atomic.AddUint64(&networkErrors, 1)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
framework.Logf("Got bad status code: %d", resp.StatusCode)
atomic.AddUint64(&httpErrors, 1)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
framework.Logf("Got error reading HTTP body: %v", err)
atomic.AddUint64(&httpErrors, 1)
return
}
if string(body) != msg {
framework.Logf("The response body does not contain expected string %s", string(body))
atomic.AddUint64(&httpErrors, 1)
return
}
}, time.Duration(0), done)
}()
ginkgo.By("Triggering DaemonSet rolling update several times")
var previousTotalRequests uint64 = 0
var previousNetworkErrors uint64 = 0
var previousHttpErrors uint64 = 0
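// Each env patch changes the pod template and triggers a new rolling update; the success
// rate of the requests sent during that rollout is checked after every iteration.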
for i := 1; i <= 5; i++ {
framework.Logf("Update daemon pods environment: [{\"name\":\"VERSION\",\"value\":\"%d\"}]", i)
patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%d"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, i)
ds, err = cs.AppsV1().DaemonSets(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err)
framework.Logf("Check that daemon pods are available on every node of the cluster with the updated environment.")
err = wait.PollImmediate(framework.Poll, creationTimeout, func() (bool, error) {
podList, err := cs.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
pods := podList.Items
readyPods := 0
for _, pod := range pods {
if !metav1.IsControlledBy(&pod, ds) {
continue
}
if pod.DeletionTimestamp != nil {
continue
}
podVersion := ""
for _, env := range pod.Spec.Containers[0].Env {
if env.Name == "VERSION" {
podVersion = env.Value
break
}
}
if podVersion != fmt.Sprintf("%d", i) {
continue
}
podReady := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
if !podReady {
continue
}
readyPods++
}
framework.Logf("Number of running nodes: %d, number of updated ready pods: %d in daemonset %s", len(nodeNames), readyPods, ds.Name)
return readyPods == len(nodeNames), nil
})
framework.ExpectNoError(err, "error waiting for daemon pods to be ready")
// assert that the HTTP requests success rate is above the acceptable threshold after this rolling update
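// Use per-iteration deltas so each rollout is judged on its own traffic window rather
// than on the cumulative counters.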
currentTotalRequests := atomic.LoadUint64(&totalRequests)
currentNetworkErrors := atomic.LoadUint64(&networkErrors)
currentHttpErrors := atomic.LoadUint64(&httpErrors)
partialTotalRequests := currentTotalRequests - previousTotalRequests
partialNetworkErrors := currentNetworkErrors - previousNetworkErrors
partialHttpErrors := currentHttpErrors - previousHttpErrors
partialSuccessRate := (float64(partialTotalRequests) - float64(partialNetworkErrors+partialHttpErrors)) / float64(partialTotalRequests)
framework.Logf("Load Balancer total HTTP requests: %d", partialTotalRequests)
framework.Logf("Network errors: %d", partialNetworkErrors)
framework.Logf("HTTP errors: %d", partialHttpErrors)
framework.Logf("Success rate: %.2f%%", partialSuccessRate*100)
if partialSuccessRate < minSuccessRate {
framework.Failf("Encountered too many errors when doing HTTP requests to the load balancer address. Success rate is %.2f%%, and the minimum allowed threshold is %.2f%%.", partialSuccessRate*100, minSuccessRate*100)
}
previousTotalRequests = currentTotalRequests
previousNetworkErrors = currentNetworkErrors
previousHttpErrors = currentHttpErrors
}
// assert that the load balancer address is still reachable after the rolling updates are finished
e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout)
}