/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prober

import (
	"math/rand"
	"time"

	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/prober/results"
	"k8s.io/kubernetes/pkg/kubelet/util/format"
)

// worker handles the periodic probing of its assigned container. Each worker has a goroutine
// associated with it which runs the probe loop until the container permanently terminates, or the
// stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
// container IDs.
type worker struct {
	// Channel for stopping the probe.
	stopCh chan struct{}

	// The pod containing this probe (read-only)
	pod *v1.Pod

	// The container to probe (read-only)
	container v1.Container

	// Describes the probe configuration (read-only)
	spec *v1.Probe

	// The type of the worker.
	probeType probeType

	// The probe value during the initial delay.
	initialValue results.Result

	// Where to store this worker's results.
	resultsManager results.Manager
	probeManager   *manager

	// The last known container ID for this worker.
	containerID kubecontainer.ContainerID
	// The last probe result for this worker.
	lastResult results.Result
	// How many times in a row the probe has returned the same result.
	resultRun int

	// If set, skip probing.
	onHold bool
}

// newWorker creates a new probe worker. The caller is expected to start it by
// running run() in its own goroutine.
func newWorker(
	m *manager,
	probeType probeType,
	pod *v1.Pod,
	container v1.Container) *worker {

	w := &worker{
		stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
		pod:          pod,
		container:    container,
		probeType:    probeType,
		probeManager: m,
	}

	switch probeType {
	case readiness:
		w.spec = container.ReadinessProbe
		w.resultsManager = m.readinessManager
		w.initialValue = results.Failure
	case liveness:
		w.spec = container.LivenessProbe
		w.resultsManager = m.livenessManager
		w.initialValue = results.Success
	}

	return w
}
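
// Note on initialValue above: a readiness worker starts out reporting Failure so
// that a container is not marked ready before its first successful probe, while a
// liveness worker starts out reporting Success so that a container is never
// restarted before it has actually been probed.
//
// A sketch of how a worker is typically driven (the real call site lives in the
// probe manager, not in this file; shown here only for orientation):
//
//	w := newWorker(m, readiness, pod, container)
//	go w.run() // probe loop runs until the container terminates or stop() is called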

// run periodically probes the container.
func (w *worker) run() {
	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		// Clean up.
		probeTicker.Stop()
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}

		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
	}()

	// If the kubelet restarted, the probes could be started in rapid succession.
	// Let the worker wait for a random portion of probeTickerPeriod before probing.
	time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

probeLoop:
	for w.doProbe() {
		// Wait for next probe tick.
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
			// continue
		}
	}
}

// stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
// It is safe to call stop multiple times.
func (w *worker) stop() {
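	// A non-blocking send is sufficient here: stopCh is buffered with capacity one,
	// and run() exits its loop on the first value received, so any further stop()
	// calls simply fall through to the default case and are dropped.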
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}

// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func (w *worker) doProbe() (keepGoing bool) {
	defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
	if !ok {
		// Either the pod has not been created yet, or it was already deleted.
		glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
		return true
	}

	// Worker should terminate if pod is terminated.
	if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
		glog.V(3).Infof("Pod %v %v, exiting probe worker",
			format.Pod(w.pod), status.Phase)
		return false
	}

	c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
	if !ok || len(c.ContainerID) == 0 {
		// Either the container has not been created yet, or it was deleted.
		glog.V(3).Infof("Probe target container not found: %v - %v",
			format.Pod(w.pod), w.container.Name)
		return true // Wait for more information.
	}

	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		// We've got a new container; resume probing.
		w.onHold = false
	}

	if w.onHold {
		// Worker is on hold until there is a new container.
		return true
	}

	if c.State.Running == nil {
		glog.V(3).Infof("Non-running container probed: %v - %v",
			format.Pod(w.pod), w.container.Name)
		if !w.containerID.IsEmpty() {
			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
		}
		// Abort if the container will not be restarted.
		return c.State.Terminated == nil ||
			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
	}

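	// Still within the configured InitialDelaySeconds for this container; skip
	// probing and leave the currently recorded result unchanged.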
	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
		return true
	}

	// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
	// the full container environment here, OR we must make a call to the CRI in order to get those environment
	// values from the running container.
	result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
	if err != nil {
		// Prober error, throw away the result.
		return true
	}

	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}

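	// Require the new result to repeat for the configured threshold before it is
	// reported. For example, with FailureThreshold=3 a previously healthy probe
	// flips to Failure only after three consecutive failed runs, so a single
	// transient failure does not change the reported state.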
	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
		// Success or failure is below threshold - leave the probe state unchanged.
		return true
	}

	w.resultsManager.Set(w.containerID, result, w.pod)

	if w.probeType == liveness && result == results.Failure {
		// The container failed a liveness check; it will need to be restarted.
		// Stop probing until we see a new container ID. This is to reduce the
		// chance of hitting #21751, where running `docker exec` when a
		// container is being stopped may lead to corrupted container state.
		w.onHold = true
		w.resultRun = 1
	}

	return true
}