mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-20 17:38:50 +00:00
Keep pod worker running until pod is truly complete
A number of race conditions exist when pods are terminated early in their lifecycle because components in the kubelet need to know "no running containers" or "containers can't be started from now on" but were relying on outdated state. Only the pod worker knows whether containers are being started for a given pod, which is required to know when a pod is "terminated" (no running containers, none coming). Move that responsibility and podKiller function into the pod workers, and have everything that was killing the pod go into the UpdatePod loop. Split syncPod into three phases - setup, terminate containers, and cleanup pod - and have transitions between those methods be visible to other components. After this change, to kill a pod you tell the pod worker to UpdatePod({UpdateType: SyncPodKill, Pod: pod}). Several places in the kubelet were incorrect about whether they were handling terminating (should stop running, might have containers) or terminated (no running containers) pods. The pod worker exposes methods that allow other loops to know when to set up or tear down resources based on the state of the pod - these methods remove the possibility of race conditions by ensuring a single component is responsible for knowing each pod's allowed state and other components simply delegate to checking whether they are in the window by UID. Removing containers now no longer blocks final pod deletion in the API server and are handled as background cleanup. Node shutdown no longer marks pods as failed as they can be restarted in the next step. See https://docs.google.com/document/d/1Pic5TPntdJnYfIpBeZndDelM-AbS4FN9H2GTLFhoJ04/edit# for details
This commit is contained in:
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"math"
|
||||
@@ -136,6 +137,11 @@ const (
|
||||
// Period for performing global cleanup tasks.
|
||||
housekeepingPeriod = time.Second * 2
|
||||
|
||||
// Duration at which housekeeping failed to satisfy the invariant that
|
||||
// housekeeping should be fast to avoid blocking pod config (while
|
||||
// housekeeping is running no new pods are started or deleted).
|
||||
housekeepingWarningDuration = time.Second * 15
|
||||
|
||||
// Period for performing eviction monitoring.
|
||||
// ensure this is kept in sync with internal cadvisor housekeeping.
|
||||
evictionMonitoringPeriod = time.Second * 10
|
||||
@@ -626,6 +632,20 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
klet.containerLogManager = logs.NewStubContainerLogManager()
|
||||
}
|
||||
|
||||
klet.reasonCache = NewReasonCache()
|
||||
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
|
||||
klet.podWorkers = newPodWorkers(
|
||||
klet.syncPod,
|
||||
klet.syncTerminatingPod,
|
||||
klet.syncTerminatedPod,
|
||||
|
||||
kubeDeps.Recorder,
|
||||
klet.workQueue,
|
||||
klet.resyncInterval,
|
||||
backOffPeriod,
|
||||
klet.podCache,
|
||||
)
|
||||
|
||||
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
|
||||
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
|
||||
klet.livenessManager,
|
||||
@@ -633,7 +653,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
klet.startupManager,
|
||||
seccompProfileRoot,
|
||||
machineInfo,
|
||||
klet,
|
||||
klet.podWorkers,
|
||||
kubeDeps.OSInterface,
|
||||
klet,
|
||||
httpClient,
|
||||
@@ -764,7 +784,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
kubeCfg.EnableControllerAttachDetach,
|
||||
nodeName,
|
||||
klet.podManager,
|
||||
klet.statusManager,
|
||||
klet.podWorkers,
|
||||
klet.kubeClient,
|
||||
klet.volumePluginMgr,
|
||||
klet.containerRuntime,
|
||||
@@ -776,12 +796,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
keepTerminatedPodVolumes,
|
||||
volumepathhandler.NewBlockVolumePathHandler())
|
||||
|
||||
klet.reasonCache = NewReasonCache()
|
||||
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
|
||||
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
|
||||
|
||||
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
|
||||
klet.podKiller = NewPodKiller(klet)
|
||||
|
||||
// setup eviction manager
|
||||
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
|
||||
@@ -1087,9 +1102,6 @@ type Kubelet struct {
|
||||
// Container restart Backoff
|
||||
backOff *flowcontrol.Backoff
|
||||
|
||||
// Pod killer handles pods to be killed
|
||||
podKiller PodKiller
|
||||
|
||||
// Information about the ports which are opened by daemons on Node running this Kubelet server.
|
||||
daemonEndpoints *v1.NodeDaemonEndpoints
|
||||
|
||||
@@ -1452,10 +1464,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
kl.initNetworkUtil()
|
||||
}
|
||||
|
||||
// Start a goroutine responsible for killing pods (that are not properly
|
||||
// handled by pod workers).
|
||||
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
|
||||
|
||||
// Start component sync loops.
|
||||
kl.statusManager.Start()
|
||||
|
||||
@@ -1469,7 +1477,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
kl.syncLoop(updates, kl)
|
||||
}
|
||||
|
||||
// syncPod is the transaction script for the sync of a single pod.
|
||||
// syncPod is the transaction script for the sync of a single pod (setting up)
|
||||
// a pod. The reverse (teardown) is handled in syncTerminatingPod and
|
||||
// syncTerminatedPod. If syncPod exits without error, then the pod runtime
|
||||
// state is in sync with the desired configuration state (pod is running).
|
||||
// If syncPod exits with a transient error, the next invocation of syncPod
|
||||
// is expected to make progress towards reaching the runtime state.
|
||||
//
|
||||
// Arguments:
|
||||
//
|
||||
@@ -1481,7 +1494,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
// * If the pod is being seen as running for the first time, record pod
|
||||
// start latency
|
||||
// * Update the status of the pod in the status manager
|
||||
// * Kill the pod if it should not be running
|
||||
// * Kill the pod if it should not be running due to soft admission
|
||||
// * Create a mirror pod if the pod is a static pod, and does not
|
||||
// already have a mirror pod
|
||||
// * Create the data directories for the pod if they do not exist
|
||||
@@ -1496,39 +1509,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
// This operation writes all events that are dispatched in order to provide
|
||||
// the most accurate information possible about an error situation to aid debugging.
|
||||
// Callers should not throw an event if this operation returns an error.
|
||||
func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
// pull out the required options
|
||||
pod := o.pod
|
||||
mirrorPod := o.mirrorPod
|
||||
podStatus := o.podStatus
|
||||
updateType := o.updateType
|
||||
|
||||
// if we want to kill a pod, do it now!
|
||||
if updateType == kubetypes.SyncPodKill {
|
||||
killPodOptions := o.killPodOptions
|
||||
if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
|
||||
return fmt.Errorf("kill pod options are required if update type is kill")
|
||||
}
|
||||
apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
|
||||
kl.statusManager.SetPodStatus(pod, apiPodStatus)
|
||||
// we kill the pod with the specified grace period since this is a termination
|
||||
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
|
||||
// there was an error killing the pod, so we return that error directly
|
||||
utilruntime.HandleError(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// If a pod is still gracefully terminating, then we do not want to
|
||||
// take further action. This mitigates static pods and deleted pods
|
||||
// from getting rerun prematurely or their cgroups being deleted before
|
||||
// the runtime cleans up.
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
if kl.podKiller.IsPodPendingTerminationByPodName(podFullName) {
|
||||
return fmt.Errorf("pod %q is pending termination", podFullName)
|
||||
}
|
||||
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
|
||||
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// Latency measurements for the main workflow are relative to the
|
||||
// first time the pod was seen by the API server.
|
||||
@@ -1565,16 +1548,15 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
podStatus.IPs = []string{apiPodStatus.PodIP}
|
||||
}
|
||||
|
||||
// Record the time it takes for the pod to become running.
|
||||
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
|
||||
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
|
||||
!firstSeenTime.IsZero() {
|
||||
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
|
||||
}
|
||||
|
||||
// If the pod should not be running, we request the pod's containers be stopped. This is not the same
|
||||
// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
|
||||
// it later). Set the status and phase appropriately
|
||||
runnable := kl.canRunPod(pod)
|
||||
if !runnable.Admit {
|
||||
// Pod is not runnable; update the Pod and Container statuses to why.
|
||||
// Pod is not runnable; and update the Pod and Container statuses to why.
|
||||
if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
|
||||
apiPodStatus.Phase = v1.PodPending
|
||||
}
|
||||
apiPodStatus.Reason = runnable.Reason
|
||||
apiPodStatus.Message = runnable.Message
|
||||
// Waiting containers are not creating.
|
||||
@@ -1591,22 +1573,28 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
// Update status in the status manager
|
||||
// Record the time it takes for the pod to become running.
|
||||
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
|
||||
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
|
||||
!firstSeenTime.IsZero() {
|
||||
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
|
||||
}
|
||||
|
||||
kl.statusManager.SetPodStatus(pod, apiPodStatus)
|
||||
|
||||
// Kill pod if it should not be running
|
||||
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
|
||||
// Pods that are not runnable must be stopped - return a typed error to the pod worker
|
||||
if !runnable.Admit {
|
||||
klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
|
||||
var syncErr error
|
||||
if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
|
||||
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
|
||||
if err := kl.killPod(pod, p, nil); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
|
||||
syncErr = fmt.Errorf("error killing pod: %v", err)
|
||||
utilruntime.HandleError(syncErr)
|
||||
} else {
|
||||
if !runnable.Admit {
|
||||
// There was no error killing the pod, but the pod cannot be run.
|
||||
// Return an error to signal that the sync loop should back off.
|
||||
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
|
||||
}
|
||||
// There was no error killing the pod, but the pod cannot be run.
|
||||
// Return an error to signal that the sync loop should back off.
|
||||
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
|
||||
}
|
||||
return syncErr
|
||||
}
|
||||
@@ -1622,7 +1610,8 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
pcm := kl.containerManager.NewPodContainerManager()
|
||||
// If pod has already been terminated then we need not create
|
||||
// or update the pod's cgroup
|
||||
if !kl.podIsTerminated(pod) {
|
||||
// TODO: once context cancellation is added this check can be removed
|
||||
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
|
||||
// When the kubelet is restarted with the cgroups-per-qos
|
||||
// flag enabled, all the pod's running containers
|
||||
// should be killed intermittently and brought back up
|
||||
@@ -1639,7 +1628,8 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
// exists or the pod is running for the first time
|
||||
podKilled := false
|
||||
if !pcm.Exists(pod) && !firstSync {
|
||||
if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
|
||||
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
|
||||
if err := kl.killPod(pod, p, nil); err == nil {
|
||||
podKilled = true
|
||||
} else {
|
||||
klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
|
||||
@@ -1673,6 +1663,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
// The mirror pod is semantically different from the static pod. Remove
|
||||
// it. The mirror pod will get recreated later.
|
||||
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
|
||||
podFullName := kubecontainer.GetPodFullName(pod)
|
||||
var err error
|
||||
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
|
||||
if deleted {
|
||||
@@ -1702,8 +1693,9 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Volume manager will not mount volumes for terminated pods
|
||||
if !kl.podIsTerminated(pod) {
|
||||
// Volume manager will not mount volumes for terminating pods
|
||||
// TODO: once context cancellation is added this check can be removed
|
||||
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
|
||||
// Wait for volumes to attach/mount
|
||||
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
|
||||
@@ -1734,6 +1726,125 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
|
||||
// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
|
||||
// we perform no status updates.
|
||||
func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
|
||||
klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
|
||||
// manager or refresh the status of the cache, because a successful killPod will ensure we do
|
||||
// not get invoked again
|
||||
if runningPod != nil {
|
||||
// we kill the pod with the specified grace period since this is a termination
|
||||
if gracePeriod != nil {
|
||||
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
|
||||
} else {
|
||||
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
|
||||
}
|
||||
if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
|
||||
// there was an error killing the pod, so we return that error directly
|
||||
utilruntime.HandleError(err)
|
||||
return err
|
||||
}
|
||||
klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
return nil
|
||||
}
|
||||
|
||||
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
|
||||
if podStatusFn != nil {
|
||||
podStatusFn(&apiPodStatus)
|
||||
}
|
||||
kl.statusManager.SetPodStatus(pod, apiPodStatus)
|
||||
|
||||
if gracePeriod != nil {
|
||||
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
|
||||
} else {
|
||||
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
|
||||
}
|
||||
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
|
||||
if err := kl.killPod(pod, p, gracePeriod); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
|
||||
// there was an error killing the pod, so we return that error directly
|
||||
utilruntime.HandleError(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Guard against consistency issues in KillPod implementations by checking that there are no
|
||||
// running containers. This method is invoked infrequently so this is effectively free and can
|
||||
// catch race conditions introduced by callers updating pod status out of order.
|
||||
// TODO: have KillPod return the terminal status of stopped containers and write that into the
|
||||
// cache immediately
|
||||
podStatus, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
return err
|
||||
}
|
||||
var runningContainers []string
|
||||
var containers []string
|
||||
for _, s := range podStatus.ContainerStatuses {
|
||||
if s.State == kubecontainer.ContainerStateRunning {
|
||||
runningContainers = append(runningContainers, s.ID.String())
|
||||
}
|
||||
containers = append(containers, fmt.Sprintf("(%s state=%s exitCode=%d finishedAt=%s)", s.Name, s.State, s.ExitCode, s.FinishedAt.UTC().Format(time.RFC3339Nano)))
|
||||
}
|
||||
if klog.V(4).Enabled() {
|
||||
sort.Strings(containers)
|
||||
klog.InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
|
||||
}
|
||||
if len(runningContainers) > 0 {
|
||||
return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
|
||||
}
|
||||
|
||||
// we have successfully stopped all containers, the pod is terminating, our status is "done"
|
||||
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncTerminatedPod cleans up a pod that has terminated (has no running containers).
|
||||
// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
|
||||
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup.
|
||||
// TODO: make this method take a context and exit early
|
||||
func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
|
||||
klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// generate the final status of the pod
|
||||
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
|
||||
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
|
||||
kl.statusManager.SetPodStatus(pod, apiPodStatus)
|
||||
|
||||
// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
|
||||
// before syncTerminatedPod is invoked)
|
||||
if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
|
||||
return err
|
||||
}
|
||||
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
|
||||
// container for retrieving logs and we want to make sure logs are available until the pod is
|
||||
// physically deleted.
|
||||
|
||||
// remove any cgroups in the hierarchy for pods that are no longer running.
|
||||
if kl.cgroupsPerQOS {
|
||||
pcm := kl.containerManager.NewPodContainerManager()
|
||||
name, _ := pcm.GetPodContainerName(pod)
|
||||
if err := pcm.Destroy(name); err != nil {
|
||||
return err
|
||||
}
|
||||
klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
}
|
||||
|
||||
// mark the final pod status
|
||||
kl.statusManager.TerminatePod(pod)
|
||||
klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
|
||||
// * pod whose work is ready.
|
||||
// * internal modules that request sync of a pod.
|
||||
@@ -1776,28 +1887,11 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
|
||||
// for sources that haven't reported yet.
|
||||
return fmt.Errorf("skipping delete because sources aren't ready yet")
|
||||
}
|
||||
kl.podWorkers.ForgetWorker(pod.UID)
|
||||
|
||||
// make sure our runtimeCache is at least as fresh as the last container started event we observed.
|
||||
// this ensures we correctly send graceful deletion signals to all containers we've reported started.
|
||||
if lastContainerStarted, ok := kl.lastContainerStartedTime.Get(pod.UID); ok {
|
||||
if err := kl.runtimeCache.ForceUpdateIfOlder(lastContainerStarted); err != nil {
|
||||
return fmt.Errorf("error updating containers: %v", err)
|
||||
}
|
||||
}
|
||||
// Runtime cache may not have been updated to with the pod, but it's okay
|
||||
// because the periodic cleanup routine will attempt to delete again later.
|
||||
runningPods, err := kl.runtimeCache.GetPods()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error listing containers: %v", err)
|
||||
}
|
||||
runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
|
||||
if runningPod.IsEmpty() {
|
||||
return fmt.Errorf("pod not found")
|
||||
}
|
||||
podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
|
||||
kl.podKiller.KillPod(&podPair)
|
||||
|
||||
klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
kl.podWorkers.UpdatePod(UpdatePodOptions{
|
||||
Pod: pod,
|
||||
UpdateType: kubetypes.SyncPodKill,
|
||||
})
|
||||
// We leave the volume/directory cleanup to the periodic cleanup routine.
|
||||
return nil
|
||||
}
|
||||
@@ -1835,7 +1929,7 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin
|
||||
func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
|
||||
attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
|
||||
// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
|
||||
attrs.OtherPods = kl.filterOutTerminatedPods(kl.podManager.GetPods())
|
||||
attrs.OtherPods = kl.GetActivePods()
|
||||
|
||||
for _, handler := range kl.softAdmitHandlers {
|
||||
if result := handler.Admit(attrs); !result.Admit {
|
||||
@@ -2025,10 +2119,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
|
||||
// skip housekeeping, as we may accidentally delete pods from unready sources.
|
||||
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
|
||||
} else {
|
||||
start := time.Now()
|
||||
klog.V(4).InfoS("SyncLoop (housekeeping)")
|
||||
if err := handler.HandlePodCleanups(); err != nil {
|
||||
klog.ErrorS(err, "Failed cleaning pods")
|
||||
}
|
||||
duration := time.Since(start)
|
||||
if duration > housekeepingWarningDuration {
|
||||
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
|
||||
}
|
||||
klog.V(4).InfoS("SyncLoop (housekeeping) end")
|
||||
}
|
||||
}
|
||||
return true
|
||||
@@ -2049,31 +2149,12 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle
|
||||
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
|
||||
// If the pod has completed termination, dispatchWork will perform no action.
|
||||
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
|
||||
// check whether we are ready to delete the pod from the API server (all status up to date)
|
||||
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
|
||||
if pod.DeletionTimestamp != nil && containersTerminal {
|
||||
klog.V(4).InfoS("Pod has completed execution and should be deleted from the API server", "pod", klog.KObj(pod), "syncType", syncType)
|
||||
kl.statusManager.TerminatePod(pod)
|
||||
return
|
||||
}
|
||||
|
||||
// optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
|
||||
// (i.e. the pod has completed and its containers have been terminated)
|
||||
if podWorkerTerminal && containersTerminal {
|
||||
klog.V(4).InfoS("Pod has completed and its containers have been terminated, ignoring remaining sync work", "pod", klog.KObj(pod), "syncType", syncType)
|
||||
return
|
||||
}
|
||||
|
||||
// Run the sync in an async worker.
|
||||
kl.podWorkers.UpdatePod(&UpdatePodOptions{
|
||||
kl.podWorkers.UpdatePod(UpdatePodOptions{
|
||||
Pod: pod,
|
||||
MirrorPod: mirrorPod,
|
||||
UpdateType: syncType,
|
||||
OnCompleteFunc: func(err error) {
|
||||
if err != nil {
|
||||
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
|
||||
}
|
||||
},
|
||||
StartTime: start,
|
||||
})
|
||||
// Note the number of containers for new pods.
|
||||
if syncType == kubetypes.SyncPodCreate {
|
||||
@@ -2109,10 +2190,13 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !kl.podIsTerminated(pod) {
|
||||
// Only go through the admission process if the pod is not
|
||||
// terminated.
|
||||
|
||||
// Only go through the admission process if the pod is not requested
|
||||
// for termination by another part of the kubelet. If the pod is already
|
||||
// using resources (previously admitted), the pod worker is going to be
|
||||
// shutting it down. If the pod hasn't started yet, we know that when
|
||||
// the pod worker is invoked it will also avoid setting up the pod, so
|
||||
// we simply avoid doing any work.
|
||||
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
|
||||
// We failed pods that we rejected, so activePods include all admitted
|
||||
// pods that are alive.
|
||||
activePods := kl.filterOutTerminatedPods(existingPods)
|
||||
@@ -2286,13 +2370,8 @@ func (kl *Kubelet) ListenAndServePodResources() {
|
||||
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
|
||||
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
|
||||
if podStatus, err := kl.podCache.Get(podID); err == nil {
|
||||
removeAll := false
|
||||
if syncedPod, ok := kl.podManager.GetPodByUID(podID); ok {
|
||||
// generate the api status using the cached runtime status to get up-to-date ContainerStatuses
|
||||
apiPodStatus := kl.generateAPIPodStatus(syncedPod, podStatus)
|
||||
// When an evicted or deleted pod has already synced, all containers can be removed.
|
||||
removeAll = eviction.PodIsEvicted(syncedPod.Status) || (syncedPod.DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses))
|
||||
}
|
||||
// When an evicted or deleted pod has already synced, all containers can be removed.
|
||||
removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)
|
||||
kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user