Endpoints controller respects unready service annotation

This commit is contained in:
Prashanth Balasubramanian 2016-05-06 13:15:49 -07:00
parent 377957a173
commit 74ccd24574
2 changed files with 79 additions and 1 deletions

View File

@ -20,6 +20,7 @@ package endpoint
import (
"reflect"
"strconv"
"time"
"encoding/json"
@ -53,6 +54,14 @@ const (
// We must avoid syncing service until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// An annotation on the Service denoting if the endpoints controller should
// go ahead and create endpoints for unready pods. This annotation is
// currently only used by PetSets, where we need the pet to be DNS
// resolvable during initialization. In this situation we create a headless
// service just for the PetSet, and clients shouldn't be using this Service
// for anything so unready endpoints don't matter.
TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)
var (
@ -356,6 +365,16 @@ func (e *EndpointController) syncService(key string) {
subsets := []api.EndpointSubset{}
podHostNames := map[string]endpoints.HostRecord{}
var tolerateUnreadyEndpoints bool
if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
b, err := strconv.ParseBool(v)
if err == nil {
tolerateUnreadyEndpoints = b
} else {
glog.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err)
}
}
for i := range pods.Items {
pod := &pods.Items[i]
@ -401,7 +420,7 @@ func (e *EndpointController) syncService(key string) {
epa.Hostname = hostname
}
if api.IsPodReady(pod) {
if tolerateUnreadyEndpoints || api.IsPodReady(pod) {
subsets = append(subsets, api.EndpointSubset{
Addresses: []api.EndpointAddress{epa},
Ports: []api.EndpointPort{epp},

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@ -900,6 +901,64 @@ var _ = framework.KubeDescribe("Services", func() {
service, err = t.CreateService(service)
Expect(err).NotTo(HaveOccurred())
})
It("should create endpoints for unready pods", func() {
serviceName := "never-ready"
ns := f.Namespace.Name
t := NewServerTest(c, ns, serviceName)
defer func() {
defer GinkgoRecover()
errs := t.Cleanup()
if len(errs) != 0 {
framework.Failf("errors in cleanup: %v", errs)
}
}()
service := t.BuildServiceSpec()
service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}
rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, api.Container{
Name: t.name,
Image: t.image,
Ports: []api.ContainerPort{{ContainerPort: int32(80), Protocol: api.ProtocolTCP}},
ReadinessProbe: &api.Probe{
Handler: api.Handler{
Exec: &api.ExecAction{
Command: []string{"/bin/false"},
},
},
},
})
By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
_, err := t.createRC(rcSpec)
ExpectNoError(err)
By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector))
_, err = t.CreateService(service)
ExpectNoError(err)
By("Verifying pods for RC " + t.name)
ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))
svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
By("waiting for endpoints of Service with DNS name " + svcName)
createExecPodOrFail(f.Client, f.Namespace.Name, "exec")
cmd := fmt.Sprintf("wget -qO- %v", svcName)
var stdout string
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, "exec", cmd)
if err != nil {
framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
return false, nil
}
return true, nil
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}
})
})
// updateService fetches a service, calls the update function on it,