mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
implement readiness.
This commit is contained in:
parent
27dfebed98
commit
043794492e
@ -89,6 +89,7 @@ func (fakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.Pod
|
|||||||
r.Status.PodIP = "1.2.3.4"
|
r.Status.PodIP = "1.2.3.4"
|
||||||
m := make(api.PodInfo)
|
m := make(api.PodInfo)
|
||||||
for k, v := range r.Status.Info {
|
for k, v := range r.Status.Info {
|
||||||
|
v.Ready = true
|
||||||
v.PodIP = "1.2.3.4"
|
v.PodIP = "1.2.3.4"
|
||||||
m[k] = v
|
m[k] = v
|
||||||
}
|
}
|
||||||
|
@ -120,6 +120,7 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
|
|||||||
ID: id,
|
ID: id,
|
||||||
Config: &docker.Config{Image: "testimage"},
|
Config: &docker.Config{Image: "testimage"},
|
||||||
HostConfig: hostConfig,
|
HostConfig: hostConfig,
|
||||||
|
State: docker.State{Running: true},
|
||||||
}
|
}
|
||||||
return f.Err
|
return f.Err
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,6 @@ const defaultChanSize = 1024
|
|||||||
const minShares = 2
|
const minShares = 2
|
||||||
const sharesPerCPU = 1024
|
const sharesPerCPU = 1024
|
||||||
const milliCPUToCPU = 1000
|
const milliCPUToCPU = 1000
|
||||||
const maxRetries int = 3
|
|
||||||
|
|
||||||
// SyncHandler is an interface implemented by Kubelet, for testability
|
// SyncHandler is an interface implemented by Kubelet, for testability
|
||||||
type SyncHandler interface {
|
type SyncHandler interface {
|
||||||
@ -121,6 +120,7 @@ func NewMainKubelet(
|
|||||||
clusterDNS: clusterDNS,
|
clusterDNS: clusterDNS,
|
||||||
serviceLister: serviceLister,
|
serviceLister: serviceLister,
|
||||||
masterServiceNamespace: masterServiceNamespace,
|
masterServiceNamespace: masterServiceNamespace,
|
||||||
|
readiness: newReadinessStates(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := klet.setupDataDirs(); err != nil {
|
if err := klet.setupDataDirs(); err != nil {
|
||||||
@ -197,6 +197,8 @@ type Kubelet struct {
|
|||||||
|
|
||||||
// Volume plugins.
|
// Volume plugins.
|
||||||
volumePluginMgr volume.PluginMgr
|
volumePluginMgr volume.PluginMgr
|
||||||
|
|
||||||
|
readiness *readinessStates
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRootDir returns the full path to the directory under which kubelet can
|
// getRootDir returns the full path to the directory under which kubelet can
|
||||||
@ -876,6 +878,7 @@ func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error {
|
|||||||
|
|
||||||
func (kl *Kubelet) killContainerByID(ID, name string) error {
|
func (kl *Kubelet) killContainerByID(ID, name string) error {
|
||||||
glog.V(2).Infof("Killing container with id %q and name %q", ID, name)
|
glog.V(2).Infof("Killing container with id %q and name %q", ID, name)
|
||||||
|
kl.readiness.remove(ID)
|
||||||
err := kl.dockerClient.StopContainer(ID, 10)
|
err := kl.dockerClient.StopContainer(ID, 10)
|
||||||
if len(name) == 0 {
|
if len(name) == 0 {
|
||||||
return err
|
return err
|
||||||
@ -1048,7 +1051,19 @@ func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.Docke
|
|||||||
// look for changes in the container.
|
// look for changes in the container.
|
||||||
if hash == 0 || hash == expectedHash {
|
if hash == 0 || hash == expectedHash {
|
||||||
// TODO: This should probably be separated out into a separate goroutine.
|
// TODO: This should probably be separated out into a separate goroutine.
|
||||||
healthy, err := kl.probeLiveness(podFullName, uid, podStatus, container, dockerContainer)
|
// If the container's liveness probe is unsuccessful, set readiness to false. If liveness is succesful, do a readiness check and set
|
||||||
|
// readiness accordingly. If the initalDelay since container creation on liveness probe has not passed the probe will return Success.
|
||||||
|
// If the initial delay on the readiness probe has not passed the probe will return Failure.
|
||||||
|
ready := probe.Unknown
|
||||||
|
healthy, err := kl.probeContainer(container.LivenessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Success)
|
||||||
|
if healthy == probe.Success {
|
||||||
|
ready, _ = kl.probeContainer(container.ReadinessProbe, podFullName, uid, podStatus, container, dockerContainer, probe.Failure)
|
||||||
|
}
|
||||||
|
if ready == probe.Success {
|
||||||
|
kl.readiness.set(dockerContainer.ID, true)
|
||||||
|
} else {
|
||||||
|
kl.readiness.set(dockerContainer.ID, false)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(1).Infof("health check errored: %v", err)
|
glog.V(1).Infof("health check errored: %v", err)
|
||||||
containersToKeep[containerID] = empty{}
|
containersToKeep[containerID] = empty{}
|
||||||
@ -1487,6 +1502,31 @@ func getPhase(spec *api.PodSpec, info api.PodInfo) api.PodPhase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPodReadyCondition returns ready condition if all containers in a pod are ready, else it returns an unready condition.
|
||||||
|
func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodCondition {
|
||||||
|
ready := []api.PodCondition{{
|
||||||
|
Kind: api.PodReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
}}
|
||||||
|
unready := []api.PodCondition{{
|
||||||
|
Kind: api.PodReady,
|
||||||
|
Status: api.ConditionNone,
|
||||||
|
}}
|
||||||
|
if info == nil {
|
||||||
|
return unready
|
||||||
|
}
|
||||||
|
for _, container := range spec.Containers {
|
||||||
|
if containerStatus, ok := info[container.Name]; ok {
|
||||||
|
if !containerStatus.Ready {
|
||||||
|
return unready
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return unready
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ready
|
||||||
|
}
|
||||||
|
|
||||||
// GetPodStatus returns information from Docker about the containers in a pod
|
// GetPodStatus returns information from Docker about the containers in a pod
|
||||||
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
|
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
|
||||||
var spec api.PodSpec
|
var spec api.PodSpec
|
||||||
@ -1499,8 +1539,20 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
|
|||||||
|
|
||||||
info, err := dockertools.GetDockerPodInfo(kl.dockerClient, spec, podFullName, uid)
|
info, err := dockertools.GetDockerPodInfo(kl.dockerClient, spec, podFullName, uid)
|
||||||
|
|
||||||
|
for _, c := range spec.Containers {
|
||||||
|
containerStatus := info[c.Name]
|
||||||
|
containerStatus.Ready = kl.readiness.IsReady(containerStatus)
|
||||||
|
info[c.Name] = containerStatus
|
||||||
|
}
|
||||||
|
|
||||||
var podStatus api.PodStatus
|
var podStatus api.PodStatus
|
||||||
podStatus.Phase = getPhase(&spec, info)
|
podStatus.Phase = getPhase(&spec, info)
|
||||||
|
if isPodReady(&spec, info) {
|
||||||
|
podStatus.Conditions = append(podStatus.Conditions, api.PodCondition{
|
||||||
|
Kind: api.PodReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
})
|
||||||
|
}
|
||||||
netContainerInfo, found := info[dockertools.PodInfraContainerName]
|
netContainerInfo, found := info[dockertools.PodInfraContainerName]
|
||||||
if found {
|
if found {
|
||||||
podStatus.PodIP = netContainerInfo.PodIP
|
podStatus.PodIP = netContainerInfo.PodIP
|
||||||
@ -1512,23 +1564,6 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
|
|||||||
return podStatus, err
|
return podStatus, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) probeLiveness(podFullName string, podUID types.UID, status api.PodStatus, container api.Container, dockerContainer *docker.APIContainers) (healthStatus probe.Result, err error) {
|
|
||||||
// Give the container 60 seconds to start up.
|
|
||||||
if container.LivenessProbe == nil {
|
|
||||||
return probe.Success, nil
|
|
||||||
}
|
|
||||||
if time.Now().Unix()-dockerContainer.Created < container.LivenessProbe.InitialDelaySeconds {
|
|
||||||
return probe.Success, nil
|
|
||||||
}
|
|
||||||
for i := 0; i < maxRetries; i++ {
|
|
||||||
healthStatus, err = kl.probeContainer(container.LivenessProbe, podFullName, podUID, status, container)
|
|
||||||
if healthStatus == probe.Success {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return healthStatus, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns logs of current machine.
|
// Returns logs of current machine.
|
||||||
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
|
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
|
||||||
// TODO: whitelist logs we are willing to serve
|
// TODO: whitelist logs we are willing to serve
|
||||||
|
@ -66,6 +66,7 @@ func newTestKubelet(t *testing.T) (*Kubelet, *dockertools.FakeDockerClient) {
|
|||||||
kubelet.sourceReady = func(source string) bool { return true }
|
kubelet.sourceReady = func(source string) bool { return true }
|
||||||
kubelet.masterServiceNamespace = api.NamespaceDefault
|
kubelet.masterServiceNamespace = api.NamespaceDefault
|
||||||
kubelet.serviceLister = testServiceLister{}
|
kubelet.serviceLister = testServiceLister{}
|
||||||
|
kubelet.readiness = newReadinessStates()
|
||||||
if err := kubelet.setupDataDirs(); err != nil {
|
if err := kubelet.setupDataDirs(); err != nil {
|
||||||
t.Fatalf("can't initialize kubelet data dirs: %v", err)
|
t.Fatalf("can't initialize kubelet data dirs: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,8 @@ package kubelet
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@ -30,6 +32,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
|
||||||
|
|
||||||
|
"github.com/fsouza/go-dockerclient"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -39,13 +42,47 @@ var (
|
|||||||
tcprober = tcprobe.New()
|
tcprober = tcprobe.New()
|
||||||
)
|
)
|
||||||
|
|
||||||
func (kl *Kubelet) probeContainer(p *api.Probe, podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (probe.Result, error) {
|
const (
|
||||||
|
defaultProbeTimeout = 1 * time.Second
|
||||||
|
maxProbeRetries = 3
|
||||||
|
)
|
||||||
|
|
||||||
|
// probeContainer executes the given probe on a container and returns the result.
|
||||||
|
// If the probe is nil this returns Success. If the probe's initial delay has not passed
|
||||||
|
// since the creation of the container, this returns the defaultResult. It will then attempt
|
||||||
|
// to execute the probe repeatedly up to maxProbeRetries times, and return on the first
|
||||||
|
// successful result, else returning the last unsucessful result and error.
|
||||||
|
func (kl *Kubelet) probeContainer(p *api.Probe,
|
||||||
|
podFullName string,
|
||||||
|
podUID types.UID,
|
||||||
|
status api.PodStatus,
|
||||||
|
container api.Container,
|
||||||
|
dockerContainer *docker.APIContainers,
|
||||||
|
defaultResult probe.Result) (probe.Result, error) {
|
||||||
|
var err error
|
||||||
|
result := probe.Unknown
|
||||||
|
if p == nil {
|
||||||
|
return probe.Success, nil
|
||||||
|
}
|
||||||
|
if time.Now().Unix()-dockerContainer.Created < p.InitialDelaySeconds {
|
||||||
|
return defaultResult, nil
|
||||||
|
}
|
||||||
|
for i := 0; i < maxProbeRetries; i++ {
|
||||||
|
result, err = kl.runProbe(p, podFullName, podUID, status, container)
|
||||||
|
if result == probe.Success {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) runProbe(p *api.Probe, podFullName string, podUID types.UID, status api.PodStatus, container api.Container) (probe.Result, error) {
|
||||||
var timeout time.Duration
|
var timeout time.Duration
|
||||||
secs := container.LivenessProbe.TimeoutSeconds
|
secs := p.TimeoutSeconds
|
||||||
if secs > 0 {
|
if secs > 0 {
|
||||||
timeout = time.Duration(secs) * time.Second
|
timeout = time.Duration(secs) * time.Second
|
||||||
} else {
|
} else {
|
||||||
timeout = 1 * time.Second
|
timeout = defaultProbeTimeout
|
||||||
}
|
}
|
||||||
if p.Exec != nil {
|
if p.Exec != nil {
|
||||||
return execprober.Probe(kl.newExecInContainer(podFullName, podUID, container))
|
return execprober.Probe(kl.newExecInContainer(podFullName, podUID, container))
|
||||||
@ -132,3 +169,41 @@ func (eic execInContainer) CombinedOutput() ([]byte, error) {
|
|||||||
func (eic execInContainer) SetDir(dir string) {
|
func (eic execInContainer) SetDir(dir string) {
|
||||||
//unimplemented
|
//unimplemented
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This will eventually maintain info about probe results over time
|
||||||
|
// to allow for implementation of health thresholds
|
||||||
|
func newReadinessStates() *readinessStates {
|
||||||
|
return &readinessStates{states: make(map[string]bool)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type readinessStates struct {
|
||||||
|
sync.Mutex
|
||||||
|
states map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readinessStates) IsReady(c api.ContainerStatus) bool {
|
||||||
|
if c.State.Running == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return r.get(strings.TrimPrefix(c.ContainerID, "docker://"))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readinessStates) get(key string) bool {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
state, found := r.states[key]
|
||||||
|
return state && found
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readinessStates) set(key string, value bool) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
r.states[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *readinessStates) remove(key string) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
delete(r.states, key)
|
||||||
|
}
|
||||||
|
@ -162,6 +162,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
if pod.Status.Host == "" {
|
if pod.Status.Host == "" {
|
||||||
// Not assigned.
|
// Not assigned.
|
||||||
newStatus.Phase = api.PodPending
|
newStatus.Phase = api.PodPending
|
||||||
|
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
|
||||||
return newStatus, nil
|
return newStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -171,6 +172,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
if err != nil || len(nodeStatus.Conditions) == 0 {
|
if err != nil || len(nodeStatus.Conditions) == 0 {
|
||||||
glog.V(5).Infof("node doesn't exist: %v %v, setting pod status to unknown", err, nodeStatus)
|
glog.V(5).Infof("node doesn't exist: %v %v, setting pod status to unknown", err, nodeStatus)
|
||||||
newStatus.Phase = api.PodUnknown
|
newStatus.Phase = api.PodUnknown
|
||||||
|
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
|
||||||
return newStatus, nil
|
return newStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,6 +181,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
if (condition.Kind == api.NodeReady || condition.Kind == api.NodeReachable) && condition.Status == api.ConditionNone {
|
if (condition.Kind == api.NodeReady || condition.Kind == api.NodeReachable) && condition.Status == api.ConditionNone {
|
||||||
glog.V(5).Infof("node status: %v, setting pod status to unknown", condition)
|
glog.V(5).Infof("node status: %v, setting pod status to unknown", condition)
|
||||||
newStatus.Phase = api.PodUnknown
|
newStatus.Phase = api.PodUnknown
|
||||||
|
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
|
||||||
return newStatus, nil
|
return newStatus, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,6 +192,7 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("error getting pod status: %v, setting status to unknown", err)
|
glog.Errorf("error getting pod status: %v, setting status to unknown", err)
|
||||||
newStatus.Phase = api.PodUnknown
|
newStatus.Phase = api.PodUnknown
|
||||||
|
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
|
||||||
} else {
|
} else {
|
||||||
newStatus.Info = result.Status.Info
|
newStatus.Info = result.Status.Info
|
||||||
newStatus.PodIP = result.Status.PodIP
|
newStatus.PodIP = result.Status.PodIP
|
||||||
@ -197,8 +201,10 @@ func (p *PodCache) computePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
// propulated the status yet. This should go away once
|
// propulated the status yet. This should go away once
|
||||||
// we removed boundPods
|
// we removed boundPods
|
||||||
newStatus.Phase = api.PodPending
|
newStatus.Phase = api.PodPending
|
||||||
|
newStatus.Conditions = append(newStatus.Conditions, pod.Status.Conditions...)
|
||||||
} else {
|
} else {
|
||||||
newStatus.Phase = result.Status.Phase
|
newStatus.Phase = result.Status.Phase
|
||||||
|
newStatus.Conditions = result.Status.Conditions
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return newStatus, err
|
return newStatus, err
|
||||||
|
@ -76,6 +76,19 @@ func (e *EndpointController) SyncServiceEndpoints() error {
|
|||||||
glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
|
glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inService := false
|
||||||
|
for _, c := range pod.Status.Conditions {
|
||||||
|
if c.Kind == api.PodReady && c.Status == api.ConditionFull {
|
||||||
|
inService = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !inService {
|
||||||
|
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
endpoints = append(endpoints, net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(port)))
|
endpoints = append(endpoints, net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(port)))
|
||||||
}
|
}
|
||||||
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
|
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
|
||||||
|
@ -49,6 +49,12 @@ func newPodList(count int) *api.PodList {
|
|||||||
},
|
},
|
||||||
Status: api.PodStatus{
|
Status: api.PodStatus{
|
||||||
PodIP: "1.2.3.4",
|
PodIP: "1.2.3.4",
|
||||||
|
Conditions: []api.PodCondition{
|
||||||
|
{
|
||||||
|
Kind: api.PodReady,
|
||||||
|
Status: api.ConditionFull,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user