From 74ccd2457432f2df071985507bfe236a866c24cb Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Fri, 6 May 2016 13:15:49 -0700 Subject: [PATCH] Endpoints controller respects unready service annotation --- .../endpoint/endpoints_controller.go | 21 ++++++- test/e2e/service.go | 59 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 5bb9119f983..257ecb3df1d 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -20,6 +20,7 @@ package endpoint import ( "reflect" + "strconv" "time" "encoding/json" @@ -53,6 +54,14 @@ const ( // We must avoid syncing service until the pod store has synced. If it hasn't synced, to // avoid a hot loop, we'll wait this long between checks. PodStoreSyncedPollPeriod = 100 * time.Millisecond + + // An annotation on the Service denoting if the endpoints controller should + // go ahead and create endpoints for unready pods. This annotation is + // currently only used by PetSets, where we need the pet to be DNS + // resolvable during initialization. In this situation we create a headless + // service just for the PetSet, and clients shouldn't be using this Service + // for anything so unready endpoints don't matter. + TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints" ) var ( @@ -356,6 +365,16 @@ func (e *EndpointController) syncService(key string) { subsets := []api.EndpointSubset{} podHostNames := map[string]endpoints.HostRecord{} + var tolerateUnreadyEndpoints bool + if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok { + b, err := strconv.ParseBool(v) + if err == nil { + tolerateUnreadyEndpoints = b + } else { + glog.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err) + } + } + for i := range pods.Items { pod := &pods.Items[i] @@ -401,7 +420,7 @@ func (e *EndpointController) syncService(key string) { epa.Hostname = hostname } - if api.IsPodReady(pod) { + if tolerateUnreadyEndpoints || api.IsPodReady(pod) { subsets = append(subsets, api.EndpointSubset{ Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}, diff --git a/test/e2e/service.go b/test/e2e/service.go index 7314d818d19..6a4240d7f9c 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -900,6 +901,64 @@ var _ = framework.KubeDescribe("Services", func() { service, err = t.CreateService(service) Expect(err).NotTo(HaveOccurred()) }) + + It("should create endpoints for unready pods", func() { + serviceName := "never-ready" + ns := f.Namespace.Name + + t := NewServerTest(c, ns, serviceName) + defer func() { + defer GinkgoRecover() + errs := t.Cleanup() + if len(errs) != 0 { + framework.Failf("errors in cleanup: %v", errs) + } + }() + + service := t.BuildServiceSpec() + service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"} + rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, api.Container{ + Name: t.name, + Image: t.image, + Ports: []api.ContainerPort{{ContainerPort: int32(80), Protocol: api.ProtocolTCP}}, + ReadinessProbe: &api.Probe{ + Handler: api.Handler{ + Exec: &api.ExecAction{ + Command: []string{"/bin/false"}, + }, + }, + }, + }) + + By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector)) + _, err := t.createRC(rcSpec) + ExpectNoError(err) + + By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector)) + _, err = t.CreateService(service) + ExpectNoError(err) + + By("Verifying pods for RC " + t.name) + ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1)) + + svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name) + By("waiting for endpoints of Service with DNS name " + svcName) + + createExecPodOrFail(f.Client, f.Namespace.Name, "exec") + cmd := fmt.Sprintf("wget -qO- %v", svcName) + var stdout string + if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) { + var err error + stdout, err = framework.RunHostCmd(f.Namespace.Name, "exec", cmd) + if err != nil { + framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err) + return false, nil + } + return true, nil + }); pollErr != nil { + framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout) + } + }) }) // updateService fetches a service, calls the update function on it,