Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #102344 from smarterclayton/keep_pod_worker
Prevent the Kubelet from incorrectly interpreting "not yet started" pods as "ready to terminate" pods by unifying responsibility for the pod lifecycle into the pod worker
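The commit replaces the kubelet's single syncPod callback with three lifecycle callbacks owned by the pod worker: syncPod (set up and run the pod), syncTerminatingPod (stop running containers), and syncTerminatedPod (clean up once nothing is running). A minimal Go sketch of that shape, using placeholder types and simplified signatures rather than the kubelet's real ones:

package main

import (
	"context"
	"fmt"
)

// Placeholder types standing in for *v1.Pod, *kubecontainer.PodStatus, etc.
type pod struct{ name string }
type podStatus struct{}

// The three lifecycle phases the pod worker drives after this change
// (signatures simplified relative to the real ones shown in the hunks below).
type syncPodFn func(ctx context.Context, p *pod, status *podStatus) error            // set up / run the pod
type syncTerminatingPodFn func(ctx context.Context, p *pod, status *podStatus) error // stop running containers
type syncTerminatedPodFn func(ctx context.Context, p *pod, status *podStatus) error  // clean up after containers stop

// podWorkers is a simplified stand-in for the kubelet's pod worker, which is
// now constructed with all three callbacks instead of a single sync function.
type podWorkers struct {
	syncPod            syncPodFn
	syncTerminatingPod syncTerminatingPodFn
	syncTerminatedPod  syncTerminatedPodFn
}

func newPodWorkers(setup syncPodFn, terminating syncTerminatingPodFn, terminated syncTerminatedPodFn) *podWorkers {
	return &podWorkers{syncPod: setup, syncTerminatingPod: terminating, syncTerminatedPod: terminated}
}

func main() {
	w := newPodWorkers(
		func(ctx context.Context, p *pod, s *podStatus) error { fmt.Println("sync", p.name); return nil },
		func(ctx context.Context, p *pod, s *podStatus) error { fmt.Println("terminating", p.name); return nil },
		func(ctx context.Context, p *pod, s *podStatus) error { fmt.Println("terminated", p.name); return nil },
	)
	p := &pod{name: "nginx"}
	// A pod always flows through the phases in this order; after this commit
	// the pod worker, not ad-hoc checks elsewhere, decides when to advance.
	_ = w.syncPod(context.Background(), p, &podStatus{})
	_ = w.syncTerminatingPod(context.Background(), p, &podStatus{})
	_ = w.syncTerminatedPod(context.Background(), p, &podStatus{})
}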
@@ -17,6 +17,7 @@ limitations under the License.
package kubelet

import (
"context"
"crypto/tls"
"fmt"
"math"
@@ -137,6 +138,11 @@ const (
// Period for performing global cleanup tasks.
housekeepingPeriod = time.Second * 2

// Duration at which housekeeping failed to satisfy the invariant that
// housekeeping should be fast to avoid blocking pod config (while
// housekeeping is running no new pods are started or deleted).
housekeepingWarningDuration = time.Second * 15

// Period for performing eviction monitoring.
// ensure this is kept in sync with internal cadvisor housekeeping.
evictionMonitoringPeriod = time.Second * 10
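The housekeepingWarningDuration constant added here is consumed later in this diff by syncLoopIteration, which times HandlePodCleanups and logs an error when a pass exceeds the threshold. A small stand-alone sketch of that timing pattern (the cleanup function is a stand-in):

package main

import (
	"fmt"
	"time"
)

const housekeepingWarningDuration = 15 * time.Second

// runHousekeeping times a cleanup pass and warns when it exceeds the
// threshold, mirroring the pattern used in syncLoopIteration in this diff.
func runHousekeeping(cleanup func() error) {
	start := time.Now()
	if err := cleanup(); err != nil {
		fmt.Println("failed cleaning pods:", err)
	}
	if d := time.Since(start); d > housekeepingWarningDuration {
		fmt.Printf("housekeeping took too long: %.1fs\n", d.Seconds())
	}
}

func main() {
	runHousekeeping(func() error { return nil })
}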
@@ -639,6 +645,20 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.containerLogManager = logs.NewStubContainerLogManager()
}

klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(
klet.syncPod,
klet.syncTerminatingPod,
klet.syncTerminatedPod,

kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
backOffPeriod,
klet.podCache,
)

runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
@@ -646,7 +666,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.startupManager,
seccompProfileRoot,
machineInfo,
klet,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
@@ -780,7 +800,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.EnableControllerAttachDetach,
nodeName,
klet.podManager,
klet.statusManager,
klet.podWorkers,
klet.kubeClient,
klet.volumePluginMgr,
klet.containerRuntime,
@@ -792,12 +812,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes,
volumepathhandler.NewBlockVolumePathHandler())

klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKiller = NewPodKiller(klet)

// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
@@ -1103,9 +1118,6 @@ type Kubelet struct {
// Container restart Backoff
backOff *flowcontrol.Backoff

// Pod killer handles pods to be killed
podKiller PodKiller

// Information about the ports which are opened by daemons on Node running this Kubelet server.
daemonEndpoints *v1.NodeDaemonEndpoints
@@ -1470,10 +1482,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.initNetworkUtil()
}

// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

// Start component sync loops.
kl.statusManager.Start()
@@ -1487,7 +1495,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.syncLoop(updates, kl)
}

// syncPod is the transaction script for the sync of a single pod.
// syncPod is the transaction script for the sync of a single pod (setting up)
// a pod. The reverse (teardown) is handled in syncTerminatingPod and
// syncTerminatedPod. If syncPod exits without error, then the pod runtime
// state is in sync with the desired configuration state (pod is running).
// If syncPod exits with a transient error, the next invocation of syncPod
// is expected to make progress towards reaching the runtime state.
//
// Arguments:
//
@@ -1499,7 +1512,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Kill the pod if it should not be running due to soft admission
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
@@ -1514,39 +1527,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType

// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
killPodOptions := o.killPodOptions
if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
return fmt.Errorf("kill pod options are required if update type is kill")
}
apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we kill the pod with the specified grace period since this is a termination
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
return nil
}

// If a pod is still gracefully terminating, then we do not want to
// take further action. This mitigates static pods and deleted pods
// from getting rerun prematurely or their cgroups being deleted before
// the runtime cleans up.
podFullName := kubecontainer.GetPodFullName(pod)
if kl.podKiller.IsPodPendingTerminationByPodName(podFullName) {
return fmt.Errorf("pod %q is pending termination", podFullName)
}
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

// Latency measurements for the main workflow are relative to the
// first time the pod was seen by the API server.
@@ -1583,16 +1566,15 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
podStatus.IPs = []string{apiPodStatus.PodIP}
}

// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}

// If the pod should not be running, we request the pod's containers be stopped. This is not the same
// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
// it later). Set the status and phase appropriately
runnable := kl.canRunPod(pod)
if !runnable.Admit {
// Pod is not runnable; update the Pod and Container statuses to why.
// Pod is not runnable; and update the Pod and Container statuses to why.
if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
apiPodStatus.Phase = v1.PodPending
}
apiPodStatus.Reason = runnable.Reason
apiPodStatus.Message = runnable.Message
// Waiting containers are not creating.
@@ -1609,22 +1591,28 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
}
}

// Update status in the status manager
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}

kl.statusManager.SetPodStatus(pod, apiPodStatus)

// Kill pod if it should not be running
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
// Pods that are not runnable must be stopped - return a typed error to the pod worker
if !runnable.Admit {
klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
var syncErr error
if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(pod, p, nil); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
syncErr = fmt.Errorf("error killing pod: %v", err)
utilruntime.HandleError(syncErr)
} else {
if !runnable.Admit {
// There was no error killing the pod, but the pod cannot be run.
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
// There was no error killing the pod, but the pod cannot be run.
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
return syncErr
}
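This hunk interleaves the old and new "kill pod if it should not be running" logic: the cached pod status is now converted to a running pod before killPod is called, and a pod rejected by soft admission still returns an error so the pod worker backs off. A simplified sketch of the resulting decision, with stand-in types rather than the kubelet's real ones:

package main

import "fmt"

// Stand-in types; the real code uses *v1.Pod, lifecycle.PodAdmitResult, and a
// cached kubecontainer.PodStatus converted via ConvertPodStatusToRunningPod.
type admitResult struct {
	Admit   bool
	Message string
}

type podState struct {
	deleted bool   // stands in for pod.DeletionTimestamp != nil
	phase   string // stands in for apiPodStatus.Phase
}

// stopIfShouldNotRun mirrors the branch above: pods that are rejected by soft
// admission, deleted, or already failed have their containers stopped, and a
// rejected pod additionally returns an error so the pod worker backs off.
func stopIfShouldNotRun(p podState, runnable admitResult, kill func() error) error {
	if runnable.Admit && !p.deleted && p.phase != "Failed" {
		return nil // the pod may keep running
	}
	if err := kill(); err != nil {
		return fmt.Errorf("error killing pod: %v", err)
	}
	if !runnable.Admit {
		return fmt.Errorf("pod cannot be run: %s", runnable.Message)
	}
	return nil
}

func main() {
	err := stopIfShouldNotRun(
		podState{phase: "Running"},
		admitResult{Admit: false, Message: "predicate fails"},
		func() error { return nil }, // stand-in for kl.killPod
	)
	fmt.Println(err) // pod cannot be run: predicate fails
}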
@@ -1640,7 +1628,8 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
if !kl.podIsTerminated(pod) {
// TODO: once context cancellation is added this check can be removed
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// When the kubelet is restarted with the cgroups-per-qos
// flag enabled, all the pod's running containers
// should be killed intermittently and brought back up
@@ -1657,7 +1646,8 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// exists or the pod is running for the first time
podKilled := false
if !pcm.Exists(pod) && !firstSync {
if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(pod, p, nil); err == nil {
podKilled = true
} else {
klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
@@ -1691,6 +1681,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
@@ -1720,8 +1711,9 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
return err
}

// Volume manager will not mount volumes for terminated pods
if !kl.podIsTerminated(pod) {
// Volume manager will not mount volumes for terminating pods
// TODO: once context cancellation is added this check can be removed
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
@@ -1752,6 +1744,125 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
return nil
}

// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
// we perform no status updates.
func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
// manager or refresh the status of the cache, because a successful killPod will ensure we do
// not get invoked again
if runningPod != nil {
// we kill the pod with the specified grace period since this is a termination
if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}

apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
if podStatusFn != nil {
podStatusFn(&apiPodStatus)
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)

if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(pod, p, gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}

// Guard against consistency issues in KillPod implementations by checking that there are no
// running containers. This method is invoked infrequently so this is effectively free and can
// catch race conditions introduced by callers updating pod status out of order.
// TODO: have KillPod return the terminal status of stopped containers and write that into the
// cache immediately
podStatus, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
return err
}
var runningContainers []string
var containers []string
for _, s := range podStatus.ContainerStatuses {
if s.State == kubecontainer.ContainerStateRunning {
runningContainers = append(runningContainers, s.ID.String())
}
containers = append(containers, fmt.Sprintf("(%s state=%s exitCode=%d finishedAt=%s)", s.Name, s.State, s.ExitCode, s.FinishedAt.UTC().Format(time.RFC3339Nano)))
}
if klog.V(4).Enabled() {
sort.Strings(containers)
klog.InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
}
if len(runningContainers) > 0 {
return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
}

// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)

return nil
}

// syncTerminatedPod cleans up a pod that has terminated (has no running containers).
// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup.
// TODO: make this method take a context and exit early
func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)

// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
// before syncTerminatedPod is invoked)
if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
return err
}
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)

// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
// container for retrieving logs and we want to make sure logs are available until the pod is
// physically deleted.

// remove any cgroups in the hierarchy for pods that are no longer running.
if kl.cgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
name, _ := pcm.GetPodContainerName(pod)
if err := pcm.Destroy(name); err != nil {
return err
}
klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
}

// mark the final pod status
kl.statusManager.TerminatePod(pod)
klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)

return nil
}

// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * internal modules that request sync of a pod.
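The new syncTerminatingPod ends with a guard that re-reads the runtime status after KillPod and treats any still-running container as a CRI violation. A self-contained sketch of that guard, using a stand-in container status type in place of kubecontainer.Status:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// Stand-in for kubecontainer.Status; only the fields the guard needs.
type containerStatus struct {
	ID       string
	Name     string
	State    string // "running", "exited", ...
	ExitCode int
}

// verifyNoRunningContainers mirrors the post-KillPod guard: record the state
// of every container, and fail if any of them is still running.
func verifyNoRunningContainers(statuses []containerStatus) error {
	var running, all []string
	for _, s := range statuses {
		if s.State == "running" {
			running = append(running, s.ID)
		}
		all = append(all, fmt.Sprintf("(%s state=%s exitCode=%d)", s.Name, s.State, s.ExitCode))
	}
	sort.Strings(all)
	fmt.Println("post-termination container state:", strings.Join(all, " "))
	if len(running) > 0 {
		return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", running)
	}
	return nil
}

func main() {
	err := verifyNoRunningContainers([]containerStatus{
		{ID: "containerd://abc", Name: "app", State: "exited", ExitCode: 0},
		{ID: "containerd://def", Name: "sidecar", State: "running"},
	})
	fmt.Println(err)
}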
@@ -1794,28 +1905,11 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
kl.podWorkers.ForgetWorker(pod.UID)

// make sure our runtimeCache is at least as fresh as the last container started event we observed.
// this ensures we correctly send graceful deletion signals to all containers we've reported started.
if lastContainerStarted, ok := kl.lastContainerStartedTime.Get(pod.UID); ok {
if err := kl.runtimeCache.ForceUpdateIfOlder(lastContainerStarted); err != nil {
return fmt.Errorf("error updating containers: %v", err)
}
}
// Runtime cache may not have been updated to with the pod, but it's okay
// because the periodic cleanup routine will attempt to delete again later.
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
return fmt.Errorf("error listing containers: %v", err)
}
runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
if runningPod.IsEmpty() {
return fmt.Errorf("pod not found")
}
podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
kl.podKiller.KillPod(&podPair)

klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
// We leave the volume/directory cleanup to the periodic cleanup routine.
return nil
}
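After this change deletePod no longer resolves the running pod from the runtime cache and hands it to the pod killer; it forwards a SyncPodKill update to the pod worker, which already tracks the pod's containers. A rough sketch of that hand-off, with stand-in types rather than the kubelet's UpdatePodOptions:

package main

import "fmt"

// Stand-in types for the update deletePod now sends to the pod worker.
type updateType string

const syncPodKill updateType = "kill"

type updatePodOptions struct {
	PodName    string
	UpdateType updateType
}

// podWorkersStub queues updates; in the kubelet the goroutine owning the pod
// decides how to act on them, so deletePod only has to enqueue the request.
type podWorkersStub struct{ updates chan updatePodOptions }

func (w *podWorkersStub) UpdatePod(o updatePodOptions) { w.updates <- o }

func main() {
	w := &podWorkersStub{updates: make(chan updatePodOptions, 1)}
	// deletePod-style call: no direct runtime lookup or pod killer, just a kill update.
	w.UpdatePod(updatePodOptions{PodName: "default/nginx", UpdateType: syncPodKill})
	fmt.Printf("%+v\n", <-w.updates)
}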
@@ -1853,7 +1947,7 @@ func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, strin
func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
attrs.OtherPods = kl.filterOutTerminatedPods(kl.podManager.GetPods())
attrs.OtherPods = kl.GetActivePods()

for _, handler := range kl.softAdmitHandlers {
if result := handler.Admit(attrs); !result.Admit {
@@ -2043,10 +2137,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
@@ -2067,31 +2167,12 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// check whether we are ready to delete the pod from the API server (all status up to date)
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
if pod.DeletionTimestamp != nil && containersTerminal {
klog.V(4).InfoS("Pod has completed execution and should be deleted from the API server", "pod", klog.KObj(pod), "syncType", syncType)
kl.statusManager.TerminatePod(pod)
return
}

// optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
// (i.e. the pod has completed and its containers have been terminated)
if podWorkerTerminal && containersTerminal {
klog.V(4).InfoS("Pod has completed and its containers have been terminated, ignoring remaining sync work", "pod", klog.KObj(pod), "syncType", syncType)
return
}

// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
}
},
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
@@ -2127,10 +2208,13 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
continue
}

if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not
// terminated.

// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
@@ -2304,13 +2388,8 @@ func (kl *Kubelet) ListenAndServePodResources() {
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
if podStatus, err := kl.podCache.Get(podID); err == nil {
removeAll := false
if syncedPod, ok := kl.podManager.GetPodByUID(podID); ok {
// generate the api status using the cached runtime status to get up-to-date ContainerStatuses
apiPodStatus := kl.generateAPIPodStatus(syncedPod, podStatus)
// When an evicted or deleted pod has already synced, all containers can be removed.
removeAll = eviction.PodIsEvicted(syncedPod.Status) || (syncedPod.DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses))
}
// When an evicted or deleted pod has already synced, all containers can be removed.
removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)
kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
}
}