mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #113145 from smarterclayton/zombie_terminating_pods
kubelet: Force deleted pods can fail to move out of terminating
This commit is contained in:
commit
45b96eae98
@ -70,6 +70,7 @@ allowed_prometheus_importers=(
|
||||
./staging/src/k8s.io/component-base/metrics/value.go
|
||||
./staging/src/k8s.io/component-base/metrics/wrappers.go
|
||||
./test/e2e/apimachinery/flowcontrol.go
|
||||
./test/e2e_node/mirror_pod_grace_period_test.go
|
||||
./test/e2e/node/pods.go
|
||||
./test/e2e_node/resource_metrics_test.go
|
||||
./test/instrumentation/main_test.go
|
||||
|
@ -23,11 +23,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO(yifan): Maybe set the them as parameters for NewCache().
|
||||
defaultCachePeriod = time.Second * 2
|
||||
)
|
||||
|
||||
// RuntimeCache is in interface for obtaining cached Pods.
|
||||
type RuntimeCache interface {
|
||||
GetPods(context.Context) ([]*Pod, error)
|
||||
@ -39,9 +34,10 @@ type podsGetter interface {
|
||||
}
|
||||
|
||||
// NewRuntimeCache creates a container runtime cache.
|
||||
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
||||
func NewRuntimeCache(getter podsGetter, cachePeriod time.Duration) (RuntimeCache, error) {
|
||||
return &runtimeCache{
|
||||
getter: getter,
|
||||
getter: getter,
|
||||
cachePeriod: cachePeriod,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -53,6 +49,8 @@ type runtimeCache struct {
|
||||
sync.Mutex
|
||||
// The underlying container runtime used to update the cache.
|
||||
getter podsGetter
|
||||
// The interval after which the cache should be refreshed.
|
||||
cachePeriod time.Duration
|
||||
// Last time when cache was updated.
|
||||
cacheTime time.Time
|
||||
// The content of the cache.
|
||||
@ -64,7 +62,7 @@ type runtimeCache struct {
|
||||
func (r *runtimeCache) GetPods(ctx context.Context) ([]*Pod, error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
if time.Since(r.cacheTime) > defaultCachePeriod {
|
||||
if time.Since(r.cacheTime) > r.cachePeriod {
|
||||
if err := r.updateCache(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -148,7 +148,12 @@ const (
|
||||
// Duration at which housekeeping failed to satisfy the invariant that
|
||||
// housekeeping should be fast to avoid blocking pod config (while
|
||||
// housekeeping is running no new pods are started or deleted).
|
||||
housekeepingWarningDuration = time.Second * 15
|
||||
housekeepingWarningDuration = time.Second * 1
|
||||
|
||||
// Period after which the runtime cache expires - set to slightly longer than
|
||||
// the expected length between housekeeping periods, which explicitly refreshes
|
||||
// the cache.
|
||||
runtimeCacheRefreshPeriod = housekeepingPeriod + housekeepingWarningDuration
|
||||
|
||||
// Period for performing eviction monitoring.
|
||||
// ensure this is kept in sync with internal cadvisor housekeeping.
|
||||
@ -636,10 +641,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
klet.reasonCache = NewReasonCache()
|
||||
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
|
||||
klet.podWorkers = newPodWorkers(
|
||||
klet.syncPod,
|
||||
klet.syncTerminatingPod,
|
||||
klet.syncTerminatedPod,
|
||||
|
||||
klet,
|
||||
kubeDeps.Recorder,
|
||||
klet.workQueue,
|
||||
klet.resyncInterval,
|
||||
@ -685,7 +687,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
klet.streamingRuntime = runtime
|
||||
klet.runner = runtime
|
||||
|
||||
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
|
||||
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime, runtimeCacheRefreshPeriod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1562,17 +1564,18 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
kl.syncLoop(ctx, updates, kl)
|
||||
}
|
||||
|
||||
// syncPod is the transaction script for the sync of a single pod (setting up)
|
||||
// SyncPod is the transaction script for the sync of a single pod (setting up)
|
||||
// a pod. This method is reentrant and expected to converge a pod towards the
|
||||
// desired state of the spec. The reverse (teardown) is handled in
|
||||
// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error,
|
||||
// SyncTerminatingPod and SyncTerminatedPod. If SyncPod exits without error,
|
||||
// then the pod runtime state is in sync with the desired configuration state
|
||||
// (pod is running). If syncPod exits with a transient error, the next
|
||||
// invocation of syncPod is expected to make progress towards reaching the
|
||||
// runtime state. syncPod exits with isTerminal when the pod was detected to
|
||||
// (pod is running). If SyncPod exits with a transient error, the next
|
||||
// invocation of SyncPod is expected to make progress towards reaching the
|
||||
// desired state. SyncPod exits with isTerminal when the pod was detected to
|
||||
// have reached a terminal lifecycle phase due to container exits (for
|
||||
// RestartNever or RestartOnFailure) and the next method invoked will by
|
||||
// syncTerminatingPod.
|
||||
// RestartNever or RestartOnFailure) and the next method invoked will be
|
||||
// SyncTerminatingPod. If the pod terminates for any other reason, SyncPod
|
||||
// will receive a context cancellation and should exit as soon as possible.
|
||||
//
|
||||
// Arguments:
|
||||
//
|
||||
@ -1585,7 +1588,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
//
|
||||
// podStatus - the most recent pod status observed for this pod which can
|
||||
// be used to determine the set of actions that should be taken during
|
||||
// this loop of syncPod
|
||||
// this loop of SyncPod
|
||||
//
|
||||
// The workflow is:
|
||||
// - If the pod is being created, record pod worker start latency
|
||||
@ -1605,18 +1608,18 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
||||
// - Update the traffic shaping for the pod's ingress and egress limits
|
||||
//
|
||||
// If any step of this workflow errors, the error is returned, and is repeated
|
||||
// on the next syncPod call.
|
||||
// on the next SyncPod call.
|
||||
//
|
||||
// This operation writes all events that are dispatched in order to provide
|
||||
// the most accurate information possible about an error situation to aid debugging.
|
||||
// Callers should not write an event if this operation returns an error.
|
||||
func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
|
||||
func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
|
||||
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
|
||||
// Currently, using that context causes test failures.
|
||||
ctx := context.TODO()
|
||||
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer func() {
|
||||
klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
|
||||
klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
|
||||
}()
|
||||
|
||||
// Latency measurements for the main workflow are relative to the
|
||||
@ -1871,35 +1874,21 @@ func (kl *Kubelet) syncPod(_ context.Context, updateType kubetypes.SyncPodType,
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
|
||||
// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
|
||||
// we perform no status updates.
|
||||
func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
|
||||
// SyncTerminatingPod is expected to terminate all running containers in a pod. Once this method
|
||||
// returns without error, the pod is considered to be terminated and it will be safe to clean up any
|
||||
// pod state that is tied to the lifetime of running containers. The next method invoked will be
|
||||
// SyncTerminatedPod. This method is expected to return with the grace period provided and the
|
||||
// provided context may be cancelled if the duration is exceeded. The method may also be interrupted
|
||||
// with a context cancellation if the grace period is shortened by the user or the kubelet (such as
|
||||
// during eviction). This method is not guaranteed to be called if a pod is force deleted from the
|
||||
// configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned
|
||||
// pods.
|
||||
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
|
||||
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
|
||||
// Currently, using that context causes test failures.
|
||||
ctx := context.Background()
|
||||
klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
|
||||
// manager or refresh the status of the cache, because a successful killPod will ensure we do
|
||||
// not get invoked again
|
||||
if runningPod != nil {
|
||||
// we kill the pod with the specified grace period since this is a termination
|
||||
if gracePeriod != nil {
|
||||
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
|
||||
} else {
|
||||
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
|
||||
}
|
||||
if err := kl.killPod(ctx, pod, *runningPod, gracePeriod); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
|
||||
// there was an error killing the pod, so we return that error directly
|
||||
utilruntime.HandleError(err)
|
||||
return err
|
||||
}
|
||||
klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
return nil
|
||||
}
|
||||
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
|
||||
if podStatusFn != nil {
|
||||
@ -1980,13 +1969,47 @@ func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncTerminatedPod cleans up a pod that has terminated (has no running containers).
|
||||
// SyncTerminatingRuntimePod is expected to terminate running containers in a pod that we have no
|
||||
// configuration for. Once this method returns without error, any remaining local state can be safely
|
||||
// cleaned up by background processes in each subsystem. Unlike syncTerminatingPod, we lack
|
||||
// knowledge of the full pod spec and so cannot perform lifecycle related operations, only ensure
|
||||
// that the remnant of the running pod is terminated and allow garbage collection to proceed. We do
|
||||
// not update the status of the pod because with the source of configuration removed, we have no
|
||||
// place to send that status.
|
||||
func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
|
||||
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
|
||||
// Currently, using that context causes test failures.
|
||||
ctx := context.Background()
|
||||
pod := runningPod.ToAPIPod()
|
||||
klog.V(4).InfoS("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// we kill the pod directly since we have lost all other information about the pod.
|
||||
klog.V(4).InfoS("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
// TODO: this should probably be zero, to bypass any waiting (needs fixes in container runtime)
|
||||
gracePeriod := int64(1)
|
||||
if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
|
||||
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
|
||||
// there was an error killing the pod, so we return that error directly
|
||||
utilruntime.HandleError(err)
|
||||
return err
|
||||
}
|
||||
klog.V(4).InfoS("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SyncTerminatedPod cleans up a pod that has terminated (has no running containers).
|
||||
// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
|
||||
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup.
|
||||
// TODO: make this method take a context and exit early
|
||||
func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
|
||||
klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup. This method
|
||||
// reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios.
|
||||
//
|
||||
// Because the kubelet has no local store of information, all actions in this method that modify
|
||||
// on-disk state must be reentrant and be garbage collected by HandlePodCleanups or a separate loop.
|
||||
// This typically occurs when a pod is force deleted from configuration (local disk or API) and the
|
||||
// kubelet restarts in the middle of the action.
|
||||
func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
|
||||
klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
|
||||
// generate the final status of the pod
|
||||
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
|
||||
@ -2323,9 +2346,9 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
|
||||
}
|
||||
duration := time.Since(start)
|
||||
if duration > housekeepingWarningDuration {
|
||||
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
|
||||
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
|
||||
}
|
||||
klog.V(4).InfoS("SyncLoop (housekeeping) end")
|
||||
klog.V(4).InfoS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
@ -54,6 +54,7 @@ import (
|
||||
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/cri/streaming/remotecommand"
|
||||
"k8s.io/kubernetes/pkg/kubelet/envvars"
|
||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
@ -1057,7 +1058,7 @@ func (kl *Kubelet) deleteOrphanedMirrorPods() {
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
|
||||
} else {
|
||||
klog.V(3).InfoS("Deleted pod", "podName", podFullname)
|
||||
klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1066,9 +1067,16 @@ func (kl *Kubelet) deleteOrphanedMirrorPods() {
|
||||
// HandlePodCleanups performs a series of cleanup work, including terminating
|
||||
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
|
||||
// directories. No config changes are sent to pod workers while this method
|
||||
// is executing which means no new pods can appear.
|
||||
// NOTE: This function is executed by the main sync loop, so it
|
||||
// should not contain any blocking calls.
|
||||
// is executing which means no new pods can appear. After this method completes
|
||||
// the desired state of the kubelet should be reconciled with the actual state
|
||||
// in the pod worker and other pod-related components.
|
||||
//
|
||||
// This function is executed by the main sync loop, so it must execute quickly
|
||||
// and all nested calls should be asynchronous. Any slow reconciliation actions
|
||||
// should be performed by other components (like the volume manager). The duration
|
||||
// of this call is the minimum latency for static pods to be restarted if they
|
||||
// are updated with a fixed UID (most should use a dynamic UID), and no config
|
||||
// updates are delivered to the pod workers while this method is running.
|
||||
func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
||||
// The kubelet lacks checkpointing, so we need to introspect the set of pods
|
||||
// in the cgroup tree prior to inspecting the set of pods in our pod manager.
|
||||
@ -1087,6 +1095,15 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
||||
}
|
||||
|
||||
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
|
||||
activePods := kl.filterOutInactivePods(allPods)
|
||||
allRegularPods, allStaticPods := splitPodsByStatic(allPods)
|
||||
activeRegularPods, activeStaticPods := splitPodsByStatic(activePods)
|
||||
metrics.DesiredPodCount.WithLabelValues("").Set(float64(len(allRegularPods)))
|
||||
metrics.DesiredPodCount.WithLabelValues("true").Set(float64(len(allStaticPods)))
|
||||
metrics.ActivePodCount.WithLabelValues("").Set(float64(len(activeRegularPods)))
|
||||
metrics.ActivePodCount.WithLabelValues("true").Set(float64(len(activeStaticPods)))
|
||||
metrics.MirrorPodCount.Set(float64(len(mirrorPods)))
|
||||
|
||||
// Pod phase progresses monotonically. Once a pod has reached a final state,
|
||||
// it should never leave regardless of the restart policy. The statuses
|
||||
// of such pods should not be changed, and there is no need to sync them.
|
||||
@ -1102,6 +1119,10 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
||||
klog.V(3).InfoS("Clean up pod workers for terminated pods")
|
||||
workingPods := kl.podWorkers.SyncKnownPods(allPods)
|
||||
|
||||
// Reconcile: At this point the pod workers have been pruned to the set of
|
||||
// desired pods. Pods that must be restarted due to UID reuse, or leftover
|
||||
// pods from previous runs, are not known to the pod worker.
|
||||
|
||||
allPodsByUID := make(map[types.UID]*v1.Pod)
|
||||
for _, pod := range allPods {
|
||||
allPodsByUID[pod.UID] = pod
|
||||
@ -1112,70 +1133,45 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
||||
// that have already been removed from config. Pods that are terminating
|
||||
// will be added to possiblyRunningPods, to prevent overly aggressive
|
||||
// cleanup of pod cgroups.
|
||||
stringIfTrue := func(t bool) string {
|
||||
if t {
|
||||
return "true"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
runningPods := make(map[types.UID]sets.Empty)
|
||||
possiblyRunningPods := make(map[types.UID]sets.Empty)
|
||||
restartablePods := make(map[types.UID]sets.Empty)
|
||||
for uid, sync := range workingPods {
|
||||
switch sync {
|
||||
switch sync.State {
|
||||
case SyncPod:
|
||||
runningPods[uid] = struct{}{}
|
||||
possiblyRunningPods[uid] = struct{}{}
|
||||
case TerminatingPod:
|
||||
possiblyRunningPods[uid] = struct{}{}
|
||||
case TerminatedAndRecreatedPod:
|
||||
restartablePods[uid] = struct{}{}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve the list of running containers from the runtime to perform cleanup.
|
||||
// We need the latest state to avoid delaying restarts of static pods that reuse
|
||||
// a UID.
|
||||
if err := kl.runtimeCache.ForceUpdateIfOlder(ctx, kl.clock.Now()); err != nil {
|
||||
klog.ErrorS(err, "Error listing containers")
|
||||
return err
|
||||
}
|
||||
runningRuntimePods, err := kl.runtimeCache.GetPods(ctx)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error listing containers")
|
||||
return err
|
||||
}
|
||||
|
||||
// Stop probing pods that are not running
|
||||
klog.V(3).InfoS("Clean up probes for terminated pods")
|
||||
kl.probeManager.CleanupPods(possiblyRunningPods)
|
||||
|
||||
// Terminate any pods that are observed in the runtime but not
|
||||
// present in the list of known running pods from config.
|
||||
runningRuntimePods, err := kl.runtimeCache.GetPods(ctx)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error listing containers")
|
||||
return err
|
||||
}
|
||||
for _, runningPod := range runningRuntimePods {
|
||||
switch workerState, ok := workingPods[runningPod.ID]; {
|
||||
case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
|
||||
// if the pod worker is already in charge of this pod, we don't need to do anything
|
||||
continue
|
||||
default:
|
||||
// If the pod isn't in the set that should be running and isn't already terminating, terminate
|
||||
// now. This termination is aggressive because all known pods should already be in a known state
|
||||
// (i.e. a removed static pod should already be terminating), so these are pods that were
|
||||
// orphaned due to kubelet restart or bugs. Since housekeeping blocks other config changes, we
|
||||
// know that another pod wasn't started in the background so we are safe to terminate the
|
||||
// unknown pods.
|
||||
if _, ok := allPodsByUID[runningPod.ID]; !ok {
|
||||
klog.V(3).InfoS("Clean up orphaned pod containers", "podUID", runningPod.ID)
|
||||
one := int64(1)
|
||||
kl.podWorkers.UpdatePod(UpdatePodOptions{
|
||||
UpdateType: kubetypes.SyncPodKill,
|
||||
RunningPod: runningPod,
|
||||
KillPodOptions: &KillPodOptions{
|
||||
PodTerminationGracePeriodSecondsOverride: &one,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove orphaned pod statuses not in the total list of known config pods
|
||||
klog.V(3).InfoS("Clean up orphaned pod statuses")
|
||||
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
|
||||
// Note that we just killed the unwanted pods. This may not have reflected
|
||||
// in the cache. We need to bypass the cache to get the latest set of
|
||||
// running pods to clean up the volumes.
|
||||
// TODO: Evaluate the performance impact of bypassing the runtime cache.
|
||||
runningRuntimePods, err = kl.containerRuntime.GetPods(ctx, false)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error listing containers")
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove orphaned pod user namespace allocations (if any).
|
||||
klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
|
||||
@ -1204,6 +1200,102 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
||||
klog.V(3).InfoS("Clean up orphaned mirror pods")
|
||||
kl.deleteOrphanedMirrorPods()
|
||||
|
||||
// At this point, the pod worker is aware of which pods are not desired (SyncKnownPods).
|
||||
// We now look through the set of active pods for those that the pod worker is not aware of
|
||||
// and deliver an update. The most common reason a pod is not known is because the pod was
|
||||
// deleted and recreated with the same UID while the pod worker was driving its lifecycle (very
|
||||
// very rare for API pods, common for static pods with fixed UIDs). Containers that may still
|
||||
// be running from a previous execution must be reconciled by the pod worker's sync method.
|
||||
// We must use active pods because that is the set of admitted pods (podManager includes pods
|
||||
// that will never be run, and statusManager tracks already rejected pods).
|
||||
var restartCount, restartCountStatic int
|
||||
for _, desiredPod := range activePods {
|
||||
if _, knownPod := workingPods[desiredPod.UID]; knownPod {
|
||||
continue
|
||||
}
|
||||
|
||||
klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID)
|
||||
isStatic := kubetypes.IsStaticPod(desiredPod)
|
||||
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod)
|
||||
kl.podWorkers.UpdatePod(UpdatePodOptions{
|
||||
UpdateType: kubetypes.SyncPodCreate,
|
||||
Pod: desiredPod,
|
||||
MirrorPod: mirrorPod,
|
||||
})
|
||||
|
||||
// the desired pod is now known as well
|
||||
workingPods[desiredPod.UID] = PodWorkerSync{State: SyncPod, HasConfig: true, Static: isStatic}
|
||||
if isStatic {
|
||||
// restartable static pods are the normal case
|
||||
restartCountStatic++
|
||||
} else {
|
||||
// almost certainly means shenanigans, as API pods should never have the same UID after being deleted and recreated
|
||||
// unless there is a major API violation
|
||||
restartCount++
|
||||
}
|
||||
}
|
||||
metrics.RestartedPodTotal.WithLabelValues("true").Add(float64(restartCountStatic))
|
||||
metrics.RestartedPodTotal.WithLabelValues("").Add(float64(restartCount))
|
||||
|
||||
// Finally, terminate any pods that are observed in the runtime but not present in the list of
|
||||
// known running pods from config. If we do terminate running runtime pods that will happen
|
||||
// asynchronously in the background and those will be processed in the next invocation of
|
||||
// HandlePodCleanups.
|
||||
var orphanCount int
|
||||
for _, runningPod := range runningRuntimePods {
|
||||
// If there are orphaned pod resources in CRI that are unknown to the pod worker, terminate them
|
||||
// now. Since housekeeping is exclusive to other pod worker updates, we know that no pods have
|
||||
// been added to the pod worker in the meantime. Note that pods that are not visible in the runtime
|
||||
// but which were previously known are terminated by SyncKnownPods().
|
||||
_, knownPod := workingPods[runningPod.ID]
|
||||
if !knownPod {
|
||||
one := int64(1)
|
||||
killPodOptions := &KillPodOptions{
|
||||
PodTerminationGracePeriodSecondsOverride: &one,
|
||||
}
|
||||
klog.V(2).InfoS("Clean up containers for orphaned pod we had not seen before", "podUID", runningPod.ID, "killPodOptions", killPodOptions)
|
||||
kl.podWorkers.UpdatePod(UpdatePodOptions{
|
||||
UpdateType: kubetypes.SyncPodKill,
|
||||
RunningPod: runningPod,
|
||||
KillPodOptions: killPodOptions,
|
||||
})
|
||||
|
||||
// the running pod is now known as well
|
||||
workingPods[runningPod.ID] = PodWorkerSync{State: TerminatingPod, Orphan: true}
|
||||
orphanCount++
|
||||
}
|
||||
}
|
||||
metrics.OrphanedRuntimePodTotal.Add(float64(orphanCount))
|
||||
|
||||
// Now that we have recorded any terminating pods, and added new pods that should be running,
|
||||
// record a summary here. Not all possible combinations of PodWorkerSync values are valid.
|
||||
counts := make(map[PodWorkerSync]int)
|
||||
for _, sync := range workingPods {
|
||||
counts[sync]++
|
||||
}
|
||||
for validSync, configState := range map[PodWorkerSync]string{
|
||||
{HasConfig: true, Static: true}: "desired",
|
||||
{HasConfig: true, Static: false}: "desired",
|
||||
{Orphan: true, HasConfig: true, Static: true}: "orphan",
|
||||
{Orphan: true, HasConfig: true, Static: false}: "orphan",
|
||||
{Orphan: true, HasConfig: false}: "runtime_only",
|
||||
} {
|
||||
for _, state := range []PodWorkerState{SyncPod, TerminatingPod, TerminatedPod} {
|
||||
validSync.State = state
|
||||
count := counts[validSync]
|
||||
delete(counts, validSync)
|
||||
staticString := stringIfTrue(validSync.Static)
|
||||
if !validSync.HasConfig {
|
||||
staticString = "unknown"
|
||||
}
|
||||
metrics.WorkingPodCount.WithLabelValues(state.String(), configState, staticString).Set(float64(count))
|
||||
}
|
||||
}
|
||||
if len(counts) > 0 {
|
||||
// in case a combination is lost
|
||||
klog.V(3).InfoS("Programmer error, did not report a kubelet_working_pods metric for a value returned by SyncKnownPods", "counts", counts)
|
||||
}
|
||||
|
||||
// Remove any cgroups in the hierarchy for pods that are definitely no longer
|
||||
// running (not in the container runtime).
|
||||
if kl.cgroupsPerQOS {
|
||||
@ -1212,33 +1304,31 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
|
||||
kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, possiblyRunningPods)
|
||||
}
|
||||
|
||||
// Cleanup any backoff entries.
|
||||
kl.backOff.GC()
|
||||
|
||||
// If two pods with the same UID are observed in rapid succession, we need to
|
||||
// resynchronize the pod worker after the first pod completes and decide whether
|
||||
// to restart the pod. This happens last to avoid confusing the desired state
|
||||
// in other components and to increase the likelihood transient OS failures during
|
||||
// container start are mitigated. In general only static pods will ever reuse UIDs
|
||||
// since the apiserver uses randomly generated UUIDv4 UIDs with a very low
|
||||
// probability of collision.
|
||||
for uid := range restartablePods {
|
||||
pod, ok := allPodsByUID[uid]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if kl.isAdmittedPodTerminal(pod) {
|
||||
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
continue
|
||||
}
|
||||
start := kl.clock.Now()
|
||||
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
|
||||
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
|
||||
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// splitPodsByStatic separates a list of desired pods from the pod manager into
|
||||
// regular or static pods. Mirror pods are not valid config sources (a mirror pod
|
||||
// being created cannot cause the Kubelet to start running a static pod) and are
|
||||
// excluded.
|
||||
func splitPodsByStatic(pods []*v1.Pod) (regular, static []*v1.Pod) {
|
||||
regular, static = make([]*v1.Pod, 0, len(pods)), make([]*v1.Pod, 0, len(pods))
|
||||
for _, pod := range pods {
|
||||
if kubetypes.IsMirrorPod(pod) {
|
||||
continue
|
||||
}
|
||||
if kubetypes.IsStaticPod(pod) {
|
||||
static = append(static, pod)
|
||||
} else {
|
||||
regular = append(regular, pod)
|
||||
}
|
||||
}
|
||||
return regular, static
|
||||
}
|
||||
|
||||
// validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
|
||||
// of the container. The previous flag will only return the logs for the last terminated container, otherwise, the current
|
||||
// running container is preferred over a previous termination. If info about the container is not available then a specific
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -269,7 +269,7 @@ func newTestKubeletWithImageList(
|
||||
kubelet.reasonCache = NewReasonCache()
|
||||
kubelet.podCache = containertest.NewFakeCache(kubelet.containerRuntime)
|
||||
kubelet.podWorkers = &fakePodWorkers{
|
||||
syncPodFn: kubelet.syncPod,
|
||||
syncPodFn: kubelet.SyncPod,
|
||||
cache: kubelet.podCache,
|
||||
t: t,
|
||||
}
|
||||
@ -1348,7 +1348,7 @@ func TestCreateMirrorPod(t *testing.T) {
|
||||
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
|
||||
pods := []*v1.Pod{pod}
|
||||
kl.podManager.SetPods(pods)
|
||||
isTerminal, err := kl.syncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{})
|
||||
isTerminal, err := kl.SyncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{})
|
||||
assert.NoError(t, err)
|
||||
if isTerminal {
|
||||
t.Fatalf("pod should not be terminal: %#v", pod)
|
||||
@ -1384,7 +1384,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
|
||||
|
||||
pods := []*v1.Pod{pod, mirrorPod}
|
||||
kl.podManager.SetPods(pods)
|
||||
isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
|
||||
isTerminal, err := kl.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
|
||||
assert.NoError(t, err)
|
||||
if isTerminal {
|
||||
t.Fatalf("pod should not be terminal: %#v", pod)
|
||||
@ -1546,7 +1546,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
|
||||
})
|
||||
|
||||
kubelet.podManager.SetPods([]*v1.Pod{pod})
|
||||
isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
|
||||
isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
|
||||
assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
|
||||
if isTerminal {
|
||||
t.Fatalf("pod should not be terminal: %#v", pod)
|
||||
@ -1554,7 +1554,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
|
||||
|
||||
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
|
||||
pod.Spec.HostNetwork = true
|
||||
isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
|
||||
isTerminal, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
|
||||
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
|
||||
if isTerminal {
|
||||
t.Fatalf("pod should not be terminal: %#v", pod)
|
||||
@ -2679,7 +2679,7 @@ func TestSyncTerminatingPodKillPod(t *testing.T) {
|
||||
kl.podManager.SetPods(pods)
|
||||
podStatus := &kubecontainer.PodStatus{ID: pod.UID}
|
||||
gracePeriodOverride := int64(0)
|
||||
err := kl.syncTerminatingPod(context.Background(), pod, podStatus, nil, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
|
||||
err := kl.SyncTerminatingPod(context.Background(), pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
|
||||
podStatus.Phase = v1.PodFailed
|
||||
podStatus.Reason = "reason"
|
||||
podStatus.Message = "message"
|
||||
|
@ -428,7 +428,7 @@ func (m *kubeGenericRuntimeManager) GetPods(ctx context.Context, all bool) ([]*k
|
||||
sort.SliceStable(result, func(i, j int) bool {
|
||||
return result[i].CreatedAt > result[j].CreatedAt
|
||||
})
|
||||
|
||||
klog.V(4).InfoS("Retrieved pods from runtime", "all", all)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,12 @@ const (
|
||||
VolumeStatsHealthStatusAbnormalKey = "volume_stats_health_status_abnormal"
|
||||
RunningPodsKey = "running_pods"
|
||||
RunningContainersKey = "running_containers"
|
||||
DesiredPodCountKey = "desired_pods"
|
||||
ActivePodCountKey = "active_pods"
|
||||
MirrorPodCountKey = "mirror_pods"
|
||||
WorkingPodCountKey = "working_pods"
|
||||
OrphanedRuntimePodTotalKey = "orphaned_runtime_pods_total"
|
||||
RestartedPodTotalKey = "restarted_pods_total"
|
||||
|
||||
// Metrics keys of remote runtime operations
|
||||
RuntimeOperationsKey = "runtime_operations_total"
|
||||
@ -438,6 +444,64 @@ var (
|
||||
},
|
||||
[]string{"container_state"},
|
||||
)
|
||||
// DesiredPodCount tracks the count of pods the Kubelet thinks it should be running
|
||||
DesiredPodCount = metrics.NewGaugeVec(
|
||||
&metrics.GaugeOpts{
|
||||
Subsystem: KubeletSubsystem,
|
||||
Name: DesiredPodCountKey,
|
||||
Help: "The number of pods the kubelet is being instructed to run. static is true if the pod is not from the apiserver.",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"static"},
|
||||
)
|
||||
// ActivePodCount tracks the count of pods the Kubelet considers as active when deciding to admit a new pod
|
||||
ActivePodCount = metrics.NewGaugeVec(
|
||||
&metrics.GaugeOpts{
|
||||
Subsystem: KubeletSubsystem,
|
||||
Name: ActivePodCountKey,
|
||||
Help: "The number of pods the kubelet considers active and which are being considered when admitting new pods. static is true if the pod is not from the apiserver.",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"static"},
|
||||
)
|
||||
// MirrorPodCount tracks the number of mirror pods the Kubelet should have created for static pods
|
||||
MirrorPodCount = metrics.NewGauge(
|
||||
&metrics.GaugeOpts{
|
||||
Subsystem: KubeletSubsystem,
|
||||
Name: MirrorPodCountKey,
|
||||
Help: "The number of mirror pods the kubelet will try to create (one per admitted static pod)",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
)
|
||||
// WorkingPodCount tracks the count of pods in each lifecycle phase, whether they are static pods, and whether they are desired, orphaned, or runtime_only
|
||||
WorkingPodCount = metrics.NewGaugeVec(
|
||||
&metrics.GaugeOpts{
|
||||
Subsystem: KubeletSubsystem,
|
||||
Name: WorkingPodCountKey,
|
||||
Help: "Number of pods the kubelet is actually running, broken down by lifecycle phase, whether the pod is desired, orphaned, or runtime only (also orphaned), and whether the pod is static. An orphaned pod has been removed from local configuration or force deleted in the API and consumes resources that are not otherwise visible.",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"lifecycle", "config", "static"},
|
||||
)
|
||||
// OrphanedRuntimePodTotal is incremented every time a pod is detected in the runtime without being known to the pod worker first
|
||||
OrphanedRuntimePodTotal = metrics.NewCounter(
|
||||
&metrics.CounterOpts{
|
||||
Subsystem: KubeletSubsystem,
|
||||
Name: OrphanedRuntimePodTotalKey,
|
||||
Help: "Number of pods that have been detected in the container runtime without being already known to the pod worker. This typically indicates the kubelet was restarted while a pod was force deleted in the API or in the local configuration, which is unusual.",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
)
|
||||
// RestartedPodTotal is incremented every time a pod with the same UID is deleted and recreated
|
||||
RestartedPodTotal = metrics.NewCounterVec(
|
||||
&metrics.CounterOpts{
|
||||
Subsystem: KubeletSubsystem,
|
||||
Name: RestartedPodTotalKey,
|
||||
Help: "Number of pods that have been restarted because they were deleted and recreated with the same UID while the kubelet was watching them (common for static pods, extremely uncommon for API pods)",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"static"},
|
||||
)
|
||||
// StartedPodsTotal is a counter that tracks pod sandbox creation operations
|
||||
StartedPodsTotal = metrics.NewCounter(
|
||||
&metrics.CounterOpts{
|
||||
@ -615,6 +679,12 @@ func Register(collectors ...metrics.StableCollector) {
|
||||
legacyregistry.MustRegister(DevicePluginAllocationDuration)
|
||||
legacyregistry.MustRegister(RunningContainerCount)
|
||||
legacyregistry.MustRegister(RunningPodCount)
|
||||
legacyregistry.MustRegister(DesiredPodCount)
|
||||
legacyregistry.MustRegister(ActivePodCount)
|
||||
legacyregistry.MustRegister(MirrorPodCount)
|
||||
legacyregistry.MustRegister(WorkingPodCount)
|
||||
legacyregistry.MustRegister(OrphanedRuntimePodTotal)
|
||||
legacyregistry.MustRegister(RestartedPodTotal)
|
||||
legacyregistry.MustRegister(ManagedEphemeralContainers)
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
|
||||
legacyregistry.MustRegister(PodResourcesEndpointRequestsTotalCount)
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -133,7 +133,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura
|
||||
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
|
||||
}
|
||||
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
|
||||
if isTerminal, err = kl.syncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
|
||||
if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
|
||||
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
if retry >= runOnceMaxRetries {
|
||||
|
@ -121,6 +121,15 @@ func LookForStringInLog(ns, podName, container, expectedString string, timeout t
|
||||
})
|
||||
}
|
||||
|
||||
// LookForStringInLogWithoutKubectl looks for the given string in the log of a specific pod container
|
||||
func LookForStringInLogWithoutKubectl(ctx context.Context, client clientset.Interface, ns string, podName string, container string, expectedString string, timeout time.Duration) (result string, err error) {
|
||||
return lookForString(expectedString, timeout, func() string {
|
||||
podLogs, err := e2epod.GetPodLogs(ctx, client, ns, podName, container)
|
||||
framework.ExpectNoError(err)
|
||||
return podLogs
|
||||
})
|
||||
}
|
||||
|
||||
// CreateEmptyFileOnPod creates empty file at given path on the pod.
|
||||
func CreateEmptyFileOnPod(namespace string, podName string, filePath string) error {
|
||||
_, err := e2ekubectl.RunKubectl(namespace, "exec", podName, "--", "/bin/sh", "-c", fmt.Sprintf("touch %s", filePath))
|
||||
|
@ -149,6 +149,10 @@ type Test struct {
|
||||
|
||||
// NewNFSServer is a NFS-specific wrapper for CreateStorageServer.
|
||||
func NewNFSServer(ctx context.Context, cs clientset.Interface, namespace string, args []string) (config TestConfig, pod *v1.Pod, host string) {
|
||||
return NewNFSServerWithNodeName(ctx, cs, namespace, args, "")
|
||||
}
|
||||
|
||||
func NewNFSServerWithNodeName(ctx context.Context, cs clientset.Interface, namespace string, args []string, nodeName string) (config TestConfig, pod *v1.Pod, host string) {
|
||||
config = TestConfig{
|
||||
Namespace: namespace,
|
||||
Prefix: "nfs",
|
||||
@ -157,6 +161,10 @@ func NewNFSServer(ctx context.Context, cs clientset.Interface, namespace string,
|
||||
ServerVolumes: map[string]string{"": "/exports"},
|
||||
ServerReadyMessage: "NFS started",
|
||||
}
|
||||
if nodeName != "" {
|
||||
config.ClientNodeSelection = e2epod.NodeSelection{Name: nodeName}
|
||||
}
|
||||
|
||||
if len(args) > 0 {
|
||||
config.ServerArgs = args
|
||||
}
|
||||
@ -329,6 +337,10 @@ func startVolumeServer(ctx context.Context, client clientset.Interface, config T
|
||||
},
|
||||
}
|
||||
|
||||
if config.ClientNodeSelection.Name != "" {
|
||||
serverPod.Spec.NodeName = config.ClientNodeSelection.Name
|
||||
}
|
||||
|
||||
var pod *v1.Pod
|
||||
serverPod, err := podClient.Create(ctx, serverPod, metav1.CreateOptions{})
|
||||
// ok if the server pod already exists. TODO: make this controllable by callers
|
||||
@ -355,7 +367,7 @@ func startVolumeServer(ctx context.Context, client clientset.Interface, config T
|
||||
}
|
||||
}
|
||||
if config.ServerReadyMessage != "" {
|
||||
_, err := e2epodoutput.LookForStringInLog(pod.Namespace, pod.Name, serverPodName, config.ServerReadyMessage, VolumeServerPodStartupTimeout)
|
||||
_, err := e2epodoutput.LookForStringInLogWithoutKubectl(ctx, client, pod.Namespace, pod.Name, serverPodName, config.ServerReadyMessage, VolumeServerPodStartupTimeout)
|
||||
framework.ExpectNoError(err, "Failed to find %q in pod logs: %s", config.ServerReadyMessage, err)
|
||||
}
|
||||
return pod
|
||||
|
@ -24,6 +24,8 @@ import (
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
"github.com/onsi/gomega"
|
||||
"github.com/onsi/gomega/gstruct"
|
||||
"github.com/prometheus/common/model"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -131,6 +133,174 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
|
||||
framework.ExpectEqual(pod.Spec.Containers[0].Image, image)
|
||||
})
|
||||
|
||||
ginkgo.Context("and the container runtime is temporarily down during pod termination [NodeConformance] [Serial] [Disruptive]", func() {
|
||||
ginkgo.It("the mirror pod should terminate successfully", func(ctx context.Context) {
|
||||
ginkgo.By("verifying the pod is described as syncing in metrics")
|
||||
gomega.Eventually(ctx, getKubeletMetrics, 5*time.Second, time.Second).Should(gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
|
||||
"kubelet_working_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_working_pods{config="desired", lifecycle="sync", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="sync", static="true"}`: timelessSample(1),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="sync", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="sync", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="sync", static="unknown"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminating", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminating", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminating", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminating", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="terminating", static="unknown"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminated", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminated", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminated", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminated", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="terminated", static="unknown"}`: timelessSample(0),
|
||||
}),
|
||||
"kubelet_mirror_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_mirror_pods`: timelessSample(1),
|
||||
}),
|
||||
"kubelet_active_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_active_pods{static=""}`: timelessSample(0),
|
||||
`kubelet_active_pods{static="true"}`: timelessSample(1),
|
||||
}),
|
||||
"kubelet_desired_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_desired_pods{static=""}`: timelessSample(0),
|
||||
`kubelet_desired_pods{static="true"}`: timelessSample(1),
|
||||
}),
|
||||
}))
|
||||
|
||||
ginkgo.By("delete the static pod")
|
||||
err := deleteStaticPod(podPath, staticPodName, ns)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Note it is important we have a small delay here as we would like to reproduce https://issues.k8s.io/113091 which requires a failure in syncTerminatingPod()
|
||||
// This requires waiting a small period between the static pod being deleted so that syncTerminatingPod() will attempt to run
|
||||
ginkgo.By("sleeping before stopping the container runtime")
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
ginkgo.By("stop the container runtime")
|
||||
err = stopContainerRuntime()
|
||||
framework.ExpectNoError(err, "expected no error stopping the container runtime")
|
||||
|
||||
ginkgo.By("waiting for the container runtime to be stopped")
|
||||
gomega.Eventually(ctx, func(ctx context.Context) error {
|
||||
_, _, err := getCRIClient()
|
||||
return err
|
||||
}, 2*time.Minute, time.Second*5).ShouldNot(gomega.Succeed())
|
||||
|
||||
ginkgo.By("verifying the mirror pod is running")
|
||||
gomega.Consistently(ctx, func(ctx context.Context) error {
|
||||
return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
|
||||
}, 19*time.Second, 200*time.Millisecond).Should(gomega.BeNil())
|
||||
|
||||
ginkgo.By("verifying the pod is described as terminating in metrics")
|
||||
gomega.Eventually(ctx, getKubeletMetrics, 5*time.Second, time.Second).Should(gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
|
||||
"kubelet_working_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_working_pods{config="desired", lifecycle="sync", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="sync", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="sync", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="sync", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="sync", static="unknown"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminating", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminating", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminating", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminating", static="true"}`: timelessSample(1),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="terminating", static="unknown"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminated", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminated", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminated", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminated", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="terminated", static="unknown"}`: timelessSample(0),
|
||||
}),
|
||||
"kubelet_mirror_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_mirror_pods`: timelessSample(1),
|
||||
}),
|
||||
"kubelet_active_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_active_pods{static=""}`: timelessSample(0),
|
||||
// TODO: the pod is still running and consuming resources, it should be considered in
|
||||
// admission https://github.com/kubernetes/kubernetes/issues/104824 for static pods at
|
||||
// least, which means it should be 1
|
||||
`kubelet_active_pods{static="true"}`: timelessSample(0),
|
||||
}),
|
||||
"kubelet_desired_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_desired_pods{static=""}`: timelessSample(0),
|
||||
`kubelet_desired_pods{static="true"}`: timelessSample(0),
|
||||
})}))
|
||||
|
||||
ginkgo.By("start the container runtime")
|
||||
err = startContainerRuntime()
|
||||
framework.ExpectNoError(err, "expected no error starting the container runtime")
|
||||
ginkgo.By("waiting for the container runtime to start")
|
||||
gomega.Eventually(ctx, func(ctx context.Context) error {
|
||||
r, _, err := getCRIClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting CRI client: %w", err)
|
||||
}
|
||||
status, err := r.Status(ctx, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error checking CRI status: %w", err)
|
||||
}
|
||||
framework.Logf("Runtime started: %#v", status)
|
||||
return nil
|
||||
}, 2*time.Minute, time.Second*5).Should(gomega.Succeed())
|
||||
|
||||
ginkgo.By(fmt.Sprintf("verifying that the mirror pod (%s/%s) stops running after about 30s", ns, mirrorPodName))
|
||||
// from the time the container runtime starts, it should take a maximum of:
|
||||
// 20s (grace period) + 2 sync transitions * 1s + 2s between housekeeping + 3s to detect CRI up +
|
||||
// 2s overhead
|
||||
// which we calculate here as "about 30s", so we try a bit longer than that but verify that it is
|
||||
// tightly bounded by not waiting longer (we want to catch regressions to shutdown)
|
||||
time.Sleep(30 * time.Second)
|
||||
gomega.Eventually(ctx, func(ctx context.Context) error {
|
||||
return checkMirrorPodDisappear(ctx, f.ClientSet, mirrorPodName, ns)
|
||||
}, time.Second*3, time.Second).Should(gomega.Succeed())
|
||||
|
||||
ginkgo.By("verifying the pod finishes terminating and is removed from metrics")
|
||||
gomega.Eventually(ctx, getKubeletMetrics, 15*time.Second, time.Second).Should(gstruct.MatchKeys(gstruct.IgnoreExtras, gstruct.Keys{
|
||||
"kubelet_working_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_working_pods{config="desired", lifecycle="sync", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="sync", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="sync", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="sync", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="sync", static="unknown"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminating", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminating", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminating", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminating", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="terminating", static="unknown"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminated", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="desired", lifecycle="terminated", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminated", static=""}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="orphan", lifecycle="terminated", static="true"}`: timelessSample(0),
|
||||
`kubelet_working_pods{config="runtime_only", lifecycle="terminated", static="unknown"}`: timelessSample(0),
|
||||
}),
|
||||
"kubelet_mirror_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_mirror_pods`: timelessSample(0),
|
||||
}),
|
||||
"kubelet_active_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_active_pods{static=""}`: timelessSample(0),
|
||||
`kubelet_active_pods{static="true"}`: timelessSample(0),
|
||||
}),
|
||||
"kubelet_desired_pods": gstruct.MatchElements(sampleLabelID, 0, gstruct.Elements{
|
||||
`kubelet_desired_pods{static=""}`: timelessSample(0),
|
||||
`kubelet_desired_pods{static="true"}`: timelessSample(0),
|
||||
}),
|
||||
}))
|
||||
})
|
||||
|
||||
ginkgo.AfterEach(func(ctx context.Context) {
|
||||
ginkgo.By("starting the container runtime")
|
||||
err := startContainerRuntime()
|
||||
framework.ExpectNoError(err, "expected no error starting the container runtime")
|
||||
ginkgo.By("waiting for the container runtime to start")
|
||||
gomega.Eventually(ctx, func(ctx context.Context) error {
|
||||
_, _, err := getCRIClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting cri client: %v", err)
|
||||
}
|
||||
return nil
|
||||
}, 2*time.Minute, time.Second*5).Should(gomega.Succeed())
|
||||
})
|
||||
})
|
||||
|
||||
ginkgo.AfterEach(func(ctx context.Context) {
|
||||
ginkgo.By("delete the static pod")
|
||||
err := deleteStaticPod(podPath, staticPodName, ns)
|
||||
@ -197,3 +367,8 @@ func checkMirrorPodRunningWithUID(ctx context.Context, cl clientset.Interface, n
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func sampleLabelID(element interface{}) string {
|
||||
el := element.(*model.Sample)
|
||||
return el.Metric.String()
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package e2enode
|
||||
|
||||
import (
|
||||
"context"
|
||||
goerrors "errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -40,11 +39,13 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
"github.com/onsi/gomega"
|
||||
"k8s.io/cli-runtime/pkg/printers"
|
||||
e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
|
||||
)
|
||||
|
||||
var _ = SIGDescribe("MirrorPod", func() {
|
||||
f := framework.NewDefaultFramework("mirror-pod")
|
||||
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
|
||||
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
|
||||
ginkgo.Context("when create a mirror pod ", func() {
|
||||
var ns, podPath, staticPodName, mirrorPodName string
|
||||
ginkgo.BeforeEach(func(ctx context.Context) {
|
||||
@ -196,8 +197,179 @@ var _ = SIGDescribe("MirrorPod", func() {
|
||||
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
|
||||
})
|
||||
})
|
||||
ginkgo.Context("when recreating a static pod", func() {
|
||||
var ns, podPath, staticPodName, mirrorPodName string
|
||||
ginkgo.It("it should launch successfully even if it temporarily failed termination due to volume failing to unmount [NodeConformance] [Serial]", func(ctx context.Context) {
|
||||
node := getNodeName(ctx, f)
|
||||
ns = f.Namespace.Name
|
||||
c := f.ClientSet
|
||||
nfsTestConfig, nfsServerPod, nfsServerHost := e2evolume.NewNFSServerWithNodeName(ctx, c, ns, []string{"-G", "777", "/exports"}, node)
|
||||
ginkgo.DeferCleanup(func(ctx context.Context) {
|
||||
framework.Logf("Cleaning up NFS server pod")
|
||||
e2evolume.TestServerCleanup(ctx, f, nfsTestConfig)
|
||||
})
|
||||
|
||||
podPath = framework.TestContext.KubeletConfig.StaticPodPath
|
||||
staticPodName = "static-pod-nfs-test-pod" + string(uuid.NewUUID())
|
||||
mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName
|
||||
|
||||
ginkgo.By(fmt.Sprintf("Creating nfs test pod: %s", staticPodName))
|
||||
|
||||
err := createStaticPodUsingNfs(nfsServerHost, node, "sleep 999999", podPath, staticPodName, ns)
|
||||
framework.ExpectNoError(err)
|
||||
ginkgo.By(fmt.Sprintf("Wating for nfs test pod: %s to start running...", staticPodName))
|
||||
gomega.Eventually(func() error {
|
||||
return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
|
||||
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
|
||||
|
||||
mirrorPod, err := c.CoreV1().Pods(ns).Get(ctx, mirrorPodName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
hash, ok := mirrorPod.Annotations[kubetypes.ConfigHashAnnotationKey]
|
||||
if !ok || hash == "" {
|
||||
framework.Failf("Failed to get hash for mirrorPod")
|
||||
}
|
||||
|
||||
ginkgo.By("Stopping the NFS server")
|
||||
stopNfsServer(f, nfsServerPod)
|
||||
|
||||
ginkgo.By("Waiting for NFS server to stop...")
|
||||
time.Sleep(30 * time.Second)
|
||||
|
||||
ginkgo.By(fmt.Sprintf("Deleting the static nfs test pod: %s", staticPodName))
|
||||
err = deleteStaticPod(podPath, staticPodName, ns)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
// Wait 5 mins for syncTerminatedPod to fail. We expect that the pod volume should not be cleaned up because the NFS server is down.
|
||||
gomega.Consistently(func() bool {
|
||||
return podVolumeDirectoryExists(types.UID(hash))
|
||||
}, 5*time.Minute, 10*time.Second).Should(gomega.BeTrue(), "pod volume should exist while nfs server is stopped")
|
||||
|
||||
ginkgo.By("Start the NFS server")
|
||||
restartNfsServer(f, nfsServerPod)
|
||||
|
||||
ginkgo.By("Waiting for the pod volume to deleted after the NFS server is started")
|
||||
gomega.Eventually(func() bool {
|
||||
return podVolumeDirectoryExists(types.UID(hash))
|
||||
}, 5*time.Minute, 10*time.Second).Should(gomega.BeFalse(), "pod volume should be deleted after nfs server is started")
|
||||
|
||||
// Create the static pod again with the same config and expect it to start running
|
||||
err = createStaticPodUsingNfs(nfsServerHost, node, "sleep 999999", podPath, staticPodName, ns)
|
||||
framework.ExpectNoError(err)
|
||||
ginkgo.By(fmt.Sprintf("Wating for nfs test pod: %s to start running (after being recreated)", staticPodName))
|
||||
gomega.Eventually(func() error {
|
||||
return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
|
||||
}, 5*time.Minute, 5*time.Second).Should(gomega.BeNil())
|
||||
})
|
||||
|
||||
ginkgo.AfterEach(func(ctx context.Context) {
|
||||
ginkgo.By("delete the static pod")
|
||||
err := deleteStaticPod(podPath, staticPodName, ns)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("wait for the mirror pod to disappear")
|
||||
gomega.Eventually(ctx, func(ctx context.Context) error {
|
||||
return checkMirrorPodDisappear(ctx, f.ClientSet, mirrorPodName, ns)
|
||||
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
func podVolumeDirectoryExists(uid types.UID) bool {
|
||||
podVolumePath := fmt.Sprintf("/var/lib/kubelet/pods/%s/volumes/", uid)
|
||||
var podVolumeDirectoryExists bool
|
||||
|
||||
if _, err := os.Stat(podVolumePath); !os.IsNotExist(err) {
|
||||
podVolumeDirectoryExists = true
|
||||
}
|
||||
|
||||
return podVolumeDirectoryExists
|
||||
}
|
||||
|
||||
// Restart the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 1` command in the
|
||||
// pod's (only) container. This command changes the number of nfs server threads from
|
||||
// (presumably) zero back to 1, and therefore allows nfs to open connections again.
|
||||
func restartNfsServer(f *framework.Framework, serverPod *v1.Pod) {
|
||||
const startcmd = "/usr/sbin/rpc.nfsd 1"
|
||||
_, _, err := e2evolume.PodExec(f, serverPod, startcmd)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
}
|
||||
|
||||
// Stop the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 0` command in the
|
||||
// pod's (only) container. This command changes the number of nfs server threads to 0,
|
||||
// thus closing all open nfs connections.
|
||||
func stopNfsServer(f *framework.Framework, serverPod *v1.Pod) {
|
||||
const stopcmd = "/usr/sbin/rpc.nfsd 0"
|
||||
_, _, err := e2evolume.PodExec(f, serverPod, stopcmd)
|
||||
framework.ExpectNoError(err)
|
||||
}
|
||||
|
||||
func createStaticPodUsingNfs(nfsIP string, nodeName string, cmd string, dir string, name string, ns string) error {
|
||||
ginkgo.By("create pod using nfs volume")
|
||||
|
||||
isPrivileged := true
|
||||
cmdLine := []string{"-c", cmd}
|
||||
pod := &v1.Pod{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: "v1",
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: ns,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: nodeName,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "pod-nfs-vol",
|
||||
Image: imageutils.GetE2EImage(imageutils.BusyBox),
|
||||
Command: []string{"/bin/sh"},
|
||||
Args: cmdLine,
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "nfs-vol",
|
||||
MountPath: "/mnt",
|
||||
},
|
||||
},
|
||||
SecurityContext: &v1.SecurityContext{
|
||||
Privileged: &isPrivileged,
|
||||
},
|
||||
},
|
||||
},
|
||||
RestartPolicy: v1.RestartPolicyNever, //don't restart pod
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "nfs-vol",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
NFS: &v1.NFSVolumeSource{
|
||||
Server: nfsIP,
|
||||
Path: "/exports",
|
||||
ReadOnly: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
file := staticPodPath(dir, name, ns)
|
||||
f, err := os.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
y := printers.YAMLPrinter{}
|
||||
y.PrintObj(pod, f)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func staticPodPath(dir, name, namespace string) string {
|
||||
return filepath.Join(dir, namespace+"-"+name+".yaml")
|
||||
}
|
||||
@ -238,7 +410,10 @@ func checkMirrorPodDisappear(ctx context.Context, cl clientset.Interface, name,
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return goerrors.New("pod not disappear")
|
||||
if err == nil {
|
||||
return fmt.Errorf("mirror pod %v/%v still exists", namespace, name)
|
||||
}
|
||||
return fmt.Errorf("expect mirror pod %v/%v to not exist but got error: %w", namespace, name, err)
|
||||
}
|
||||
|
||||
func checkMirrorPodRunning(ctx context.Context, cl clientset.Interface, name, namespace string) error {
|
||||
|
@ -87,15 +87,6 @@ func (n *NodeE2ERemote) SetupTestPackage(tardir, systemSpecName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// prependCOSMounterFlag prepends the flag for setting the GCI mounter path to
|
||||
// args and returns the result.
|
||||
func prependCOSMounterFlag(args, host, workspace string) (string, error) {
|
||||
klog.V(2).Infof("GCI/COS node and GCI/COS mounter both detected, modifying --experimental-mounter-path accordingly")
|
||||
mounterPath := filepath.Join(workspace, "mounter")
|
||||
args = fmt.Sprintf("--kubelet-flags=--experimental-mounter-path=%s ", mounterPath) + args
|
||||
return args, nil
|
||||
}
|
||||
|
||||
// prependMemcgNotificationFlag prepends the flag for enabling memcg
|
||||
// notification to args and returns the result.
|
||||
func prependMemcgNotificationFlag(args string) string {
|
||||
@ -124,8 +115,7 @@ func osSpecificActions(args, host, workspace string) (string, error) {
|
||||
return args, setKubeletSELinuxLabels(host, workspace)
|
||||
case strings.Contains(output, "gci"), strings.Contains(output, "cos"):
|
||||
args = prependMemcgNotificationFlag(args)
|
||||
args = prependGCPCredentialProviderFlag(args, workspace)
|
||||
return prependCOSMounterFlag(args, host, workspace)
|
||||
return prependGCPCredentialProviderFlag(args, workspace), nil
|
||||
case strings.Contains(output, "ubuntu"):
|
||||
args = prependGCPCredentialProviderFlag(args, workspace)
|
||||
return prependMemcgNotificationFlag(args), nil
|
||||
|
@ -23,11 +23,8 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/tabwriter"
|
||||
@ -39,11 +36,9 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/util/procfs"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
@ -465,38 +460,6 @@ func (r *ResourceCollector) GetResourceTimeSeries() map[string]*perftype.Resourc
|
||||
|
||||
const kubeletProcessName = "kubelet"
|
||||
|
||||
func getPidsForProcess(name, pidFile string) ([]int, error) {
|
||||
if len(pidFile) > 0 {
|
||||
pid, err := getPidFromPidFile(pidFile)
|
||||
if err == nil {
|
||||
return []int{pid}, nil
|
||||
}
|
||||
// log the error and fall back to pidof
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
return procfs.PidOf(name)
|
||||
}
|
||||
|
||||
func getPidFromPidFile(pidFile string) (int, error) {
|
||||
file, err := os.Open(pidFile)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error opening pid file %s: %w", pidFile, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
data, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error reading pid file %s: %w", pidFile, err)
|
||||
}
|
||||
|
||||
pid, err := strconv.Atoi(string(data))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error parsing %s as a number: %w", string(data), err)
|
||||
}
|
||||
|
||||
return pid, nil
|
||||
}
|
||||
|
||||
func getContainerNameForProcess(name, pidFile string) (string, error) {
|
||||
pids, err := getPidsForProcess(name, pidFile)
|
||||
if err != nil {
|
||||
|
@ -25,17 +25,21 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/util/procfs"
|
||||
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
@ -55,6 +59,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
|
||||
"github.com/coreos/go-systemd/v22/dbus"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
|
||||
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
|
||||
@ -84,12 +89,14 @@ const (
|
||||
|
||||
var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
|
||||
|
||||
var containerRuntimeUnitName = ""
|
||||
|
||||
func getNodeSummary(ctx context.Context) (*stats.Summary, error) {
|
||||
kubeletConfig, err := getCurrentKubeletConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get current kubelet config")
|
||||
}
|
||||
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to build http request: %w", err)
|
||||
}
|
||||
@ -340,6 +347,71 @@ func findKubeletServiceName(running bool) string {
|
||||
return kubeletServiceName
|
||||
}
|
||||
|
||||
func findContainerRuntimeServiceName() (string, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
conn, err := dbus.NewWithContext(ctx)
|
||||
framework.ExpectNoError(err, "Failed to setup dbus connection")
|
||||
defer conn.Close()
|
||||
|
||||
runtimePids, err := getPidsForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile)
|
||||
framework.ExpectNoError(err, "failed to get list of container runtime pids")
|
||||
framework.ExpectEqual(len(runtimePids), 1, "Unexpected number of container runtime pids. Expected 1 but got %v", len(runtimePids))
|
||||
|
||||
containerRuntimePid := runtimePids[0]
|
||||
|
||||
unitName, err := conn.GetUnitNameByPID(ctx, uint32(containerRuntimePid))
|
||||
framework.ExpectNoError(err, "Failed to get container runtime unit name")
|
||||
|
||||
return unitName, nil
|
||||
}
|
||||
|
||||
type containerRuntimeUnitOp int
|
||||
|
||||
const (
|
||||
startContainerRuntimeUnitOp containerRuntimeUnitOp = iota
|
||||
stopContainerRuntimeUnitOp
|
||||
)
|
||||
|
||||
func performContainerRuntimeUnitOp(op containerRuntimeUnitOp) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
conn, err := dbus.NewWithContext(ctx)
|
||||
framework.ExpectNoError(err, "Failed to setup dbus connection")
|
||||
defer conn.Close()
|
||||
|
||||
if containerRuntimeUnitName == "" {
|
||||
containerRuntimeUnitName, err = findContainerRuntimeServiceName()
|
||||
framework.ExpectNoError(err, "Failed to find container runtime name")
|
||||
}
|
||||
|
||||
reschan := make(chan string)
|
||||
|
||||
switch op {
|
||||
case startContainerRuntimeUnitOp:
|
||||
conn.StartUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
|
||||
case stopContainerRuntimeUnitOp:
|
||||
conn.StopUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
|
||||
default:
|
||||
framework.Failf("Unexpected container runtime op: %v", op)
|
||||
}
|
||||
|
||||
job := <-reschan
|
||||
framework.ExpectEqual(job, "done", "Expected job to complete with done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func stopContainerRuntime() error {
|
||||
return performContainerRuntimeUnitOp(stopContainerRuntimeUnitOp)
|
||||
}
|
||||
|
||||
func startContainerRuntime() error {
|
||||
return performContainerRuntimeUnitOp(startContainerRuntimeUnitOp)
|
||||
}
|
||||
|
||||
// restartKubelet restarts the current kubelet service.
|
||||
// the "current" kubelet service is the instance managed by the current e2e_node test run.
|
||||
// If `running` is true, restarts only if the current kubelet is actually running. In some cases,
|
||||
@ -465,3 +537,35 @@ func waitForAllContainerRemoval(ctx context.Context, podName, podNS string) {
|
||||
return nil
|
||||
}, 2*time.Minute, 1*time.Second).Should(gomega.Succeed())
|
||||
}
|
||||
|
||||
func getPidsForProcess(name, pidFile string) ([]int, error) {
|
||||
if len(pidFile) > 0 {
|
||||
pid, err := getPidFromPidFile(pidFile)
|
||||
if err == nil {
|
||||
return []int{pid}, nil
|
||||
}
|
||||
// log the error and fall back to pidof
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
return procfs.PidOf(name)
|
||||
}
|
||||
|
||||
func getPidFromPidFile(pidFile string) (int, error) {
|
||||
file, err := os.Open(pidFile)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
data, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
|
||||
}
|
||||
|
||||
pid, err := strconv.Atoi(string(data))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
|
||||
}
|
||||
|
||||
return pid, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user