kubelet: Force deleted pods can fail to move out of terminating

If a CRI error occurs during the terminating phase after a pod is
force deleted (API or static) then the housekeeping loop will not
deliver updates to the pod worker which prevents the pod's state
machine from progressing. The pod will remain in the terminating
phase but no further attempts to terminate or cleanup will occur
until the kubelet is restarted.

The pod worker now maintains a store of the pods state that it is
attempting to reconcile and uses that to resync unknown pods when
SyncKnownPods() is invoked, so that failures in sync methods for
unknown pods no longer hang forever.

The pod worker's store tracks desired updates and the last update
applied on podSyncStatuses. Each goroutine now synchronizes to
acquire the next work item, context, and whether the pod can start.
This synchronization moves the pending update to the stored last
update, which will ensure third parties accessing pod worker state
don't see updates before the pod worker begins synchronizing them.

As a consequence, the update channel becomes a simple notifier
(struct{}) so that SyncKnownPods can coordinate with the pod worker
to create a synthetic pending update for unknown pods (i.e. no one
besides the pod worker has data about those pods). Otherwise the
pending update info would be hidden inside the channel.

In order to properly track pending updates, we have to be very
careful not to mix RunningPods (which are calculated from the
container runtime and are missing all spec info) and config-
sourced pods. Update the pod worker to avoid using ToAPIPod()
and instead require the pod worker to directly use
update.Options.Pod or update.Options.RunningPod for the
correct methods. Add a new SyncTerminatingRuntimePod to prevent
accidental invocations of runtime only pod data.

Finally, fix SyncKnownPods to replay the last valid update for
undesired pods which drives the pod state machine towards
termination, and alter HandlePodCleanups to:

- terminate runtime pods that aren't known to the pod worker
- launch admitted pods that aren't known to the pod worker

Any started pods receive a replay until they reach the finished
state, and then are removed from the pod worker. When a desired
pod is detected as not being in the worker, the usual cause is
that the pod was deleted and recreated with the same UID (almost
always a static pod since API UID reuse is statistically
unlikely). This simplifies the previous restartable pod support.
We are careful to filter for active pods (those not already
terminal or those which have been previously rejected by
admission). We also force a refresh of the runtime cache to
ensure we don't see an older version of the state.

Future changes will allow other components that need to view the
pod worker's actual state (not the desired state the podManager
represents) to retrieve that info from the pod worker.

Several bugs in pod lifecycle have been undetectable at runtime
because the kubelet does not clearly describe the number of pods
in use. To better report, add the following metrics:

  kubelet_desired_pods: Pods the pod manager sees
  kubelet_active_pods: "Admitted" pods that gate new pods
  kubelet_mirror_pods: Mirror pods the kubelet is tracking
  kubelet_working_pods: Breakdown of pods from the last sync in
    each phase, orphaned state, and static or not
  kubelet_restarted_pods_total: A counter for pods that saw a
    CREATE before the previous pod with the same UID was finished
  kubelet_orphaned_runtime_pods_total: A counter for pods detected
    at runtime that were not known to the kubelet. Will be
    populated at Kubelet startup and should never be incremented
    after.

Add a metric check to our e2e tests that verifies the values are
captured correctly during a serial test, and then verify them in
detail in unit tests.

Adds 23 series to the kubelet /metrics endpoint.
This commit is contained in:
Clayton Coleman 2022-10-18 11:54:49 -04:00
parent c5a1f0188b
commit 6b9a381185
No known key found for this signature in database
GPG Key ID: CF7DB7FC943D3E0E
13 changed files with 2970 additions and 676 deletions

View File

