diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go
index 91ee6076efb..173a23e77f2 100644
--- a/pkg/kubelet/prober/prober.go
+++ b/pkg/kubelet/prober/prober.go
@@ -36,7 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 	execprobe "k8s.io/kubernetes/pkg/probe/exec"
 	httpprobe "k8s.io/kubernetes/pkg/probe/http"
-	tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
+	tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
 	"k8s.io/utils/exec"
 
 	"k8s.io/klog"
@@ -44,7 +44,7 @@ import (
 
 const maxProbeRetries = 3
 
-// Prober helps to check the liveness/readiness of a container.
+// Prober helps to check the liveness/readiness/startup of a container.
 type prober struct {
 	exec execprobe.Prober
 	// probe types needs different httprobe instances so they don't
@@ -52,7 +52,8 @@ type prober struct {
 	// same host:port and transient failures. See #49740.
 	readinessHTTP httpprobe.Prober
 	livenessHTTP  httpprobe.Prober
-	tcp           tcprobe.Prober
+	startupHTTP   httpprobe.Prober
+	tcp           tcpprobe.Prober
 	runner        kubecontainer.ContainerCommandRunner
 
 	refManager *kubecontainer.RefManager
@@ -71,7 +72,8 @@ func newProber(
 		exec:          execprobe.New(),
 		readinessHTTP: httpprobe.New(followNonLocalRedirects),
 		livenessHTTP:  httpprobe.New(followNonLocalRedirects),
-		tcp:           tcprobe.New(),
+		startupHTTP:   httpprobe.New(followNonLocalRedirects),
+		tcp:           tcpprobe.New(),
 		runner:        runner,
 		refManager:    refManager,
 		recorder:      recorder,
@@ -86,6 +88,8 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
 		probeSpec = container.ReadinessProbe
 	case liveness:
 		probeSpec = container.LivenessProbe
+	case startup:
+		probeSpec = container.StartupProbe
 	default:
 		return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
 	}
@@ -174,11 +178,14 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
 		url := formatURL(scheme, host, port, path)
 		headers := buildHeader(p.HTTPGet.HTTPHeaders)
 		klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
-		if probeType == liveness {
+		switch probeType {
+		case liveness:
 			return pb.livenessHTTP.Probe(url, headers, timeout)
+		case startup:
+			return pb.startupHTTP.Probe(url, headers, timeout)
+		default:
+			return pb.readinessHTTP.Probe(url, headers, timeout)
 		}
-		// readiness
-		return pb.readinessHTTP.Probe(url, headers, timeout)
 	}
 	if p.TCPSocket != nil {
 		port, err := extractPort(p.TCPSocket.Port, container)
diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go
index 69a7ca36a96..d3eccadfe9e 100644
--- a/pkg/kubelet/prober/prober_manager.go
+++ b/pkg/kubelet/prober/prober_manager.go
@@ -23,9 +23,11 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/component-base/metrics"
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/features"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/kubelet/status"
@@ -37,7 +39,7 @@ var ProberResults = metrics.NewCounterVec(
 	&metrics.CounterOpts{
 		Subsystem:      "prober",
 		Name:           "probe_total",
-		Help:           "Cumulative number of a liveness or readiness probe for a container by result.",
+		Help:           "Cumulative number of a liveness, readiness or startup probe for a container by result.",
 		StabilityLevel: metrics.ALPHA,
 	},
 	[]string{"probe_type",
@@ -89,6 +91,9 @@ type manager struct {
 	// livenessManager manages the results of liveness probes
 	livenessManager results.Manager
 
+	// startupManager manages the results of startup probes
+	startupManager results.Manager
+
 	// prober executes the probe actions.
 	prober *prober
 }
@@ -103,11 +108,13 @@ func NewManager(
 
 	prober := newProber(runner, refManager, recorder)
 	readinessManager := results.NewManager()
+	startupManager := results.NewManager()
 	return &manager{
 		statusManager:    statusManager,
 		prober:           prober,
 		readinessManager: readinessManager,
 		livenessManager:  livenessManager,
+		startupManager:   startupManager,
 		workers:          make(map[probeKey]*worker),
 	}
 }
@@ -116,6 +123,8 @@ func NewManager(
 func (m *manager) Start() {
 	// Start syncing readiness.
 	go wait.Forever(m.updateReadiness, 0)
+	// Start syncing startup.
+	go wait.Forever(m.updateStartup, 0)
 }
 
 // Key uniquely identifying container probes
@@ -125,12 +134,13 @@ type probeKey struct {
 	probeType probeType
 }
 
-// Type of probe (readiness or liveness)
+// Type of probe (liveness, readiness or startup)
 type probeType int
 
 const (
 	liveness probeType = iota
 	readiness
+	startup
 
 	probeResultSuccessful string = "successful"
 	probeResultFailed     string = "failed"
@@ -144,6 +154,8 @@ func (t probeType) String() string {
 		return "Readiness"
 	case liveness:
 		return "Liveness"
+	case startup:
+		return "Startup"
 	default:
 		return "UNKNOWN"
 	}
@@ -157,6 +169,18 @@ func (m *manager) AddPod(pod *v1.Pod) {
 	for _, c := range pod.Spec.Containers {
 		key.containerName = c.Name
 
+		if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
+			key.probeType = startup
+			if _, ok := m.workers[key]; ok {
+				klog.Errorf("Startup probe already exists! %v - %v",
+					format.Pod(pod), c.Name)
+				return
+			}
+			w := newWorker(m, startup, pod, c)
+			m.workers[key] = w
+			go w.run()
+		}
+
 		if c.ReadinessProbe != nil {
 			key.probeType = readiness
 			if _, ok := m.workers[key]; ok {
@@ -190,7 +214,7 @@ func (m *manager) RemovePod(pod *v1.Pod) {
 	key := probeKey{podUID: pod.UID}
 	for _, c := range pod.Spec.Containers {
 		key.containerName = c.Name
-		for _, probeType := range [...]probeType{readiness, liveness} {
+		for _, probeType := range [...]probeType{readiness, liveness, startup} {
 			key.probeType = probeType
 			if worker, ok := m.workers[key]; ok {
 				worker.stop()
@@ -223,6 +247,21 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
 			ready = !exists
 		}
 		podStatus.ContainerStatuses[i].Ready = ready
+
+		var started bool
+		if c.State.Running == nil {
+			started = false
+		} else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
+			// The container is running; assume it is started if the StartupProbe feature is disabled.
+			started = true
+		} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
+			started = result == results.Success
+		} else {
+			// Check whether there is a probe which hasn't run yet.
+			_, exists := m.getWorker(podUID, c.Name, startup)
+			started = !exists
+		}
+		podStatus.ContainerStatuses[i].Started = &started
 	}
 	// init containers are ready if they have exited with success or if a readiness probe has
 	// succeeded.
@@ -262,3 +301,10 @@ func (m *manager) updateReadiness() {
 	ready := update.Result == results.Success
 	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
 }
+
+func (m *manager) updateStartup() {
+	update := <-m.startupManager.Updates()
+
+	started := update.Result == results.Success
+	m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
+}
diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go
index c502e0245c2..e43041855e3 100644
--- a/pkg/kubelet/prober/worker.go
+++ b/pkg/kubelet/prober/worker.go
@@ -98,6 +98,10 @@ func newWorker(
 		w.spec = container.LivenessProbe
 		w.resultsManager = m.livenessManager
 		w.initialValue = results.Success
+	case startup:
+		w.spec = container.StartupProbe
+		w.resultsManager = m.startupManager
+		w.initialValue = results.Failure
 	}
 
 	basicMetricLabels := prometheus.Labels{
@@ -218,10 +222,23 @@ func (w *worker) doProbe() (keepGoing bool) {
 			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
 	}
 
+	// Probe disabled for InitialDelaySeconds.
 	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
 		return true
 	}
 
+	if c.Started != nil && *c.Started {
+		// Stop probing for startup once container has started.
+		if w.probeType == startup {
+			return true
+		}
+	} else {
+		// Disable other probes until container has started.
+		if w.probeType != startup {
+			return true
+		}
+	}
+
 	// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
 	// the full container environment here, OR we must make a call to the CRI in order to get those environment
 	// values from the running container.
@@ -255,8 +272,8 @@ func (w *worker) doProbe() (keepGoing bool) {
 
 	w.resultsManager.Set(w.containerID, result, w.pod)
 
-	if w.probeType == liveness && result == results.Failure {
-		// The container fails a liveness check, it will need to be restarted.
+	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
+		// The container failed a liveness/startup check; it will need to be restarted.
 		// Stop probing until we see a new container ID. This is to reduce the
 		// chance of hitting #21751, where running `docker exec` when a
 		// container is being stopped may lead to corrupted container state.
diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index 380c7827b20..5e668a57eb5 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -100,6 +100,10 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
 
+	// SetContainerStartup updates the cached container status with the given startup, and
+	// triggers a status update.
+	SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
+
 	// TerminatePod resets the container status for the provided pod to terminated and triggers
 	// a status update.
 	TerminatePod(pod *v1.Pod)
@@ -248,6 +252,45 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 	m.updateStatusInternal(pod, status, false)
 }
 
+func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+
+	pod, ok := m.podManager.GetPodByUID(podUID)
+	if !ok {
+		klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
+		return
+	}
+
+	oldStatus, found := m.podStatuses[pod.UID]
+	if !found {
+		klog.Warningf("Container startup changed before pod has synced: %q - %q",
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	// Find the container to update.
+	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
+	if !ok {
+		klog.Warningf("Container startup changed for unknown container: %q - %q",
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	if containerStatus.Started != nil && *containerStatus.Started == started {
+		klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	// Make sure we're not updating the cached version.
+	status := *oldStatus.status.DeepCopy()
+	containerStatus, _, _ = findContainerStatus(&status, containerID.String())
+	containerStatus.Started = &started
+
+	m.updateStatusInternal(pod, status, false)
+}
+
 func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
 	// Find the container to update.
 	for i, c := range status.ContainerStatuses {
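
For context only (not part of the patch): a minimal sketch of the kind of container spec the new code path acts on. It declares both a startupProbe and a livenessProbe, so the worker in worker.go holds liveness/readiness probing back until the startup probe succeeds and the prober manager sets ContainerStatus.Started. The container name, image, ports, and thresholds are illustrative assumptions; the API types are k8s.io/api/core/v1 from the era of this change, where Probe still embeds Handler.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// Hypothetical container: the startup probe tolerates a slow boot of up to
	// FailureThreshold * PeriodSeconds = 300s before liveness probing takes over.
	c := v1.Container{
		Name:  "slow-starter",        // illustrative name
		Image: "example.com/app:1.0", // illustrative image
		StartupProbe: &v1.Probe{
			Handler: v1.Handler{
				HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)},
			},
			PeriodSeconds:    10,
			FailureThreshold: 30,
		},
		LivenessProbe: &v1.Probe{
			Handler: v1.Handler{
				HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8080)},
			},
			PeriodSeconds:    10,
			FailureThreshold: 3,
		},
	}
	fmt.Printf("%s: startup probe path %s\n", c.Name, c.StartupProbe.HTTPGet.Path)
}

Note that this only has an effect when the StartupProbe feature gate checked via utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) above is turned on; otherwise UpdatePodStatus reports a running container as started unconditionally.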