diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index fffef5398a2..b9c80cdd199 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -55,6 +55,7 @@ import (
 	netutils "k8s.io/utils/net"
 	utilpointer "k8s.io/utils/pointer"
 
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
 	e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
@@ -1799,6 +1800,264 @@ var _ = common.SIGDescribe("Services", func() {
 		}
 	})
 
+	ginkgo.It("should be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is true", func() {
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-tolerate-unready"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses set to true in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.PublishNotReadyAddresses = true
+		})
+		framework.ExpectNoError(err, "failed to create Service")
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		gracePeriod := int64(300)
+		webserverPod0 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "webserver-pod",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "agnhost",
+						Image: imageutils.GetE2EImage(imageutils.Agnhost),
+						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
+						Ports: []v1.ContainerPort{
+							{
+								ContainerPort: 80,
+							},
+						},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/readyz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/healthz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready: %v", webserverPod0.Name, err)
+		}
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
+
+		pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready: %v", pausePod1.Name, err)
+		}
+
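+		// agnhost netexec was started with --delay-shutdown, so after deletion
+		// the pod should keep serving for up to the grace period while its
+		// /readyz probe starts failing, yielding a terminating, unready
+		// endpoint that can still answer requests.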
+		// The webserver should continue to serve traffic through the Service after deletion, since:
+		// - it has a 300s termination grace period
+		// - it is unready, but PublishNotReadyAddresses is true
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// Wait until the pod becomes unready
+		err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
+			return !podutil.IsPodReady(pod), nil
+		})
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be unready: %v", webserverPod0.Name, err)
+		}
+		// Assert that the pause pod can still reach the terminating endpoint via the ClusterIP and both NodePorts
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		// Connect to all three addresses 5 times, 5 seconds apart, expecting every attempt to succeed
+		for i := 0; i < 5; i++ {
+			execHostnameTest(*pausePod1, clusterIPAddress, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
+			time.Sleep(5 * time.Second)
+		}
+	})
+
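+	// Mirror case: identical topology, but PublishNotReadyAddresses is left
+	// false (the default), so the terminating, unready endpoint is expected
+	// to be removed from the Service and connections should start failing.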
+	ginkgo.It("should not be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is false", func() {
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-not-tolerate-unready"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses set to false in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.PublishNotReadyAddresses = false
+		})
+		framework.ExpectNoError(err, "failed to create Service")
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		gracePeriod := int64(300)
+		webserverPod0 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "webserver-pod",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "agnhost",
+						Image: imageutils.GetE2EImage(imageutils.Agnhost),
+						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
+						Ports: []v1.ContainerPort{
+							{
+								ContainerPort: 80,
+							},
+						},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/readyz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/healthz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready: %v", webserverPod0.Name, err)
+		}
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
+
+		pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready: %v", pausePod1.Name, err)
+		}
+
+		// The webserver should stop serving traffic through the Service after deletion, since:
+		// - it has a 300s termination grace period
+		// - it is unready and PublishNotReadyAddresses is false
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// Wait until the pod becomes unready
+		err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
+			return !podutil.IsPodReady(pod), nil
+		})
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be unready: %v", webserverPod0.Name, err)
+		}
+		// Wait until the change has been propagated and the Service starts to fail
+		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
+		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
+		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			if err != nil {
+				return true, nil
+			}
+			return false, nil
+		}); pollErr != nil {
+			framework.ExpectNoError(pollErr, "service still serves traffic")
+		}
+
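+		// kube-proxy programs endpoint changes asynchronously, so the poll
+		// above only shows the ClusterIP failing once; the loop below asserts
+		// that the failure is stable across the ClusterIP and both NodePorts.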
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		// Connect to all three addresses 5 times, 5 seconds apart, expecting every attempt to fail
+		for i := 0; i < 5; i++ {
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to cluster IP")
+
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
+			_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to NodePort address")
+
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
+			_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to NodePort address")
+
+			time.Sleep(5 * time.Second)
+		}
+	})
+
 	/*
 		Release: v1.19
 		Testname: Service, ClusterIP type, session affinity to ClientIP