@ -70,6 +70,7 @@ allowed_prometheus_importers=(
./staging/src/k8s.io/component-base/metrics/value.go
./staging/src/k8s.io/component-base/metrics/wrappers.go
./test/e2e/apimachinery/flowcontrol.go
./test/e2e_node/mirror_pod_grace_period_test.go
./test/e2e/node/pods.go
./test/e2e_node/resource_metrics_test.go
./test/instrumentation/main_test.go

View File

@ -23,11 +23,6 @@ import (
"time"
)
var (
// TODO(yifan): Maybe set the them as parameters for NewCache().
defaultCachePeriod = time.Second * 2
)
// RuntimeCache is in interface for obtaining cached Pods.
type RuntimeCache interface {
GetPods(context.Context) ([]*Pod, error)
@ -39,9 +34,10 @@ type podsGetter interface {
}
// NewRuntimeCache creates a container runtime cache.
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
func NewRuntimeCache(getter podsGetter, cachePeriod time.Duration) (RuntimeCache, error) {
return &runtimeCache{
getter: getter,
getter: getter,
cachePeriod: cachePeriod,
}, nil
}
@ -53,6 +49,8 @@ type runtimeCache struct {
sync.Mutex
// The underlying container runtime used to update the cache.
getter podsGetter
// The interval after which the cache should be refreshed.
cachePeriod time.Duration
// Last time when cache was updated.
cacheTime time.Time
// The content of the cache.
@ -64,7 +62,7 @@ type runtimeCache struct {
func (r *runtimeCache) GetPods(ctx context.Context) ([]*Pod, error) {
r.Lock()
defer r.Unlock()
if time.Since(r.cacheTime) > defaultCachePeriod {
if time.Since(r.cacheTime) > r.cachePeriod {
if err := r.updateCache(ctx); err != nil {
return nil, err
}

View File

@ -148,7 +148,12 @@ const (
// Duration at which housekeeping failed to satisfy the invariant that
// housekeeping should be fast to avoid blocking pod config (while
// housekeeping is running no new pods are started or deleted).
housekeepingWarningDuration = time.Second * 15
housekeepingWarningDuration = time.Second * 1
// Period after which the runtime cache expires - set to slightly longer than
// the expected length between housekeeping periods, which explicitly refreshes
// the cache.
runtimeCacheRefreshPeriod = housekeepingPeriod + housekeepingWarningDuration
// Period for performing eviction monitoring.
// ensure this is kept in sync with internal cadvisor housekeeping.
@ -636,10 +641,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(
klet.syncPod,
klet.syncTerminatingPod,
klet.syncTerminatedPod,
klet,
kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
@ -685,7 +687,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.streamingRuntime = runtime
klet.runner = runtime
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime, runtimeCacheRefreshPeriod)
if err != nil {
return nil, err
}
@ -1562,17 +1564,18 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
kl.syncLoop(ctx, updates, kl)
}
// syncPod is the transaction script for the sync of a single pod (setting up)
// SyncPod is the transaction script for the sync of a single pod (setting up)
// a pod. This method is reentrant and expected to converge a pod towards the
// desired state of the spec. The reverse (teardown) is handled in
// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error,
// SyncTerminatingPod and SyncTerminatedPod. If SyncPod exits without error,
// then the pod runtime state is in sync with the desired configuration state
// (pod is running). If syncPod exits with a transient error, the next
// invocation of syncPod is expected to make progress towards reaching the
// runtime state. syncPod exits with isTerminal when the pod was detected to
// (pod is running). If SyncPod exits with a transient error, the next
// invocation of SyncPod is expected to make progress towards reaching the
// desired state. SyncPod exits with isTerminal when the pod was detected to
// have reached a terminal lifecycle phase due to container exits (for
// RestartNever or RestartOnFailure) and the next method invoked will by
// syncTerminatingPod.
// RestartNever or RestartOnFailure) and the next method invoked will be
// SyncTerminatingPod. If the pod terminates for any other reason, SyncPod
// will receive a context cancellation and should exit as soon as possible.
//
// Arguments:
//
@ -1585,7 +1588,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
//
// podStatus - the most recent pod status observed for this pod which can
// be used to determine the set of actions that should be taken during
// this loop of syncPod
// this loop of SyncPod
//
// The workflow is:
// - If the pod is being created, record pod worker start latency
@ -1605,18 +1608,18 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// - Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
// on the next SyncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx := context.TODO()
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
}()
// Latency measurements for the main workflow are relative to the
@ -1871,35 +1874,21 @@ func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType,
return false, nil
}
// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
// we perform no status updates.
func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
// SyncTerminatingPod is expected to terminate all running containers in a pod. Once this method
// returns without error, the pod is considered to be terminated and it will be safe to clean up any
// pod state that is tied to the lifetime of running containers. The next method invoked will be
// SyncTerminatedPod. This method is expected to return with the grace period provided and the
// provided context may be cancelled if the duration is exceeded. The method may also be interrupted
// with a context cancellation if the grace period is shortened by the user or the kubelet (such as
// during eviction). This method is not guaranteed to be called if a pod is force deleted from the
// configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned
// pods.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx := context.Background()
klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
// manager or refresh the status of the cache, because a successful killPod will ensure we do
// not get invoked again
if runningPod != nil {
// we kill the pod with the specified grace period since this is a termination
if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
if err := kl.killPod(ctx, pod, *runningPod, gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
if podStatusFn != nil {
@ -1980,13 +1969,47 @@ func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
return nil
}
// syncTerminatedPod cleans up a pod that has terminated (has no running containers).
// SyncTerminatingRuntimePod is expected to terminate running containers in a pod that we have no
// configuration for. Once this method returns without error, any remaining local state can be safely
// cleaned up by background processes in each subsystem. Unlike syncTerminatingPod, we lack
// knowledge of the full pod spec and so cannot perform lifecycle related operations, only ensure
// that the remnant of the running pod is terminated and allow garbage collection to proceed. We do
// not update the status of the pod because with the source of configuration removed, we have no
// place to send that status.
func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx := context.Background()
pod := runningPod.ToAPIPod()
klog.V(4).InfoS("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
// we kill the pod directly since we have lost all other information about the pod.
klog.V(4).InfoS("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID)
// TODO: this should probably be zero, to bypass any waiting (needs fixes in container runtime)
gracePeriod := int64(1)
if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
klog.V(4).InfoS("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
// SyncTerminatedPod cleans up a pod that has terminated (has no running containers).
// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup.
// TODO: make this method take a context and exit early
func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup. This method
// reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios.
//
// Because the kubelet has no local store of information, all actions in this method that modify
// on-disk state must be reentrant and be garbage collected by HandlePodCleanups or a separate loop.
// This typically occurs when a pod is force deleted from configuration (local disk or API) and the
// kubelet restarts in the middle of the action.
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
@ -2324,9 +2347,9 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
klog.V(4).InfoS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
}
}
return true

View File

@ -54,6 +54,7 @@ import (
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/cri/streaming/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
@ -1057,7 +1058,7 @@ func (kl *Kubelet) deleteOrphanedMirrorPods() {
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {
klog.V(3).InfoS("Deleted pod", "podName", podFullname)
klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname)
}
}
}
@ -1066,9 +1067,16 @@ func (kl *Kubelet) deleteOrphanedMirrorPods() {
// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories. No config changes are sent to pod workers while this method
// is executing which means no new pods can appear.
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
// is executing which means no new pods can appear. After this method completes
// the desired state of the kubelet should be reconciled with the actual state
// in the pod worker and other pod-related components.
//
// This function is executed by the main sync loop, so it must execute quickly
// and all nested calls should be asynchronous. Any slow reconciliation actions
// should be performed by other components (like the volume manager). The duration
// of this call is the minimum latency for static pods to be restarted if they
// are updated with a fixed UID (most should use a dynamic UID), and no config
// updates are delivered to the pod workers while this method is running.
func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// The kubelet lacks checkpointing, so we need to introspect the set of pods
// in the cgroup tree prior to inspecting the set of pods in our pod manager.
@ -1087,6 +1095,15 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
}
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
activePods := kl.filterOutInactivePods(allPods)
allRegularPods, allStaticPods := splitPodsByStatic(allPods)
activeRegularPods, activeStaticPods := splitPodsByStatic(activePods)
metrics.DesiredPodCount.WithLabelValues("").Set(float64(len(allRegularPods)))
metrics.DesiredPodCount.WithLabelValues("true").Set(float64(len(allStaticPods)))
metrics.ActivePodCount.WithLabelValues("").Set(float64(len(activeRegularPods)))
metrics.ActivePodCount.WithLabelValues("true").Set(float64(len(activeStaticPods)))
metrics.MirrorPodCount.Set(float64(len(mirrorPods)))
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
// of such pods should not be changed, and there is no need to sync them.
@ -1102,6 +1119,10 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
klog.V(3).InfoS("Clean up pod workers for terminated pods")
workingPods := kl.podWorkers.SyncKnownPods(allPods)
// Reconcile: At this point the pod workers have been pruned to the set of
// desired pods. Pods that must be restarted due to UID reuse, or leftover
// pods from previous runs, are not known to the pod worker.
allPodsByUID := make(map[types.UID]*v1.Pod)
for _, pod := range allPods {
allPodsByUID[pod.UID] = pod
@ -1112,70 +1133,45 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// that have already been removed from config. Pods that are terminating
// will be added to possiblyRunningPods, to prevent overly aggressive
// cleanup of pod cgroups.
stringIfTrue := func(t bool) string {
if t {
return "true"
}
return ""
}
runningPods := make(map[types.UID]sets.Empty)
possiblyRunningPods := make(map[types.UID]sets.Empty)
restartablePods := make(map[types.UID]sets.Empty)
for uid, sync := range workingPods {
switch sync {
switch sync.State {
case SyncPod:
runningPods[uid] = struct{}{}
possiblyRunningPods[uid] = struct{}{}
case TerminatingPod:
possiblyRunningPods[uid] = struct{}{}
case TerminatedAndRecreatedPod:
restartablePods[uid] = struct{}{}
default:
}
}
// Retrieve the list of running containers from the runtime to perform cleanup.
// We need the latest state to avoid delaying restarts of static pods that reuse
// a UID.
if err := kl.runtimeCache.ForceUpdateIfOlder(ctx, kl.clock.Now()); err != nil {
klog.ErrorS(err, "Error listing containers")
return err
}
runningRuntimePods, err := kl.runtimeCache.GetPods(ctx)
if err != nil {
klog.ErrorS(err, "Error listing containers")
return err
}
// Stop probing pods that are not running
klog.V(3).InfoS("Clean up probes for terminated pods")
kl.probeManager.CleanupPods(possiblyRunningPods)
// Terminate any pods that are observed in the runtime but not
// present in the list of known running pods from config.
runningRuntimePods, err := kl.runtimeCache.GetPods(ctx)
if err != nil {
klog.ErrorS(err, "Error listing containers")
return err
}
for _, runningPod := range runningRuntimePods {
switch workerState, ok := workingPods[runningPod.ID]; {
case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
// if the pod worker is already in charge of this pod, we don't need to do anything
continue
default:
// If the pod isn't in the set that should be running and isn't already terminating, terminate
// now. This termination is aggressive because all known pods should already be in a known state
// (i.e. a removed static pod should already be terminating), so these are pods that were
// orphaned due to kubelet restart or bugs. Since housekeeping blocks other config changes, we
// know that another pod wasn't started in the background so we are safe to terminate the
// unknown pods.
if _, ok := allPodsByUID[runningPod.ID]; !ok {
klog.V(3).InfoS("Clean up orphaned pod containers", "podUID", runningPod.ID)
one := int64(1)
kl.podWorkers.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
RunningPod: runningPod,
KillPodOptions: &KillPodOptions{
PodTerminationGracePeriodSecondsOverride: &one,
},
})
}
}
}
// Remove orphaned pod statuses not in the total list of known config pods
klog.V(3).InfoS("Clean up orphaned pod statuses")
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
// Note that we just killed the unwanted pods. This may not have reflected
// in the cache. We need to bypass the cache to get the latest set of
// running pods to clean up the volumes.
// TODO: Evaluate the performance impact of bypassing the runtime cache.
runningRuntimePods, err = kl.containerRuntime.GetPods(ctx, false)
if err != nil {
klog.ErrorS(err, "Error listing containers")
return err
}
// Remove orphaned pod user namespace allocations (if any).
klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
@ -1204,6 +1200,102 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
klog.V(3).InfoS("Clean up orphaned mirror pods")
kl.deleteOrphanedMirrorPods()
// At this point, the pod worker is aware of which pods are not desired (SyncKnownPods).
// We now look through the set of active pods for those that the pod worker is not aware of
// and deliver an update. The most common reason a pod is not known is because the pod was
// deleted and recreated with the same UID while the pod worker was driving its lifecycle (very
// very rare for API pods, common for static pods with fixed UIDs). Containers that may still
// be running from a previous execution must be reconciled by the pod worker's sync method.
// We must use active pods because that is the set of admitted pods (podManager includes pods
// that will never be run, and statusManager tracks already rejected pods).
var restartCount, restartCountStatic int
for _, desiredPod := range activePods {
if _, knownPod := workingPods[desiredPod.UID]; knownPod {
continue
}
klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID)
isStatic := kubetypes.IsStaticPod(desiredPod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod)
kl.podWorkers.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: desiredPod,
MirrorPod: mirrorPod,
})
// the desired pod is now known as well
workingPods[desiredPod.UID] = PodWorkerSync{State: SyncPod, HasConfig: true, Static: isStatic}
if isStatic {
// restartable static pods are the normal case
restartCountStatic++
} else {
// almost certainly means shenanigans, as API pods should never have the same UID after being deleted and recreated
// unless there is a major API violation
restartCount++
}
}
metrics.RestartedPodTotal.WithLabelValues("true").Add(float64(restartCountStatic))
metrics.RestartedPodTotal.WithLabelValues("").Add(float64(restartCount))
// Finally, terminate any pods that are observed in the runtime but not present in the list of
// known running pods from config. If we do terminate running runtime pods that will happen
// asynchronously in the background and those will be processed in the next invocation of
// HandlePodCleanups.
var orphanCount int
for _, runningPod := range runningRuntimePods {
// If there are orphaned pod resources in CRI that are unknown to the pod worker, terminate them
// now. Since housekeeping is exclusive to other pod worker updates, we know that no pods have
// been added to the pod worker in the meantime. Note that pods that are not visible in the runtime
// but which were previously known are terminated by SyncKnownPods().
_, knownPod := workingPods[runningPod.ID]
if !knownPod {
one := int64(1)
killPodOptions := &KillPodOptions{
PodTerminationGracePeriodSecondsOverride: &one,
}
klog.V(2).InfoS("Clean up containers for orphaned pod we had not seen before", "podUID", runningPod.ID, "killPodOptions", killPodOptions)
kl.podWorkers.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodKill,
RunningPod: runningPod,
KillPodOptions: killPodOptions,
})
// the running pod is now known as well
workingPods[runningPod.ID] = PodWorkerSync{State: TerminatingPod, Orphan: true}
orphanCount++
}
}
metrics.OrphanedRuntimePodTotal.Add(float64(orphanCount))
// Now that we have recorded any terminating pods, and added new pods that should be running,
// record a summary here. Not all possible combinations of PodWorkerSync values are valid.
counts := make(map[PodWorkerSync]int)
for _, sync := range workingPods {
counts[sync]++
}
for validSync, configState := range map[PodWorkerSync]string{
{HasConfig: true, Static: true}: "desired",
{HasConfig: true, Static: false}: "desired",
{Orphan: true, HasConfig: true, Static: true}: "orphan",
{Orphan: true, HasConfig: true, Static: false}: "orphan",
{Orphan: true, HasConfig: false}: "runtime_only",
} {
for _, state := range []PodWorkerState{SyncPod, TerminatingPod, TerminatedPod} {
validSync.State = state
count := counts[validSync]
delete(counts, validSync)
staticString := stringIfTrue(validSync.Static)
if !validSync.HasConfig {
staticString = "unknown"
}
metrics.WorkingPodCount.WithLabelValues(state.String(), configState, staticString).Set(float64(count))
}
}
if len(counts) > 0 {
// in case a combination is lost
klog.V(3).InfoS("Programmer error, did not report a kubelet_working_pods metric for a value returned by SyncKnownPods", "counts", counts)
}
// Remove any cgroups in the hierarchy for pods that are definitely no longer
// running (not in the container runtime).
if kl.cgroupsPerQOS {
@ -1212,33 +1304,31 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, possiblyRunningPods)
}
// Cleanup any backoff entries.
kl.backOff.GC()
// If two pods with the same UID are observed in rapid succession, we need to
// resynchronize the pod worker after the first pod completes and decide whether
// to restart the pod. This happens last to avoid confusing the desired state
// in other components and to increase the likelihood transient OS failures during
// container start are mitigated. In general only static pods will ever reuse UIDs
// since the apiserver uses randomly generated UUIDv4 UIDs with a very low
// probability of collision.
for uid := range restartablePods {
pod, ok := allPodsByUID[uid]
if !ok {
continue
}
if kl.isAdmittedPodTerminal(pod) {
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
continue
}
start := kl.clock.Now()
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
return nil
}
// splitPodsByStatic separates a list of desired pods from the pod manager into
// regular or static pods. Mirror pods are not valid config sources (a mirror pod
// being created cannot cause the Kubelet to start running a static pod) and are
// excluded.
func splitPodsByStatic(pods []*v1.Pod) (regular, static []*v1.Pod) {
regular, static = make([]*v1.Pod, 0, len(pods)), make([]*v1.Pod, 0, len(pods))
for _, pod := range pods {
if kubetypes.IsMirrorPod(pod) {
continue
}
if kubetypes.IsStaticPod(pod) {
static = append(static, pod)
} else {
regular = append(regular, pod)
}
}
return regular, static
}
// validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
// of the container. The previous flag will only return the logs for the last terminated container, otherwise, the current
// running container is preferred over a previous termination. If info about the container is not available then a specific

File diff suppressed because it is too large Load Diff

View File

@ -270,7 +270,7 @@ func newTestKubeletWithImageList(
kubelet.reasonCache = NewReasonCache()
kubelet.podCache = containertest.NewFakeCache(kubelet.containerRuntime)
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: kubelet.syncPod,
syncPodFn: kubelet.SyncPod,
cache: kubelet.podCache,
t: t,
}
@ -1350,7 +1350,7 @@ func TestCreateMirrorPod(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
pods := []*v1.Pod{pod}
kl.podManager.SetPods(pods)
isTerminal, err := kl.syncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kl.SyncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1386,7 +1386,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*v1.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
isTerminal, err := kl.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1548,7 +1548,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
})
kubelet.podManager.SetPods([]*v1.Pod{pod})
isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1556,7 +1556,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
pod.Spec.HostNetwork = true
isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -2681,7 +2681,7 @@ func TestSyncTerminatingPodKillPod(t *testing.T) {
kl.podManager.SetPods(pods)
podStatus := &kubecontainer.PodStatus{ID: pod.UID}
gracePeriodOverride := int64(0)
err := kl.syncTerminatingPod(context.Background(), pod, podStatus, nil, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
err := kl.SyncTerminatingPod(context.Background(), pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
podStatus.Phase = v1.PodFailed
podStatus.Reason = "reason"
podStatus.Message = "message"

View File

@ -428,7 +428,7 @@ func (m *kubeGenericRuntimeManager) GetPods(ctx context.Context, all bool) ([]*k
sort.SliceStable(result, func(i, j int) bool {
return result[i].CreatedAt > result[j].CreatedAt
})
klog.V(4).InfoS("Retrieved pods from runtime", "all", all)
return result, nil
}

View File

@ -55,6 +55,12 @@ const (
VolumeStatsHealthStatusAbnormalKey = "volume_stats_health_status_abnormal"
RunningPodsKey = "running_pods"
RunningContainersKey = "running_containers"
DesiredPodCountKey = "desired_pods"
ActivePodCountKey = "active_pods"
MirrorPodCountKey = "mirror_pods"
WorkingPodCountKey = "working_pods"
OrphanedRuntimePodTotalKey = "orphaned_runtime_pods_total"
RestartedPodTotalKey = "restarted_pods_total"
// Metrics keys of remote runtime operations
RuntimeOperationsKey = "runtime_operations_total"
@ -438,6 +444,64 @@ var (
},
[]string{"container_state"},
)
// DesiredPodCount tracks the count of pods the Kubelet thinks it should be running
DesiredPodCount = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: KubeletSubsystem,
Name: DesiredPodCountKey,
Help: "The number of pods the kubelet is being instructed to run. static is true if the pod is not from the apiserver.",
StabilityLevel: metrics.ALPHA,
},
[]string{"static"},
)
// ActivePodCount tracks the count of pods the Kubelet considers as active when deciding to admit a new pod
ActivePodCount = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: KubeletSubsystem,
Name: ActivePodCountKey,
Help: "The number of pods the kubelet considers active and which are being considered when admitting new pods. static is true if the pod is not from the apiserver.",
StabilityLevel: metrics.ALPHA,
},
[]string{"static"},
)
// MirrorPodCount tracks the number of mirror pods the Kubelet should have created for static pods
MirrorPodCount = metrics.NewGauge(
&metrics.GaugeOpts{
Subsystem: KubeletSubsystem,
Name: MirrorPodCountKey,
Help: "The number of mirror pods the kubelet will try to create (one per admitted static pod)",
StabilityLevel: metrics.ALPHA,
},
)
// WorkingPodCount tracks the count of pods in each lifecycle phase, whether they are static pods, and whether they are desired, orphaned, or runtime_only
WorkingPodCount = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: KubeletSubsystem,
Name: WorkingPodCountKey,
Help: "Number of pods the kubelet is actually running, broken down by lifecycle phase, whether the pod is desired, orphaned, or runtime only (also orphaned), and whether the pod is static. An orphaned pod has been removed from local configuration or force deleted in the API and consumes resources that are not otherwise visible.",
StabilityLevel: metrics.ALPHA,
},
[]string{"lifecycle", "config", "static"},
)
// OrphanedRuntimePodTotal is incremented every time a pod is detected in the runtime without being known to the pod worker first
OrphanedRuntimePodTotal = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: OrphanedRuntimePodTotalKey,
Help: "Number of pods that have been detected in the container runtime without being already known to the pod worker. This typically indicates the kubelet was restarted while a pod was force deleted in the API or in the local configuration, which is unusual.",
StabilityLevel: metrics.ALPHA,
},
)
// RestartedPodTotal is incremented every time a pod with the same UID is deleted and recreated
RestartedPodTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: RestartedPodTotalKey,
Help: "Number of pods that have been restarted because they were deleted and recreated with the same UID while the kubelet was watching them (common for static pods, extremely uncommon for API pods)",
StabilityLevel: metrics.ALPHA,
},
[]string{"static"},
)
// StartedPodsTotal is a counter that tracks pod sandbox creation operations
StartedPodsTotal = metrics.NewCounter(
&metrics.CounterOpts{
@ -615,6 +679,12 @@ func Register(collectors ...metrics.StableCollector) {
legacyregistry.MustRegister(DevicePluginAllocationDuration)
legacyregistry.MustRegister(RunningContainerCount)
legacyregistry.MustRegister(RunningPodCount)
legacyregistry.MustRegister(DesiredPodCount)
legacyregistry.MustRegister(ActivePodCount)
legacyregistry.MustRegister(MirrorPodCount)
legacyregistry.MustRegister(WorkingPodCount)
legacyregistry.MustRegister(OrphanedRuntimePodTotal)
legacyregistry.MustRegister(RestartedPodTotal)
legacyregistry.MustRegister(ManagedEphemeralContainers)
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
legacyregistry.MustRegister(PodResourcesEndpointRequestsTotalCount)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -133,7 +133,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
if isTerminal, err = kl.syncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
}
if retry >= runOnceMaxRetries {

View File

@ -24,6 +24,8 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/onsi/gomega/gstruct"
"github.com/prometheus/common/model"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -133,6 +135,38 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
ginkgo.Context("and the container runtime is temporarily down during pod termination [NodeConformance] [Serial] [Disruptive]", func() {
ginkgo.It("the mirror pod should terminate successfully", func(ctx context.Context) {
ginkgo.By("verifying the pod is described as syncing in metrics")
gomega.Eventually(ctx, getKubeletMetrics, 5*time.Second, time.Second).Should(gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
"kubelet_working_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_working_pods{config="desired", lifecycle="sync", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="sync", static="true"}`: timelessSample(1),
`kubelet_working_pods{config="orphan", lifecycle="sync", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="sync", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="sync", static="unknown"}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminating", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminating", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminating", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminating", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="terminating", static="unknown"}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminated", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminated", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminated", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminated", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="terminated", static="unknown"}`: timelessSample(0),
}),
"kubelet_mirror_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_mirror_pods`: timelessSample(1),
}),
"kubelet_active_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_active_pods{static=""}`: timelessSample(0),
`kubelet_active_pods{static="true"}`: timelessSample(1),
}),
"kubelet_desired_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_desired_pods{static=""}`: timelessSample(0),
`kubelet_desired_pods{static="true"}`: timelessSample(1),
}),
}))
ginkgo.By("delete the static pod")
err := deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)
@ -152,17 +186,104 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
return err
}, 2*time.Minute, time.Second*5).ShouldNot(gomega.Succeed())
ginkgo.By("verifying the mirror pod is running")
gomega.Consistently(ctx, func(ctx context.Context) error {
return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
}, 19*time.Second, 200*time.Millisecond).Should(gomega.BeNil())
ginkgo.By("verifying the pod is described as terminating in metrics")
gomega.Eventually(ctx, getKubeletMetrics, 5*time.Second, time.Second).Should(gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
"kubelet_working_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_working_pods{config="desired", lifecycle="sync", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="sync", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="sync", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="sync", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="sync", static="unknown"}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminating", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminating", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminating", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminating", static="true"}`: timelessSample(1),
`kubelet_working_pods{config="runtime_only", lifecycle="terminating", static="unknown"}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminated", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminated", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminated", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminated", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="terminated", static="unknown"}`: timelessSample(0),
}),
"kubelet_mirror_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_mirror_pods`: timelessSample(1),
}),
"kubelet_active_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_active_pods{static=""}`: timelessSample(0),
// TODO: the pod is still running and consuming resources, it should be considered in
// admission https://github.com/kubernetes/kubernetes/issues/104824 for static pods at
// least, which means it should be 1
`kubelet_active_pods{static="true"}`: timelessSample(0),
}),
"kubelet_desired_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_desired_pods{static=""}`: timelessSample(0),
`kubelet_desired_pods{static="true"}`: timelessSample(0),
})}))
ginkgo.By("start the container runtime")
err = startContainerRuntime()
framework.ExpectNoError(err, "expected no error starting the container runtime")
gomega.Consistently(ctx, func(ctx context.Context) error {
ginkgo.By(fmt.Sprintf("verifying that the mirror pod (%s/%s) is running", ns, mirrorPodName))
err := checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
ginkgo.By("waiting for the container runtime to start")
gomega.Eventually(ctx, func(ctx context.Context) error {
r, _, err := getCRIClient()
if err != nil {
return fmt.Errorf("expected mirror pod (%s/%s) to be running but it was not: %v", ns, mirrorPodName, err)
return fmt.Errorf("error getting CRI client: %w", err)
}
status, err := r.Status(ctx, true)
if err != nil {
return fmt.Errorf("error checking CRI status: %w", err)
}
framework.Logf("Runtime started: %#v", status)
return nil
}, time.Second*30, time.Second*5).Should(gomega.Succeed())
}, 2*time.Minute, time.Second*5).Should(gomega.Succeed())
ginkgo.By(fmt.Sprintf("verifying that the mirror pod (%s/%s) stops running after about 30s", ns, mirrorPodName))
// from the time the container runtime starts, it should take a maximum of:
// 20s (grace period) + 2 sync transitions * 1s + 2s between housekeeping + 3s to detect CRI up +
// 2s overhead
// which we calculate here as "about 30s", so we try a bit longer than that but verify that it is
// tightly bounded by not waiting longer (we want to catch regressions to shutdown)
time.Sleep(30 * time.Second)
gomega.Eventually(ctx, func(ctx context.Context) error {
return checkMirrorPodDisappear(ctx, f.ClientSet, mirrorPodName, ns)
}, time.Second*3, time.Second).Should(gomega.Succeed())
ginkgo.By("verifying the pod finishes terminating and is removed from metrics")
gomega.Eventually(ctx, getKubeletMetrics, 15*time.Second, time.Second).Should(gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
"kubelet_working_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_working_pods{config="desired", lifecycle="sync", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="sync", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="sync", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="sync", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="sync", static="unknown"}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminating", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminating", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminating", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminating", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="terminating", static="unknown"}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminated", static=""}`: timelessSample(0),
`kubelet_working_pods{config="desired", lifecycle="terminated", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminated", static=""}`: timelessSample(0),
`kubelet_working_pods{config="orphan", lifecycle="terminated", static="true"}`: timelessSample(0),
`kubelet_working_pods{config="runtime_only", lifecycle="terminated", static="unknown"}`: timelessSample(0),
}),
"kubelet_mirror_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_mirror_pods`: timelessSample(0),
}),
"kubelet_active_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_active_pods{static=""}`: timelessSample(0),
`kubelet_active_pods{static="true"}`: timelessSample(0),
}),
"kubelet_desired_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
`kubelet_desired_pods{static=""}`: timelessSample(0),
`kubelet_desired_pods{static="true"}`: timelessSample(0),
}),
}))
})
ginkgo.AfterEach(func(ctx context.Context) {
@ -246,3 +367,8 @@ func checkMirrorPodRunningWithUID(ctx context.Context, cl clientset.Interface, n
}
return nil
}
func sampleLabelID(element interface{}) string {
el := element.(*model.Sample)
return el.Metric.String()
}

View File

@ -18,7 +18,6 @@ package e2enode
import (
"context"
goerrors "errors"
"fmt"
"os"
"path/filepath"
@ -411,7 +410,10 @@ func checkMirrorPodDisappear(ctx context.Context, cl clientset.Interface, name,
if apierrors.IsNotFound(err) {
return nil
}
return goerrors.New("pod not disappear")
if err == nil {
return fmt.Errorf("mirror pod %v/%v still exists", namespace, name)
}
return fmt.Errorf("expect mirror pod %v/%v to not exist but got error: %w", namespace, name, err)
}
func checkMirrorPodRunning(ctx context.Context, cl clientset.Interface, name, namespace string) error {