Merge pull request #125222 from danwinship/kind-loadbalancers-4

Fix "updates to ExternalTrafficPolicy" test
Kubernetes Prow Robot 2024-09-17 14:05:14 +01:00 committed by GitHub
commit bfd91fbb3e
2 changed files with 249 additions and 211 deletions
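The behavior under test reduces to a small amount of Service API manipulation: switching a LoadBalancer Service's ExternalTrafficPolicy from Local to Cluster makes the apiserver clear spec.healthCheckNodePort, and switching back to Local may request the previously allocated port again. A minimal client-go sketch of that round trip (assumed function and package names; the e2e code goes through e2eservice.TestJig.UpdateService, which also retries on update conflicts):

// Sketch only, not part of this change: toggle ExternalTrafficPolicy with a
// plain clientset.
package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func toggleExternalTrafficPolicy(ctx context.Context, cs kubernetes.Interface, ns, name string) error {
	svc, err := cs.CoreV1().Services(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	oldHCNP := svc.Spec.HealthCheckNodePort

	// Local -> Cluster: the returned object has HealthCheckNodePort cleared.
	svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
	svc, err = cs.CoreV1().Services(ns).Update(ctx, svc, metav1.UpdateOptions{})
	if err != nil {
		return err
	}

	// Cluster -> Local: request the previously allocated health check port again.
	svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
	svc.Spec.HealthCheckNodePort = oldHCNP
	_, err = cs.CoreV1().Services(ns).Update(ctx, svc, metav1.UpdateOptions{})
	return err
}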


@@ -56,7 +56,6 @@ import (
"k8s.io/utils/ptr"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
// getInternalIP returns node internal IP
@@ -990,7 +989,7 @@ var _ = common.SIGDescribe("LoadBalancers", feature.LoadBalancer, func() {
var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature.LoadBalancer, framework.WithSlow(), func() {
f := framework.NewDefaultFramework("esipp")
f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
var loadBalancerCreateTimeout time.Duration
var cs clientset.Interface
@@ -1151,6 +1150,53 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature
}
})
ginkgo.It("should target all nodes with endpoints", func(ctx context.Context) {
// FIXME: need a better platform-independent timeout
loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs)
nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
framework.ExpectNoError(err)
if len(nodes.Items) == 1 {
e2eskipper.Skipf("Test requires multiple schedulable nodes")
}
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
ginkgo.By("creating the service")
svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false, nil)
framework.ExpectNoError(err, "creating the service")
ingress := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
svcPort := int(svc.Spec.Ports[0].Port)
framework.Logf("ingress is %s:%d", ingress, svcPort)
ginkgo.By("creating endpoints on multiple nodes")
_, err = jig.Run(ctx, func(rc *v1.ReplicationController) {
rc.Spec.Replicas = ptr.To[int32](2)
rc.Spec.Template.Spec.Affinity = &v1.Affinity{
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{MatchLabels: jig.Labels},
TopologyKey: "kubernetes.io/hostname",
},
},
},
}
})
framework.ExpectNoError(err, "creating the endpoints")
ginkgo.By("ensuring that the LoadBalancer targets all endpoints")
// We're not testing affinity here, but we can use checkAffinity(false) to
// test that affinity *isn't* enabled, which is to say, that connecting to
// ingress:svcPort multiple times eventually reaches at least 2 different
// endpoints.
if !checkAffinity(ctx, cs, nil, ingress, svcPort, false) {
framework.Failf("Load balancer connections only reached one of the two endpoints")
}
})
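For reference, the "at least 2 different endpoints" property that checkAffinity(false) verifies above can be approximated outside the framework with a plain HTTP loop. A sketch only, with an assumed helper name; it presumes the backends answer /hostname the way the test jig's agnhost netexec pods do, and needs io, net/http, strings, and time imported:

// Sketch, not part of this file: count distinct /hostname responses from addr
// (e.g. net.JoinHostPort(ingress, strconv.Itoa(svcPort))).
func reachesAtLeastTwoEndpoints(addr string, attempts int) bool {
	client := &http.Client{Timeout: 3 * time.Second}
	seen := map[string]bool{}
	for i := 0; i < attempts; i++ {
		resp, err := client.Get("http://" + addr + "/hostname")
		if err != nil {
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		seen[strings.TrimSpace(string(body))] = true
		if len(seen) >= 2 {
			return true
		}
	}
	return false
}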
ginkgo.It("should work from pods", func(ctx context.Context) {
var err error
namespace := f.Namespace.Name
@@ -1216,175 +1262,6 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature
framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP)
}
})
ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) {
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
framework.ExpectNoError(err)
if len(nodes.Items) < 2 {
framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
}
svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil)
framework.ExpectNoError(err)
ginkgo.DeferCleanup(func(ctx context.Context) {
err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
framework.ExpectNoError(err)
err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err)
})
// save the health check node port because it disappears when Local traffic policy is turned off.
healthCheckNodePort := int(svc.Spec.HealthCheckNodePort)
ginkgo.By("changing ExternalTrafficPolicy to Cluster")
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
})
framework.ExpectNoError(err)
if svc.Spec.HealthCheckNodePort > 0 {
framework.Failf("Service HealthCheck NodePort still present")
}
epNodes, err := jig.ListNodesWithEndpoint(ctx)
framework.ExpectNoError(err)
// map from name of nodes with endpoint to internal ip
// it is assumed that there is only a single node with the endpoint
endpointNodeMap := make(map[string]string)
// map from name of nodes without endpoint to internal ip
noEndpointNodeMap := make(map[string]string)
for _, node := range epNodes {
ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", node.Name)
}
endpointNodeMap[node.Name] = ips[0]
}
for _, n := range nodes.Items {
ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", n.Name)
}
if _, ok := endpointNodeMap[n.Name]; !ok {
noEndpointNodeMap[n.Name] = ips[0]
}
}
gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty())
gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty())
svcTCPPort := int(svc.Spec.Ports[0].Port)
svcNodePort := int(svc.Spec.Ports[0].NodePort)
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
path := "/clientip"
dialCmd := "clientip"
config := e2enetwork.NewNetworkingTestConfig(ctx, f)
ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
for nodeName, nodeIP := range noEndpointNodeMap {
ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
_, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
}
for nodeName, nodeIP := range endpointNodeMap {
ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
var body string
pollFn := func(ctx context.Context) (bool, error) {
// we expect connection failure here, but not other errors
resp, err := config.GetResponseFromTestContainer(ctx,
"http",
"healthz",
nodeIP,
healthCheckNodePort)
if err != nil {
return false, nil
}
if len(resp.Errors) > 0 {
return true, nil
}
if len(resp.Responses) > 0 {
body = resp.Responses[0]
}
return false, nil
}
if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.TestTimeout, true, pollFn); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after traffic policy set to Cluster. body %s",
nodeName, healthCheckNodePort, body)
}
}
// Poll till kube-proxy re-adds the MASQUERADE rule on the node.
ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
var clientIP string
pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, 3*e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
if err != nil {
return false, nil
}
// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
return false, nil
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Logf("Invalid client IP address format: %q", host)
return false, nil
}
if subnetPrefix.Contains(ip) {
return true, nil
}
return false, nil
})
if pollErr != nil {
framework.Failf("Source IP WAS preserved with Cluster traffic policy. Got %v, expected a cluster ip.", clientIP)
}
// TODO: We need to attempt to create another service with the previously
// allocated healthcheck nodePort. If the health check nodePort has been
// freed, the new service creation will succeed, upon which we cleanup.
// If the health check nodePort has NOT been freed, the new service
// creation will fail.
ginkgo.By("setting ExternalTrafficPolicy back to Local")
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
})
framework.ExpectNoError(err)
loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)
pollErr = wait.PollUntilContextTimeout(ctx, framework.PollShortTimeout, loadBalancerPropagationTimeout, true, func(ctx context.Context) (bool, error) {
clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
if err != nil {
return false, nil
}
ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort))
// The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
return false, nil
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Logf("Invalid client IP address format: %q", host)
return false, nil
}
if !subnetPrefix.Contains(ip) {
return true, nil
}
return false, nil
})
if pollErr != nil {
framework.Failf("Source IP (%v) is not the client IP after ExternalTrafficPolicy set back to Local, expected a public IP.", clientIP)
}
})
})
func ipToSourceRange(ip string) string {


@@ -333,7 +333,7 @@ func StopServeHostnameService(ctx context.Context, clientset clientset.Interface
// given expectedPods list after a sort | uniq.
func verifyServeHostnameServiceUp(ctx context.Context, c clientset.Interface, ns string, expectedPods []string, serviceIP string, servicePort int) error {
// to verify from host network
hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-up-host-exec-pod")
hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-up-host-exec-pod", nil)
// to verify from container's network
execPod := e2epod.CreateExecPodOrFail(ctx, c, ns, "verify-service-up-exec-pod-", nil)
@@ -403,7 +403,7 @@ func verifyServeHostnameServiceUp(ctx context.Context, c clientset.Interface, ns
// verifyServeHostnameServiceDown verifies that the given service isn't served.
func verifyServeHostnameServiceDown(ctx context.Context, c clientset.Interface, ns string, serviceIP string, servicePort int) error {
// verify from host network
hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-down-host-exec-pod")
hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-down-host-exec-pod", nil)
defer func() {
e2epod.DeletePodOrFail(ctx, c, ns, hostExecPod.Name)
}()
@@ -1706,7 +1706,7 @@ var _ = common.SIGDescribe("Services", func() {
err = t.DeleteService(serviceName)
framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "hostexec")
hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "hostexec", nil)
cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
var stdout string
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
@@ -2705,51 +2705,31 @@ var _ = common.SIGDescribe("Services", func() {
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
ginkgo.By("creating the service")
svc, err := jig.CreateOnlyLocalNodePortService(ctx, false)
svc, err := jig.CreateOnlyLocalNodePortService(ctx, true)
framework.ExpectNoError(err, "creating the service")
tcpNodePort := int(svc.Spec.Ports[0].NodePort)
nodePortStr := fmt.Sprintf("%d", tcpNodePort)
framework.Logf("NodePort is %s", nodePortStr)
ginkgo.By("creating a HostNetwork exec pod")
execPod := launchHostExecPod(ctx, cs, namespace, "hostexec")
execPod, err = cs.CoreV1().Pods(namespace).Get(ctx, execPod.Name, metav1.GetOptions{})
framework.ExpectNoError(err, "getting podIP of execPod")
framework.Logf("execPod IP is %q", execPod.Status.PodIP)
ginkgo.By("creating an endpoint for the service on a different node from the execPod")
_, err = jig.Run(ctx, func(rc *v1.ReplicationController) {
rc.Spec.Template.Spec.Affinity = &v1.Affinity{
// We need to ensure the endpoint is on a different node
// from the exec pod, to ensure that the source IP of the
// traffic is the node's "public" IP. For
// node-to-pod-on-same-node traffic, it might end up using
// the "docker0" IP or something like that.
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{{
MatchFields: []v1.NodeSelectorRequirement{{
Key: "metadata.name",
Operator: "NotIn",
Values: []string{execPod.Spec.NodeName},
}},
}},
},
},
}
})
framework.ExpectNoError(err, "creating the endpoint pod")
// Extract the single endpoint node IP from a map of endpoint node IPs
var endpointNodeIP string
// Get the (single) endpoint's node name and IP
var endpointNodeName, endpointNodeIP string
endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig)
framework.ExpectNoError(err, "fetching endpoint node IPs")
for node, nodeIP := range endpointsNodeMap {
framework.Logf("endpoint is on node %s (%s)", node, nodeIP)
endpointNodeName = node
endpointNodeIP = nodeIP
break
}
// We need to ensure the endpoint is on a different node from the exec
// pod, to ensure that the source IP of the traffic is the node's "public"
// IP. For node-to-pod-on-same-node traffic, it might end up using the
// "docker0" IP or something like that.
ginkgo.By("creating a HostNetwork exec pod on a different node")
execPod := launchHostExecPod(ctx, cs, namespace, "hostexec", &endpointNodeName)
framework.Logf("execPod IP is %q", execPod.Status.PodIP)
ginkgo.By("connecting from the execpod to the NodePort on the endpoint's node")
cmd := fmt.Sprintf("curl -g -q -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(endpointNodeIP, nodePortStr))
var clientIP string
@@ -3934,6 +3914,168 @@ var _ = common.SIGDescribe("Services", func() {
framework.Failf("The state of the sctp module has changed due to the test case")
}
})
ginkgo.It("should implement NodePort and HealthCheckNodePort correctly when ExternalTrafficPolicy changes", func(ctx context.Context) {
// This uses a LoadBalancer service but only tests the parts of it that are
// managed by the service proxy; there is another test in loadbalancer.go
// to test the load balancer parts.
namespace := f.Namespace.Name
serviceName := "external-local-update"
jig := e2eservice.NewTestJig(cs, namespace, serviceName)
nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 3)
framework.ExpectNoError(err, "listing nodes")
if len(nodes.Items) != 3 {
e2eskipper.Skipf("Need at least 3 schedulable nodes for this test")
}
ginkgo.By("creating the service")
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
})
framework.ExpectNoError(err, "creating the service")
nodePortStr := fmt.Sprintf("%d", svc.Spec.Ports[0].NodePort)
hcNodePortStr := fmt.Sprintf("%d", svc.Spec.HealthCheckNodePort)
framework.Logf("NodePort is %s, HealthCheckNodePort is %s", nodePortStr, hcNodePortStr)
ginkgo.By("creating an endpoint for the service")
_, err = jig.Run(ctx, nil)
framework.ExpectNoError(err, "creating the endpoint")
var endpointNode, endpointNodeIP string
endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig)
framework.ExpectNoError(err, "fetching endpoint node IP")
for node, nodeIP := range endpointsNodeMap {
framework.Logf("endpoint is on node %s (%s)", node, nodeIP)
endpointNode = node
endpointNodeIP = nodeIP
break
}
ginkgo.By("creating a HostNetwork exec pod on a different node from the endpoint")
execPod := launchHostExecPod(ctx, cs, namespace, "hostexec", &endpointNode)
hostExecPodNodeIP := execPod.Status.HostIP
ginkgo.By("finding a third node with neither the endpoint nor the hostexec pod")
var thirdNodeIP string
for i := range nodes.Items {
node := &nodes.Items[i]
ips := e2enode.GetAddresses(node, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", node.Name)
}
if ips[0] != endpointNodeIP && ips[0] != hostExecPodNodeIP {
thirdNodeIP = ips[0]
break
}
}
gomega.Expect(thirdNodeIP).NotTo(gomega.Equal(""), "cluster has at least 3 nodes but could not find a third node IP?")
checkOneNodePort := func(nodeIP string, expectResponse bool, trafficPolicy v1.ServiceExternalTrafficPolicy, deadline time.Time) {
cmd := fmt.Sprintf("curl -g -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(nodeIP, nodePortStr))
var clientIP string
err := wait.PollUntilContextTimeout(ctx, framework.Poll, time.Until(deadline), true, func(ctx context.Context) (bool, error) {
out, err := e2eoutput.RunHostCmd(namespace, execPod.Name, cmd)
if !expectResponse {
return err != nil, nil
} else if err != nil {
return false, nil
}
clientIP, _, err = net.SplitHostPort(out)
if err != nil {
return false, fmt.Errorf("could not parse clientip response %q: %w", out, err)
}
return true, nil
})
framework.ExpectNoError(err, "expected correct response within timeout")
if expectResponse && trafficPolicy == v1.ServiceExternalTrafficPolicyLocal {
gomega.Expect(clientIP).To(gomega.Equal(hostExecPodNodeIP), "expected client IP to be preserved")
}
}
checkOneHealthCheck := func(nodeIP string, expectResponse bool, expectStatus string, deadline time.Time) {
// "-i" means to return the HTTP headers in the response
cmd := fmt.Sprintf("curl -g -i -s --connect-timeout 3 http://%s/", net.JoinHostPort(nodeIP, hcNodePortStr))
err := wait.PollUntilContextTimeout(ctx, framework.Poll, time.Until(deadline), true, func(ctx context.Context) (bool, error) {
out, err := e2eoutput.RunHostCmd(namespace, execPod.Name, cmd)
if !expectResponse {
return err != nil, nil
} else if err != nil {
return false, nil
}
status := strings.Split(out, "\n")[0]
gotResponse := strings.Contains(status, expectStatus)
return gotResponse, nil
})
framework.ExpectNoError(err, "expected response containing %s", expectStatus)
}
// (In each round of checks, all nodes must be correct within a single
// KubeProxyEndpointLagTimeout; we don't want to allow a full
// KubeProxyEndpointLagTimeout interval for each checkOneNodePort call.)
deadline := time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
ginkgo.By("ensuring that the HealthCheckNodePort reports healthy on the endpoint node when ExternalTrafficPolicy is Local")
checkOneHealthCheck(endpointNodeIP, true, "200 OK", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort reports UN-healthy on the execpod node when ExternalTrafficPolicy is Local")
checkOneHealthCheck(hostExecPodNodeIP, true, "503 Service Unavailable", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort reports UN-healthy on the third node when ExternalTrafficPolicy is Local")
checkOneHealthCheck(thirdNodeIP, true, "503 Service Unavailable", deadline)
ginkgo.By("ensuring that the NodePort responds on the endpoint node when ExternalTrafficPolicy is Local")
checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
ginkgo.By("ensuring that the NodePort responds (due to short-circuit rule) on the execpod node when ExternalTrafficPolicy is Local")
checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
ginkgo.By("ensuring that the NodePort does not respond on the third node when ExternalTrafficPolicy is Local")
checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline)
ginkgo.By("changing ExternalTrafficPolicy to Cluster")
oldHealthCheckNodePort := svc.Spec.HealthCheckNodePort
svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster
})
framework.ExpectNoError(err, "updating Service")
if svc.Spec.HealthCheckNodePort > 0 {
framework.Failf("Service HealthCheck NodePort still present")
}
deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
// FIXME: this is racy; we need to use a reserved HCNP here.
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the endpoint node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(endpointNodeIP, false, "", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the execpod node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(hostExecPodNodeIP, false, "", deadline)
ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the third node when ExternalTrafficPolicy is Cluster")
checkOneHealthCheck(thirdNodeIP, false, "", deadline)
ginkgo.By("ensuring that the NodePort responds on the endpoint node when ExternalTrafficPolicy is Cluster")
checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline)
ginkgo.By("ensuring that the NodePort responds on the execpod node when ExternalTrafficPolicy is Cluster")
checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline)
ginkgo.By("ensuring that the NodePort responds on the third node when ExternalTrafficPolicy is Cluster")
checkOneNodePort(thirdNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline)
ginkgo.By("changing ExternalTrafficPolicy back to Local")
_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
// Request the same healthCheckNodePort as before, to test the user-requested allocation path
// FIXME: we need to use a reserved HCNP here.
svc.Spec.HealthCheckNodePort = oldHealthCheckNodePort
})
framework.ExpectNoError(err, "updating ExternalTrafficPolicy and HealthCheckNodePort")
deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout)
ginkgo.By("ensuring that all NodePorts and HealthCheckNodePorts show the correct behavior again after changing ExternalTrafficPolicy back to Local")
checkOneHealthCheck(endpointNodeIP, true, "200 OK", deadline)
checkOneHealthCheck(hostExecPodNodeIP, true, "503 Service Unavailable", deadline)
checkOneHealthCheck(thirdNodeIP, true, "503 Service Unavailable", deadline)
checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline)
checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline)
})
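As an aside, not part of this change: the 200-vs-503 distinction that checkOneHealthCheck greps out of the curl -i output is kube-proxy's health check server reporting whether the node currently has a local endpoint for the service. A direct probe might look like the following sketch, with an assumed helper name and fmt, net, and net/http imported:

// Sketch: probe a Service's HealthCheckNodePort on one node and report whether
// kube-proxy considers that node to have local endpoints for the service.
func nodeHasLocalEndpoints(nodeIP string, hcNodePort int) (bool, error) {
	url := fmt.Sprintf("http://%s/", net.JoinHostPort(nodeIP, fmt.Sprint(hcNodePort)))
	resp, err := http.Get(url)
	if err != nil {
		// e.g. connection refused once the port has been deallocated
		return false, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case http.StatusOK: // "200 OK": at least one local endpoint
		return true, nil
	case http.StatusServiceUnavailable: // "503 Service Unavailable": no local endpoints
		return false, nil
	default:
		return false, fmt.Errorf("unexpected status %q", resp.Status)
	}
}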
})
// execAffinityTestForSessionAffinityTimeout is a helper function that wrap the logic of
@@ -4184,14 +4326,33 @@ func createPodOrFail(ctx context.Context, f *framework.Framework, ns, name strin
}
// launchHostExecPod launches a hostexec pod in the given namespace and waits
// until it's Running
func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name string) *v1.Pod {
// until it's Running. If avoidNode is non-nil, it will ensure that the pod doesn't
// land on that node.
func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name string, avoidNode *string) *v1.Pod {
framework.Logf("Creating new host exec pod")
hostExecPod := e2epod.NewExecPodSpec(ns, name, true)
pod, err := client.CoreV1().Pods(ns).Create(ctx, hostExecPod, metav1.CreateOptions{})
framework.ExpectNoError(err)
if avoidNode != nil {
hostExecPod.Spec.Affinity = &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{{
MatchFields: []v1.NodeSelectorRequirement{{
Key: "metadata.name",
Operator: "NotIn",
Values: []string{*avoidNode},
}},
}},
},
},
}
}
_, err := client.CoreV1().Pods(ns).Create(ctx, hostExecPod, metav1.CreateOptions{})
framework.ExpectNoError(err, "creating host exec pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, client, name, ns, framework.PodStartTimeout)
framework.ExpectNoError(err)
framework.ExpectNoError(err, "waiting for host exec pod")
// re-fetch to get PodIP, etc
pod, err := client.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
framework.ExpectNoError(err, "getting podIP of host exec pod")
return pod
}