Merge pull request #37093 from simonswine/fix-tolerate-unready-endpoints-pods-terminating

Automatic merge from submit-queue (batch tested with PRs 39092, 39126, 37380, 37093, 39237)

Endpoints with TolerateUnready annotation, should list Pods in state terminating

**What this PR does / why we need it**:

We are using preStop lifecycle hooks to gracefully remove a node from a cluster. This hook is potentially long running and after the preStop hook is fired, the DNS resolution of the soon to be stopped Pod is failing, which causes a failure there.

**Special notes for your reviewer**:

Would be great to backport that to 1.4, 1.3 

**Release note**:

```release-note
Endpoints, that tolerate unready Pods, are now listing Pods in state Terminating as well
```

@bprashanth
This commit is contained in:
Kubernetes Submit Queue 2017-01-03 09:45:24 -08:00 committed by GitHub
commit d6dbd50909
2 changed files with 102 additions and 11 deletions

View File

@ -59,9 +59,13 @@ const (
// An annotation on the Service denoting if the endpoints controller should
// go ahead and create endpoints for unready pods. This annotation is
// currently only used by StatefulSets, where we need the pod to be DNS
// resolvable during initialization. In this situation we create a headless
// service just for the StatefulSet, and clients shouldn't be using this Service
// for anything so unready endpoints don't matter.
// resolvable during initialization and termination. In this situation we
// create a headless Service just for the StatefulSet, and clients shouldn't
// be using this Service for anything so unready endpoints don't matter.
// Endpoints of these Services retain their DNS records and continue
// receiving traffic for the Service from the moment the kubelet starts all
// containers in the pod and marks it "Running", till the kubelet stops all
// containers and deletes the pod from the apiserver.
TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)
@ -403,7 +407,7 @@ func (e *EndpointController) syncService(key string) error {
glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if pod.DeletionTimestamp != nil {
if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
continue
}

View File

@ -1048,7 +1048,7 @@ var _ = framework.KubeDescribe("Services", func() {
})
It("should create endpoints for unready pods", func() {
serviceName := "never-ready"
serviceName := "tolerate-unready"
ns := f.Namespace.Name
t := NewServerTest(cs, ns, serviceName)
@ -1060,12 +1060,31 @@ var _ = framework.KubeDescribe("Services", func() {
}
}()
service := t.BuildServiceSpec()
service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}
t.name = "slow-terminating-unready-pod"
t.image = "gcr.io/google_containers/netexec:1.7"
port := 80
terminateSeconds := int64(600)
service := &v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: t.ServiceName,
Namespace: t.Namespace,
Annotations: map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"},
},
Spec: v1.ServiceSpec{
Selector: t.Labels,
Ports: []v1.ServicePort{{
Name: "http",
Port: int32(port),
TargetPort: intstr.FromInt(port),
}},
},
}
rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, v1.Container{
Args: []string{fmt.Sprintf("--http-port=%d", port)},
Name: t.name,
Image: t.image,
Ports: []v1.ContainerPort{{ContainerPort: int32(80), Protocol: v1.ProtocolTCP}},
Ports: []v1.ContainerPort{{ContainerPort: int32(port), Protocol: v1.ProtocolTCP}},
ReadinessProbe: &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{
@ -1073,9 +1092,17 @@ var _ = framework.KubeDescribe("Services", func() {
},
},
},
Lifecycle: &v1.Lifecycle{
PreStop: &v1.Handler{
Exec: &v1.ExecAction{
Command: []string{"/bin/sleep", fmt.Sprintf("%d", terminateSeconds)},
},
},
},
}, nil)
rcSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &terminateSeconds
By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
By(fmt.Sprintf("creating RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
_, err := t.createRC(rcSpec)
framework.ExpectNoError(err)
@ -1087,10 +1114,10 @@ var _ = framework.KubeDescribe("Services", func() {
framework.ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))
svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
By("waiting for endpoints of Service with DNS name " + svcName)
By("Waiting for endpoints of Service with DNS name " + svcName)
execPodName := createExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-")
cmd := fmt.Sprintf("wget -qO- %v", svcName)
cmd := fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
var stdout string
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
@ -1103,6 +1130,66 @@ var _ = framework.KubeDescribe("Services", func() {
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}
By("Scaling down replication controler to zero")
framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false)
By("Update service to not tolerate unready services")
_, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "false"
})
framework.ExpectNoError(err)
By("Check if pod is unreachable")
cmd = fmt.Sprintf("wget -qO- -T 2 http://%s:%d/; test \"$?\" -eq \"1\"", svcName, port)
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
if err != nil {
framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
return false, nil
}
return true, nil
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}
By("Update service to tolerate unready services again")
_, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "true"
})
framework.ExpectNoError(err)
By("Check if terminating pod is available through service")
cmd = fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
if err != nil {
framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
return false, nil
}
return true, nil
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}
By("Remove pods immediately")
label := labels.SelectorFromSet(labels.Set(t.Labels))
options := v1.ListOptions{LabelSelector: label.String()}
podClient := t.Client.Core().Pods(f.Namespace.Name)
pods, err := podClient.List(options)
if err != nil {
framework.Logf("warning: error retrieving pods: %s", err)
} else {
for _, pod := range pods.Items {
var gracePeriodSeconds int64 = 0
err := podClient.Delete(pod.Name, &v1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
if err != nil {
framework.Logf("warning: error force deleting pod '%s': %s", pod.Name, err)
}
}
}
})
It("should only allow access from service loadbalancer source ranges [Slow]", func() {