From b44de1ef27954e0bc52f133c29b568aa382f7fdd Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Tue, 3 Jan 2017 13:00:15 +0000 Subject: [PATCH] Fix: With TolerateUnready set, endpoints are still listed for a Pod in state terminating * Otherwise it prevents long running task in a preStop hook to succeed, that require DNS resolution --- .../endpoint/endpoints_controller.go | 12 ++- test/e2e/service.go | 101 ++++++++++++++++-- 2 files changed, 102 insertions(+), 11 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 1bbaf1ea73c..1dd94f9d9b1 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -59,9 +59,13 @@ const ( // An annotation on the Service denoting if the endpoints controller should // go ahead and create endpoints for unready pods. This annotation is // currently only used by StatefulSets, where we need the pod to be DNS - // resolvable during initialization. In this situation we create a headless - // service just for the StatefulSet, and clients shouldn't be using this Service - // for anything so unready endpoints don't matter. + // resolvable during initialization and termination. In this situation we + // create a headless Service just for the StatefulSet, and clients shouldn't + // be using this Service for anything so unready endpoints don't matter. + // Endpoints of these Services retain their DNS records and continue + // receiving traffic for the Service from the moment the kubelet starts all + // containers in the pod and marks it "Running", till the kubelet stops all + // containers and deletes the pod from the apiserver. TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints" ) @@ -403,7 +407,7 @@ func (e *EndpointController) syncService(key string) error { glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) continue } - if pod.DeletionTimestamp != nil { + if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil { glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name) continue } diff --git a/test/e2e/service.go b/test/e2e/service.go index 7fc108a65a7..a67e322158d 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -1048,7 +1048,7 @@ var _ = framework.KubeDescribe("Services", func() { }) It("should create endpoints for unready pods", func() { - serviceName := "never-ready" + serviceName := "tolerate-unready" ns := f.Namespace.Name t := NewServerTest(cs, ns, serviceName) @@ -1060,12 +1060,31 @@ var _ = framework.KubeDescribe("Services", func() { } }() - service := t.BuildServiceSpec() - service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"} + t.name = "slow-terminating-unready-pod" + t.image = "gcr.io/google_containers/netexec:1.7" + port := 80 + terminateSeconds := int64(600) + + service := &v1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: t.ServiceName, + Namespace: t.Namespace, + Annotations: map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}, + }, + Spec: v1.ServiceSpec{ + Selector: t.Labels, + Ports: []v1.ServicePort{{ + Name: "http", + Port: int32(port), + TargetPort: intstr.FromInt(port), + }}, + }, + } rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, v1.Container{ + Args: []string{fmt.Sprintf("--http-port=%d", port)}, Name: t.name, Image: t.image, - Ports: []v1.ContainerPort{{ContainerPort: int32(80), Protocol: v1.ProtocolTCP}}, + Ports: []v1.ContainerPort{{ContainerPort: int32(port), Protocol: v1.ProtocolTCP}}, ReadinessProbe: &v1.Probe{ Handler: v1.Handler{ Exec: &v1.ExecAction{ @@ -1073,9 +1092,17 @@ var _ = framework.KubeDescribe("Services", func() { }, }, }, + Lifecycle: &v1.Lifecycle{ + PreStop: &v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/sleep", fmt.Sprintf("%d", terminateSeconds)}, + }, + }, + }, }, nil) + rcSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &terminateSeconds - By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector)) + By(fmt.Sprintf("creating RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector)) _, err := t.createRC(rcSpec) framework.ExpectNoError(err) @@ -1087,10 +1114,10 @@ var _ = framework.KubeDescribe("Services", func() { framework.ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1)) svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name) - By("waiting for endpoints of Service with DNS name " + svcName) + By("Waiting for endpoints of Service with DNS name " + svcName) execPodName := createExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-") - cmd := fmt.Sprintf("wget -qO- %v", svcName) + cmd := fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port) var stdout string if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) { var err error @@ -1103,6 +1130,66 @@ var _ = framework.KubeDescribe("Services", func() { }); pollErr != nil { framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout) } + + By("Scaling down replication controler to zero") + framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false) + + By("Update service to not tolerate unready services") + _, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) { + s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "false" + }) + framework.ExpectNoError(err) + + By("Check if pod is unreachable") + cmd = fmt.Sprintf("wget -qO- -T 2 http://%s:%d/; test \"$?\" -eq \"1\"", svcName, port) + if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) { + var err error + stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd) + if err != nil { + framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err) + return false, nil + } + return true, nil + }); pollErr != nil { + framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout) + } + + By("Update service to tolerate unready services again") + _, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) { + s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "true" + }) + framework.ExpectNoError(err) + + By("Check if terminating pod is available through service") + cmd = fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port) + if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) { + var err error + stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd) + if err != nil { + framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err) + return false, nil + } + return true, nil + }); pollErr != nil { + framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout) + } + + By("Remove pods immediately") + label := labels.SelectorFromSet(labels.Set(t.Labels)) + options := v1.ListOptions{LabelSelector: label.String()} + podClient := t.Client.Core().Pods(f.Namespace.Name) + pods, err := podClient.List(options) + if err != nil { + framework.Logf("warning: error retrieving pods: %s", err) + } else { + for _, pod := range pods.Items { + var gracePeriodSeconds int64 = 0 + err := podClient.Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds}) + if err != nil { + framework.Logf("warning: error force deleting pod '%s': %s", pod.Name, err) + } + } + } }) It("should only allow access from service loadbalancer source ranges [Slow]", func() {