diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 3612586ac96..799e40427d4 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -89,6 +89,7 @@ func (fakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.Pod
 	r.Status.PodIP = "1.2.3.4"
 	m := make(api.PodInfo)
 	for k, v := range r.Status.Info {
+		v.Ready = true
 		v.PodIP = "1.2.3.4"
 		m[k] = v
 	}
diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go
index 5fef7bbac5a..518f8d3fe2f 100644
--- a/pkg/kubelet/dockertools/fake_docker_client.go
+++ b/pkg/kubelet/dockertools/fake_docker_client.go
@@ -120,6 +120,7 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
 		ID:         id,
 		Config:     &docker.Config{Image: "testimage"},
 		HostConfig: hostConfig,
+		State:      docker.State{Running: true},
 	}
 	return f.Err
 }
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index def6ca14da5..7e376f90916 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -55,7 +55,6 @@ const defaultChanSize = 1024
 const minShares = 2
 const sharesPerCPU = 1024
 const milliCPUToCPU = 1000
-const maxRetries int = 3
 
 // SyncHandler is an interface implemented by Kubelet, for testability
 type SyncHandler interface {
@@ -121,6 +120,7 @@ func NewMainKubelet(
 		clusterDNS:             clusterDNS,
 		serviceLister:          serviceLister,
 		masterServiceNamespace: masterServiceNamespace,
+		readiness:              newReadinessStates(),
 	}
 
 	if err := klet.setupDataDirs(); err != nil {
@@ -197,6 +197,8 @@ type Kubelet struct {
 
 	// Volume plugins.
 	volumePluginMgr volume.PluginMgr
+
+	readiness *readinessStates
 }
 
 // getRootDir returns the full path to the directory under which kubelet can
@@ -876,6 +878,7 @@ func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
 
 func (kl *Kubelet) killContainerByID(ID, name string) error {
 	glog.V(2).Infof("Killing container with id %q and name %q", ID, name)
+	kl.readiness.remove(ID)
 	err := kl.dockerClient.StopContainer(ID, 10)
 	if len(name) == 0 {
 		return err
@@ -1048,7 +1051,19 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
 			// look for changes in the container.
 			if hash == 0 || hash == expectedHash {
 				// TODO: This should probably be separated out into a separate goroutine.
-				healthy, err := kl.probeLiveness(podFullName, uid, podStatus, container, dockerContainer)
+				// If the container's liveness probe is unsuccessful, set readiness to false. If liveness is successful, do a readiness
+				// check and set readiness accordingly. If the liveness probe's initial delay has not passed since container creation,
+				// the probe returns Success. If the readiness probe's initial delay has not passed, the probe returns Failure.
+				ready := probe.Unknown
+				healthy, err := kl.probeContainer(container.LivenessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Success)
+				if healthy == probe.Success {
+					ready, _ = kl.probeContainer(container.ReadinessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Failure)
+				}
+				if ready == probe.Success {
+					kl.readiness.set(dockerContainer.ID, true)
+				} else {
+					kl.readiness.set(dockerContainer.ID, false)
+				}
 				if err != nil {
 					glog.V(1).Infof("health check errored: %v", err)
 					containersToKeep[containerID] = empty{}
@@ -1487,6 +1502,31 @@ func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
 	}
 }
 
+// getPodReadyCondition returns a ready condition if all containers in the pod are ready; otherwise it returns an unready condition.
+func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodCondition {
+	ready := []api.PodCondition{{
+		Kind:   api.PodReady,
+		Status: api.ConditionFull,
+	}}
+	unready := []api.PodCondition{{
+		Kind:   api.PodReady,
+		Status: api.ConditionNone,
+	}}
+	if info == nil {
+		return unready
+	}
+	for _, container := range spec.Containers {
+		if containerStatus, ok := info[container.Name]; ok {
+			if !containerStatus.Ready {
+				return unready
+			}
+		} else {
+			return unready
+		}
+	}
+	return ready
+}
+
 // GetPodStatus returns information from Docker about the containers in a pod
 func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
 	var spec api.PodSpec
@@ -1499,8 +1539,15 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
 
 	info, err := dockertools.GetDockerPodInfo(kl.dockerClient, spec, podFullName, uid)
 
+	for _, c := range spec.Containers {
+		containerStatus := info[c.Name]
+		containerStatus.Ready = kl.readiness.IsReady(containerStatus)
+		info[c.Name] = containerStatus
+	}
+
 	var podStatus api.PodStatus
 	podStatus.Phase = getPhase(&spec, info)
+	podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(&spec, info)...)
 	netContainerInfo, found := info[dockertools.PodInfraContainerName]
 	if found {
 		podStatus.PodIP = netContainerInfo.PodIP
@@ -1512,23 +1559,6 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
 	return podStatus, err
 }
 
-func (kl *Kubelet) probeLiveness(podFullName string, podUID types.UID, status api.PodStatus, container api.Container, dockerContainer *docker.APIContainers) (healthStatus probe.Result, err error) {
-	// Give the container 60 seconds to start up.
-	if container.LivenessProbe == nil {
-		return probe.Success, nil
-	}
-	if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
-		return probe.Success, nil
-	}
-	for i := 0; i < maxRetries; i++ {
-		healthStatus, err = kl.probeContainer(container.LivenessProbe, podFullName, podUID, status, container)
-		if healthStatus == probe.Success {
-			return
-		}
-	}
-	return healthStatus, err
-}
-
 // Returns logs of current machine.
 func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
 	// TODO: whitelist logs we are willing to serve
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 4441c9d942e..52cd14865a0 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -66,6 +66,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) {
 	kubelet.sourceReady = func(source string) bool { return true }
 	kubelet.masterServiceNamespace = api.NamespaceDefault
 	kubelet.serviceLister = testServiceLister{}
+	kubelet.readiness = newReadinessStates()
 	if err := kubelet.setupDataDirs(); err != nil {
 		t.Fatalf("can't initialize kubelet data dirs: %v", err)
 	}
diff --git a/pkg/kubelet/probe.go b/pkg/kubelet/probe.go
index a3dc573ad85..bab1f8bd941 100644
--- a/pkg/kubelet/probe.go
+++ b/pkg/kubelet/probe.go
@@ -19,6 +19,8 @@ package kubelet
 import (
 	"fmt"
 	"strconv"
+	"strings"
+	"sync"
 	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -30,6 +32,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
 
+	"github.com/fsouza/go-dockerclient"
 	"github.com/golang/glog"
 )
 
@@ -39,13 +42,47 @@ var (
 	tcprober  = tcprobe.New()
 )
 
-func (kl *Kubelet) probeContainer(p *api.Probe, podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (probe.Result, error) {
+const (
+	defaultProbeTimeout = 1 * time.Second
+	maxProbeRetries     = 3
+)
+
+// probeContainer executes the given probe on a container and returns the result.
+// If the probe is nil this returns Success. If the probe's initial delay has not passed
+// since the creation of the container, this returns the defaultResult. It then attempts
+// to execute the probe up to maxProbeRetries times, returning on the first successful
+// result, otherwise returning the last unsuccessful result and error.
+func (kl *Kubelet) probeContainer(p *api.Probe,
+	podFullName string,
+	podUID types.UID,
+	status api.PodStatus,
+	container api.Container,
+	dockerContainer *docker.APIContainers,
+	defaultResult probe.Result) (probe.Result, error) {
+	var err error
+	result := probe.Unknown
+	if p == nil {
+		return probe.Success, nil
+	}
+	if time.Now().Unix()-dockerContainer.Created < p.InitialDelaySeconds {
+		return defaultResult, nil
+	}
+	for i := 0; i < maxProbeRetries; i++ {
+		result, err = kl.runProbe(p, podFullName, podUID, status, container)
+		if result == probe.Success {
+			return result, err
+		}
+	}
+	return result, err
+}
+
+func (kl *Kubelet) runProbe(p *api.Probe, podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (probe.Result, error) {
 	var timeout time.Duration
-	secs := container.LivenessProbe.TimeoutSeconds
+	secs := p.TimeoutSeconds
 	if secs > 0 {
 		timeout = time.Duration(secs) * time.Second
 	} else {
-		timeout = 1 * time.Second
+		timeout = defaultProbeTimeout
 	}
 	if p.Exec != nil {
 		return execprober.Probe(kl.newExecInContainer(podFullName, podUID, container))
@@ -132,3 +169,41 @@ func (eic execInContainer) CombinedOutput() ([]byte, error) {
 func (eic execInContainer) SetDir(dir string) {
 	//unimplemented
 }
+
+// This will eventually maintain info about probe results over time
+// to allow for implementation of health thresholds
+func newReadinessStates() *readinessStates {
+	return &readinessStates{states: make(map[string]bool)}
+}
+
+type readinessStates struct {
+	sync.Mutex
+	states map[string]bool
+}
+
+func (r *readinessStates) IsReady(c api.ContainerStatus) bool {
+	if c.State.Running == nil {
+		return false
+	}
+	return r.get(strings.TrimPrefix(c.ContainerID, "docker://"))
+
+}
+
+func (r *readinessStates) get(key string) bool {
+	r.Lock()
+	defer r.Unlock()
+	state, found := r.states[key]
+	return state && found
+}
+
+func (r *readinessStates) set(key string, value bool) {
+	r.Lock()
+	defer r.Unlock()
+	r.states[key] = value
+}
+
+func (r *readinessStates) remove(key string) {
+	r.Lock()
+	defer r.Unlock()
+	delete(r.states, key)
+}
diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go
index cd08556ea42..9cfe7726537 100644
--- a/pkg/master/pod_cache.go
+++ b/pkg/master/pod_cache.go
@@ -162,6 +162,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	if pod.Status.Host == "" {
 		// Not assigned.
 		newStatus.Phase = api.PodPending
+		newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
 		return newStatus, nil
 	}
 
@@ -171,6 +172,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	if err != nil || len(nodeStatus.Conditions) == 0 {
 		glog.V(5).Infof("node doesn't exist: %v %v, setting pod status to unknown", err, nodeStatus)
 		newStatus.Phase = api.PodUnknown
+		newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
 		return newStatus, nil
 	}
 
@@ -179,6 +181,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 		if (condition.Kind == api.NodeReady || condition.Kind == api.NodeReachable) && condition.Status == api.ConditionNone {
 			glog.V(5).Infof("node status: %v, setting pod status to unknown", condition)
 			newStatus.Phase = api.PodUnknown
+			newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
 			return newStatus, nil
 		}
 	}
@@ -189,6 +192,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 	if err != nil {
 		glog.Errorf("error getting pod status: %v, setting status to unknown", err)
 		newStatus.Phase = api.PodUnknown
+		newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
 	} else {
 		newStatus.Info = result.Status.Info
 		newStatus.PodIP = result.Status.PodIP
@@ -197,8 +201,10 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
 			// propulated the status yet. This should go away once
 			// we removed boundPods
 			newStatus.Phase = api.PodPending
+			newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
 		} else {
 			newStatus.Phase = result.Status.Phase
+			newStatus.Conditions = result.Status.Conditions
 		}
 	}
 	return newStatus, err
diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go
index 3801972bb22..82c1ce2a0d8 100644
--- a/pkg/service/endpoints_controller.go
+++ b/pkg/service/endpoints_controller.go
@@ -76,6 +76,19 @@ func (e *EndpointController) SyncServiceEndpoints() error {
 			glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
 			continue
 		}
+
+		inService := false
+		for _, c := range pod.Status.Conditions {
+			if c.Kind == api.PodReady && c.Status == api.ConditionFull {
+				inService = true
+				break
+			}
+		}
+		if !inService {
+			glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
+			continue
+		}
+
 		endpoints = append(endpoints, net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(port)))
 	}
 	currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go
index 91ec1af64a1..1755680bf88 100644
--- a/pkg/service/endpoints_controller_test.go
+++ b/pkg/service/endpoints_controller_test.go
@@ -49,6 +49,12 @@ func newPodList(count int) *api.PodList {
 			},
 			Status: api.PodStatus{
 				PodIP: "1.2.3.4",
+				Conditions: []api.PodCondition{
+					{
+						Kind:   api.PodReady,
+						Status: api.ConditionFull,
+					},
+				},
 			},
 		})
 	}
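
Taken together, the changes above gate service endpoints on readiness: the kubelet records per-container readiness from the probes, GetPodStatus surfaces it as a PodReady condition, the pod cache propagates the condition, and the endpoints controller skips any pod whose PodReady condition is not ConditionFull. The standalone Go sketch below only illustrates that last check; the local PodCondition type and the isPodInService helper are simplified, hypothetical stand-ins for the api types used in the diff, not part of the change itself.

package main

import "fmt"

// Hypothetical stand-ins for api.PodCondition, api.PodReady, api.ConditionFull and api.ConditionNone.
type ConditionKind string
type ConditionStatus string

const (
	PodReady      ConditionKind   = "Ready"
	ConditionFull ConditionStatus = "Full"
	ConditionNone ConditionStatus = "None"
)

type PodCondition struct {
	Kind   ConditionKind
	Status ConditionStatus
}

// isPodInService mirrors the check added to SyncServiceEndpoints: a pod is only
// added to a service's endpoints if it carries a PodReady condition set to ConditionFull.
func isPodInService(conditions []PodCondition) bool {
	for _, c := range conditions {
		if c.Kind == PodReady && c.Status == ConditionFull {
			return true
		}
	}
	return false
}

func main() {
	ready := []PodCondition{{Kind: PodReady, Status: ConditionFull}}
	notReady := []PodCondition{{Kind: PodReady, Status: ConditionNone}}

	fmt.Println(isPodInService(ready))    // true  -> pod IP is added to the endpoints list
	fmt.Println(isPodInService(notReady)) // false -> pod is skipped
	fmt.Println(isPodInService(nil))      // false -> no conditions reported yet
}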