Refactor "updates to ExternalTrafficPolicy" test

The LoadBalancer test "should handle updates to ExternalTrafficPolicy
field" had a number of problems, the biggest being that (without doing
[Disruptive] things to the cluster or making unwarranted assumptions
about source IPs) it is very hard to *prove* that the cloud load
balancer is implementing Cluster traffic policy semantics (distributing
connections to all nodes) rather than Local (distributing only to
nodes with endpoints).

So split the test into 2 new tests with more focused semantics:

  - "should implement NodePort and HealthCheckNodePort correctly when
    ExternalTrafficPolicy changes" (in service.go) tests that the
    service proxy is correctly implementing the proxy side of
    Cluster-vs-Local traffic policy for LoadBalancer Services, without
    testing the cloud load balancer itself at all.

  - "should target all nodes with endpoints" (in loadbalancer.go)
    complements the existing "should only target nodes with
    endpoints", to ensure that when a service has
    `externalTrafficPolicy: Local`, and there are endpoints on
    multiple nodes, that the cloud is correctly configured to target
    all of those endpoints, not just one.
Author: Dan Winship
Date:   2024-05-09 15:53:59 -04:00
Parent: 8e8bef66f6
Commit: a6b08a8ea4
2 changed files with 210 additions and 171 deletions
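For reference, the field both new tests revolve around is spec.externalTrafficPolicy on a LoadBalancer Service. Below is a minimal sketch of such a Service using the upstream core/v1 types; the selector and port are placeholders (the name matches the one the tests use), and this is illustrative context rather than code from the commit.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// With Local, kube-proxy delivers external traffic only on nodes that have a
// ready endpoint (preserving the client source IP), and the apiserver allocates
// spec.healthCheckNodePort so the cloud load balancer can probe which nodes
// those are. With Cluster, traffic may be forwarded (and SNATed) to an endpoint
// on any node, and no HealthCheckNodePort is allocated.
var svc = &v1.Service{
	ObjectMeta: metav1.ObjectMeta{Name: "external-local-update"},
	Spec: v1.ServiceSpec{
		Type:                  v1.ServiceTypeLoadBalancer,
		Selector:              map[string]string{"app": "example"}, // placeholder selector
		ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal, // vs. ServiceExternalTrafficPolicyCluster
		Ports:                 []v1.ServicePort{{Port: 80}},
	},
}

func main() {
	fmt.Println("externalTrafficPolicy:", svc.Spec.ExternalTrafficPolicy)
}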

loadbalancer.go

@@ -56,7 +56,6 @@ import (
"k8s.io/utils/ptr"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
// getInternalIP returns node internal IP
@@ -990,7 +989,7 @@ var _ = common.SIGDescribe("LoadBalancers", feature.LoadBalancer, func() {
var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature.LoadBalancer, framework.WithSlow(), func() {
f := framework.NewDefaultFramework("esipp")
f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
var loadBalancerCreateTimeout time.Duration
var cs clientset.Interface
@@ -1151,6 +1150,53 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature
}
})
ginkgo.It("should target all nodes with endpoints", func(ctx context.Context) {
// FIXME: need a better platform-independent timeout
loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
framework.ExpectNoError(err)
if len(nodes.Items) == 1 {
e2eskipper.Skipf("Test requires multiple schedulable nodes")
}
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
ginkgo.By("creating the service")
svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false, nil)
framework.ExpectNoError(err, "creating the service")
ingress := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
svcPort := int(svc.Spec.Ports[0].Port)
framework.Logf("ingress is %s:%d", ingress, svcPort)
ginkgo.By("creating endpoints on multiple nodes")
_, err = jig.Run(ctx, func(rc *v1.ReplicationController) {
rc.Spec.Replicas = ptr.To[int32](2)
rc.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: jig.Labels},
TopologyKey: "kubernetes.io/hostname",
},
},
},
}
})
framework.ExpectNoError(err, "creating the endpoints")
ginkgo.By("ensuring that the LoadBalancer targets all endpoints")
// We're not testing affinity here, but we can use checkAffinity(false) to
// test that affinity *isn't* enabled, which is to say, that connecting to
// ingress:svcPort multiple times eventually reaches at least 2 different
// endpoints.
if !checkAffinity(ctx, cs, nil, ingress, svcPort, false) {
framework.Failf("Load balancer connections only reached one of the two endpoints")
}
})
ginkgo.It("should work from pods", func(ctx context.Context) {
var err error
namespace := f.Namespace.Name
@@ -1216,175 +1262,6 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature
framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP)
}
})
ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) {
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
framework.ExpectNoError(err)
if len(nodes.Items) < 2 {
framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
}
svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
framework.ExpectNoError(err)
ginkgo.DeferCleanup(func(ctx context.Context) {
err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
framework.ExpectNoError(err)
err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err)
})
// save the health check node port because it disappears when Local traffic policy is turned off.
healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
ginkgo.By("changing ExternalTrafficPolicy to Cluster")
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
})
framework.ExpectNoError(err)
if svc.Spec.HealthCheckNodePort > 0 {
framework.Failf("Service HealthCheck NodePort still present")
}
epNodes, err := jig.ListNodesWithEndpoint(ctx)
framework.ExpectNoError(err)
// map from name of nodes with endpoint to internal ip
// it is assumed that there is only a single node with the endpoint
endpointNodeMap := make(map[string]string)
// map from name of nodes without endpoint to internal ip
noEndpointNodeMap := make(map[string]string)
for _, node := range epNodes {
ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", node.Name)
}
endpointNodeMap[node.Name] = ips[0]
}
for _, n := range nodes.Items {
ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", n.Name)
}
if _, ok := endpointNodeMap[n.Name]; !ok {
noEndpointNodeMap[n.Name] = ips[0]
}
}
gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty())
gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty())
svcTCPPort := int(svc.Spec.Ports[0].Port)
svcNodePort := int(svc.Spec.Ports[0].NodePort)
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
path := "/clientip"
dialCmd := "clientip"
config := e2enetwork.NewNetworkingTestConfig(ctx, f)
ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
for nodeName, nodeIP := range noEndpointNodeMap {
ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
_, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
}
for nodeName, nodeIP := range endpointNodeMap {
ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
var body string
pollFn := func(ctx context.Context) (bool, error) {
// we expect connection failure here, but not other errors
resp, err := config.GetResponseFromTestContainer(ctx,
"http",
"healthz",
nodeIP,
healthCheckNodePort)
if err != nil {
return false, nil
}
if len(resp.Errors) > 0 {
return true, nil
}
if len(resp.Responses) > 0 {
body = resp.Responses[0]
}
return false, nil
}
if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.TestTimeout, true, pollFn); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after traffic policy set to Cluster. body %s",
nodeName, healthCheckNodePort, body)
}
}
// Poll till kube-proxy re-adds the MASQUERADE rule on the node.
ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
var clientIP string
pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, 3*e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
if err != nil {
return false, nil
}
// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
return false, nil
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Logf("Invalid client IP address format: %q", host)
return false, nil
}
if subnetPrefix.Contains(ip) {
return true, nil
}
return false, nil
})
if pollErr != nil {
framework.Failf("Source IP WAS preserved with Cluster traffic policy. Got %v, expected a cluster ip.", clientIP)
}
// TODO: We need to attempt to create another service with the previously
// allocated healthcheck nodePort. If the health check nodePort has been
// freed, the new service creation will succeed, upon which we cleanup.
// If the health check nodePort has NOT been freed, the new service
// creation will fail.
ginkgo.By("setting ExternalTrafficPolicy back to Local")
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
})
framework.ExpectNoError(err)
loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
pollErr = wait.PollUntilContextTimeout(ctx, framework.PollShortTimeout, loadBalancerPropagationTimeout, true, func(ctx context.Context) (bool, error) {
clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
if err != nil {
return false, nil
}
ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort))
// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
return false, nil
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Logf("Invalid client IP address format: %q", host)
return false, nil
}
if !subnetPrefix.Contains(ip) {
return true, nil
}
return false, nil
})
if pollErr != nil {
framework.Failf("Source IP (%v) is not the client IP after ExternalTrafficPolicy set back to Local, expected a public IP.", clientIP)
}
})
})
func ipToSourceRange(ip string) string {

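The new "should target all nodes with endpoints" test above leans on checkAffinity with a false expectation: the check succeeds once connections to ingress:svcPort have reached at least two different endpoints. A minimal standalone sketch of that idea follows; it is not the framework helper, and it assumes the agnhost /hostname handler and made-up retry numbers.

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

// reachesAtLeastTwoEndpoints polls addr ("ip:port") and reports whether responses
// from at least two distinct backends were observed. The agnhost netexec image the
// e2e jig runs answers /hostname with the serving pod's name, which makes backends
// distinguishable.
func reachesAtLeastTwoEndpoints(addr string, attempts int) bool {
	seen := map[string]bool{}
	client := &http.Client{Timeout: 3 * time.Second}
	for i := 0; i < attempts; i++ {
		resp, err := client.Get("http://" + addr + "/hostname")
		if err != nil {
			// Transient errors are expected while the cloud load balancer converges.
			time.Sleep(time.Second)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		seen[string(body)] = true
		if len(seen) >= 2 {
			return true
		}
	}
	return false
}

func main() {
	// 203.0.113.10:80 stands in for the load balancer's ingress IP and port.
	fmt.Println(reachesAtLeastTwoEndpoints("203.0.113.10:80", 30))
}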
service.go

@@ -3914,6 +3914,168 @@ var _ = common.SIGDescribe("Services", func() {
framework.Failf("The state of the sctp module has changed due to the test case")
}
})
ginkgo.It("should implement NodePort and HealthCheckNodePort correctly when ExternalTrafficPolicy changes", func(ctx context.Context) {
// This uses a LoadBalancer service but only tests the parts of it that are
// managed by the service proxy; there is another test in loadbalancer.go
// to test the load balancer parts.
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 3)
framework.ExpectNoError(err, "listing nodes")
if len(nodes.Items) != 3 {
e2eskipper.Skipf("Need at least 3 schedulable nodes for this test")
}
ginkgo.By("creating the service")
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
})
framework.ExpectNoError(err, "creating the service")
nodePortStr := fmt.Sprintf("%d", svc.Spec.Ports[0].NodePort)
hcNodePortStr := fmt.Sprintf("%d", svc.Spec.HealthCheckNodePort)
framework.Logf("NodePort is %s, HealthCheckNodePort is %s", nodePortStr, hcNodePortStr)
ginkgo.By("creating an endpoint for the service")
_, err = jig.Run(ctx, nil)
framework.ExpectNoError(err, "creating the endpoint")
var endpointNode, endpointNodeIP string
endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig)
framework.ExpectNoError(err, "fetching endpoint node IP")
for node, nodeIP := range endpointsNodeMap {
framework.Logf("endpoint is on node %s (%s)", node, nodeIP)
endpointNode = node
endpointNodeIP = nodeIP
break
}
ginkgo.By("creating a HostNetwork exec pod on a different node from the endpoint")
execPod := launchHostExecPod(ctx, cs, namespace, "hostexec", &endpointNode)
hostExecPodNodeIP := execPod.Status.HostIP
ginkgo.By("finding a third node with neither the endpoint nor the hostexec pod")
var thirdNodeIP string
for i := range nodes.Items {
node := &nodes.Items[i]
ips := e2enode.GetAddresses(node, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", node.Name)
}
if ips[0] != endpointNodeIP && ips[0] != hostExecPodNodeIP {
thirdNodeIP = ips[0]
break
}
}
gomega.Expect(thirdNodeIP).NotTo(gomega.Equal(""), "cluster has at least 3 nodes but could not find a third node IP?")
checkOneNodePort := func(nodeIP string, expectResponse bool, trafficPolicy v1.ServiceExternalTrafficPolicy, deadline time.Time) {
cmd := fmt.Sprintf("curl -g -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(nodeIP, nodePortStr))
var clientIP string
err := wait.PollUntilContextTimeout(ctx, framework.Poll, time.Until(deadline), true, func(ctx context.Context) (bool, error) {
out, err := e2eoutput.RunHostCmd(namespace, execPod.Name, cmd)
if !expectResponse {
return err != nil, nil
} else if err != nil {
return false, nil
}
clientIP, _, err = net.SplitHostPort(out)
if err != nil {
return false, fmt.Errorf("could not parse clientip response %q: %w", out, err)
}
return true, nil
})
framework.ExpectNoError(err, "expected correct response within timeout")
if expectResponse && trafficPolicy == v1.ServiceExternalTrafficPolicyLocal {
gomega.Expect(clientIP).To(gomega.Equal(hostExecPodNodeIP), "expected client IP to be preserved")
}
}
checkOneHealthCheck := func(nodeIP string, expectResponse bool, expectStatus string, deadline time.Time) {
// "-i" means to return the HTTP headers in the response
cmd := fmt.Sprintf("curl -g -i -s --connect-timeout 3 http://%s/", net.JoinHostPort(nodeIP, hcNodePortStr))
err := wait.PollUntilContextTimeout(ctx, framework.Poll, time.Until(deadline), true, func(ctx context.Context) (bool, error) {
out, err := e2eoutput.RunHostCmd(namespace, execPod.Name, cmd)
if !expectResponse {
return err != nil, nil
} else if err != nil {
return false, nil
}
status := strings.Split(out, "\n")[0]
gotResponse := strings.Contains(status, expectStatus)
return gotResponse, nil
})
framework.ExpectNoError(err, "expected response containing %s", expectStatus)
}
// (In each round of checks, all nodes must be correct within a single
// KubeProxyEndpointLagTimeout; we don't want to allow a full
// KubeProxyEndpointLagTimeout interval for each checkOneNodePort call.)
deadline := time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
ginkgo.By("ensuring that the HealthCheckNodePort reports healthy on the endpoint node when ExternalTrafficPolicy is Local")
checkOneHealthCheck(endpointNodeIP, true, "200 OK", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort reports UN-healthy on the execpod node when ExternalTrafficPolicy is Local")
checkOneHealthCheck(hostExecPodNodeIP, true, "503 Service Unavailable", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort reports UN-healthy on the third node when ExternalTrafficPolicy is Local")
checkOneHealthCheck(thirdNodeIP, true, "503 Service Unavailable", deadline)
ginkgo.By("ensuring that the NodePort responds on the endpoint node when ExternalTrafficPolicy is Local")
checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
ginkgo.By("ensuring that the NodePort responds (due to short-circuit rule) on the execpod node when ExternalTrafficPolicy is Local")
checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
ginkgo.By("ensuring that the NodePort does not respond on the third node when ExternalTrafficPolicy is Local")
checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline)
ginkgo.By("changing ExternalTrafficPolicy to Cluster")
oldHealthCheckNodePort := svc.Spec.HealthCheckNodePort
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
})
framework.ExpectNoError(err, "updating Service")
if svc.Spec.HealthCheckNodePort > 0 {
framework.Failf("Service HealthCheck NodePort still present")
}
deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
// FIXME: this is racy; we need to use a reserved HCNP here.
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the endpoint node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(endpointNodeIP, false, "", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the execpod node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(hostExecPodNodeIP, false, "", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the third node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(thirdNodeIP, false, "", deadline)
ginkgo.By("ensuring that the NodePort responds on the endpoint node when ExternalTrafficPolicy is Cluster")
checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline)
ginkgo.By("ensuring that the NodePort responds on the execpod node when ExternalTrafficPolicy is Cluster")
checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline)
ginkgo.By("ensuring that the NodePort responds on the third node when ExternalTrafficPolicy is Cluster")
checkOneNodePort(thirdNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline)
ginkgo.By("changing ExternalTrafficPolicy back to Local")
_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
// FIXME: we need to use a reserved HCNP here.
svc.Spec.HealthCheckNodePort = oldHealthCheckNodePort
})
framework.ExpectNoError(err, "updating ExternalTrafficPolicy and HealthCheckNodePort")
deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
ginkgo.By("ensuring that all NodePorts and HealthCheckNodePorts show the correct behavior again after changing ExternalTrafficPolicy back to Local")
checkOneHealthCheck(endpointNodeIP, true, "200 OK", deadline)
checkOneHealthCheck(hostExecPodNodeIP, true, "503 Service Unavailable", deadline)
checkOneHealthCheck(thirdNodeIP, true, "503 Service Unavailable", deadline)
checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline)
})
})
// execAffinityTestForSessionAffinityTimeout is a helper function that wraps the logic of
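The long sequence of checkOneNodePort / checkOneHealthCheck calls in the new service.go test boils down to a small matrix: three node roles, two traffic policies, checked once more after switching back to Local. The sketch below is only a summary restatement of those assertions, not code from the commit.

package main

import "fmt"

// Expected per-node behavior asserted by the test above. "no response" means the
// port is expected not to answer at all; the HealthCheckNodePort is released
// entirely while ExternalTrafficPolicy is Cluster.
type expectation struct {
	node        string // which node the hostexec pod curls
	policy      string // ExternalTrafficPolicy in effect
	healthCheck string // expected HealthCheckNodePort result
	nodePort    string // expected NodePort result
}

var expectations = []expectation{
	{"endpoint node", "Local", "200 OK", "responds, client IP preserved"},
	{"execpod node", "Local", "503 Service Unavailable", "responds (node-local short-circuit)"},
	{"third node", "Local", "503 Service Unavailable", "no response"},
	{"endpoint node", "Cluster", "no response", "responds"},
	{"execpod node", "Cluster", "no response", "responds"},
	{"third node", "Cluster", "no response", "responds"},
}

func main() {
	for _, e := range expectations {
		fmt.Printf("%-14s policy=%-8s healthCheckNodePort=%-24s nodePort=%s\n",
			e.node, e.policy, e.healthCheck, e.nodePort)
	}
}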