diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index 2c23d4241d7..e47b37a0d05 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -66,8 +66,6 @@ type managerImpl struct { config Config // the function to invoke to kill a pod killPodFunc KillPodFunc - // the function to get the mirror pod by a given static pod - mirrorPodFunc MirrorPodFunc // the interface that knows how to do image gc imageGC ImageGC // the interface that knows how to do container gc @@ -112,7 +110,6 @@ func NewManager( summaryProvider stats.SummaryProvider, config Config, killPodFunc KillPodFunc, - mirrorPodFunc MirrorPodFunc, imageGC ImageGC, containerGC ContainerGC, recorder record.EventRecorder, @@ -123,7 +120,6 @@ func NewManager( manager := &managerImpl{ clock: clock, killPodFunc: killPodFunc, - mirrorPodFunc: mirrorPodFunc, imageGC: imageGC, containerGC: containerGC, config: config, diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go index 5ef5c148879..81562c671d3 100644 --- a/pkg/kubelet/eviction/eviction_manager_test.go +++ b/pkg/kubelet/eviction/eviction_manager_test.go @@ -1451,11 +1451,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) { activePodsFunc := func() []*v1.Pod { return pods } - mirrorPodFunc := func(staticPod *v1.Pod) (*v1.Pod, bool) { - mirrorPod := staticPod.DeepCopy() - mirrorPod.Annotations[kubelettypes.ConfigSourceAnnotationKey] = kubelettypes.ApiserverSource - return mirrorPod, true - } fakeClock := testingclock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} @@ -1490,7 +1485,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) { manager := &managerImpl{ clock: fakeClock, killPodFunc: podKiller.killPodNow, - mirrorPodFunc: mirrorPodFunc, imageGC: diskGC, containerGC: diskGC, config: config, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index be1f0176333..709e35015fb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -611,9 +611,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.startupManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() - // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. 
- mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister) - klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient) + klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister) + klet.podManager = kubepod.NewBasicPodManager() klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir()) @@ -837,7 +836,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, - killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) + killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) klet.evictionManager = evictionManager klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) @@ -936,7 +935,11 @@ type Kubelet struct { runtimeCache kubecontainer.RuntimeCache kubeClient clientset.Interface heartbeatClient clientset.Interface - rootDirectory string + // mirrorPodClient is used to create and delete mirror pods in the API for static + // pods. + mirrorPodClient kubepod.MirrorClient + + rootDirectory string lastObservedNodeAddressesMux sync.RWMutex lastObservedNodeAddresses []v1.NodeAddress @@ -944,9 +947,90 @@ type Kubelet struct { // onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional. onRepeatedHeartbeatFailure func() - // podWorkers handle syncing Pods in response to events. + // podManager stores the desired set of admitted pods and mirror pods that the kubelet should be + // running. The actual set of running pods is stored on the podWorkers. The manager is populated + // by the kubelet config loops, which abstract receiving configuration from many different sources + // (api for regular pods, local filesystem or http for static pods). The manager may be consulted + // by other components that need to see the set of desired pods. Note that not all desired pods are + // running, and not all running pods are in the podManager - for instance, force deleting a pod + // from the apiserver will remove it from the podManager, but the pod may still be terminating and + // tracked by the podWorkers. Components that need to know the actual consumed resources of the + // node or are driven by podWorkers and the sync*Pod methods (status, volume, stats) should also + // consult the podWorkers when reconciling. + // + // TODO: review all kubelet components that need the actual set of pods (vs the desired set) + // and update them to use podWorkers instead of podManager. This may introduce latency in some + // methods, but avoids race conditions and correctly accounts for terminating pods that have + // been force deleted or static pods that have been updated. + // https://github.com/kubernetes/kubernetes/issues/116970 + podManager kubepod.Manager + + // podWorkers is responsible for driving the lifecycle state machine of each pod. The worker is + // notified of config changes, updates, periodic reconciliation, container runtime updates, and + // evictions of all desired pods and will invoke reconciliation methods per pod in separate + // goroutines.
The podWorkers are authoritative in the kubelet for what pods are actually being + // run and their current state: + // + // * syncing: pod should be running (syncPod) + // * terminating: pod should be stopped (syncTerminatingPod) + // * terminated: pod should have all resources cleaned up (syncTerminatedPod) + // + // The podWorkers invoke the handler methods that correspond to each state. Components within the + // kubelet that need to know the phase of the pod in order to correctly set up or tear down + // resources must consult the podWorkers. + // + // Once a pod has been accepted by the pod workers, no other pod with that same UID (and + // name+namespace, for static pods) will be started until the first pod has fully terminated + // and been cleaned up by SyncKnownPods. This means a pod may be desired (in API), admitted + // (in pod manager), and requested (by invoking UpdatePod) but not start for an arbitrarily + // long interval because a prior pod is still terminating. + // + // As an event-driven (by UpdatePod) controller, the podWorkers must periodically be resynced + // by the kubelet invoking SyncKnownPods with the desired state (admitted pods in podManager). + // Since the podManager may be unaware of some running pods due to force deletion, the + // podWorkers are responsible for triggering a sync of pods that are no longer desired but + // must still run to completion. podWorkers PodWorkers + // evictionManager observes the state of the node for situations that could impact node stability + // and evicts pods (sets to phase Failed with reason Evicted) to reduce resource pressure. The + // eviction manager acts on the actual state of the node and considers the podWorker to be + // authoritative. + evictionManager eviction.Manager + + // probeManager tracks the set of running pods and ensures any user-defined periodic checks are + // run to introspect the state of each pod. The probe manager acts on the actual state of the node + // and is notified of pods by the podWorker. The probe manager is the authoritative source of the + // most recent probe status and is responsible for notifying the status manager, which + // synthesizes them into the overall pod status. + probeManager prober.Manager + + // secretManager caches the set of secrets used by running pods on this node. The podWorkers + // notify the secretManager when pods are started and terminated, and the secretManager must + // then keep the needed secrets up-to-date as they change. + secretManager secret.Manager + + // configMapManager caches the set of config maps used by running pods on this node. The + // podWorkers notify the configMapManager when pods are started and terminated, and the + // configMapManager must then keep the needed config maps up-to-date as they change. + configMapManager configmap.Manager + + // volumeManager observes the set of running pods and is responsible for attaching, mounting, + // unmounting, and detaching as those pods move through their lifecycle. It periodically + // synchronizes the set of known volumes to the set of actually desired volumes and cleans up + // any orphaned volumes. The volume manager considers the podWorker to be authoritative for + // which pods are running. + volumeManager volumemanager.VolumeManager + + // statusManager receives pod status updates from the podWorker and updates the API + // status of those pods to match.
The statusManager is authoritative for the synthesized + // status of the pod from the kubelet's perspective (other components own the individual + // elements of status) and should be consulted by components in preference to assembling + // that status themselves. Note that the status manager is downstream of the pod worker + // and components that need to check whether a pod is still running should instead directly + // consult the pod worker. + statusManager status.Manager + // resyncInterval is the interval between periodic full reconciliations of // pods on this node. resyncInterval time.Duration @@ -954,13 +1038,6 @@ type Kubelet struct { // sourcesReady records the sources seen by the kubelet, it is thread-safe. sourcesReady config.SourcesReady - // podManager is a facade that abstracts away the various sources of pods - // this Kubelet services. - podManager kubepod.Manager - - // Needed to observe and respond to situations that could impact node stability - evictionManager eviction.Manager - // Optional, defaults to /logs/ from /var/log logServer http.Handler // Optional, defaults to simple Docker implementation @@ -1001,8 +1078,6 @@ type Kubelet struct { // Volume plugins. volumePluginMgr *volume.VolumePluginMgr - // Handles container probing. - probeManager prober.Manager // Manages container health check results. livenessManager proberesults.Manager readinessManager proberesults.Manager @@ -1024,12 +1099,6 @@ type Kubelet struct { // Manager for container logs. containerLogManager logs.ContainerLogManager - // Secret manager. - secretManager secret.Manager - - // ConfigMap manager. - configMapManager configmap.Manager - // Cached MachineInfo returned by cadvisor. machineInfoLock sync.RWMutex machineInfo *cadvisorapi.MachineInfo @@ -1037,14 +1106,6 @@ type Kubelet struct { // Handles certificate rotations. serverCertificateManager certificate.Manager - // Syncs pods statuses with apiserver; also used as a cache of statuses. - statusManager status.Manager - - // VolumeManager runs a set of asynchronous loops that figure out which - // volumes need to be attached/mounted/unmounted/detached based on the pods - // scheduled on this node and makes it so. - volumeManager volumemanager.VolumeManager - // Cloud provider interface. cloud cloudprovider.Interface // Handles requests to cloud provider with timeout @@ -1110,10 +1171,12 @@ type Kubelet struct { // nodeLeaseController claims and renews the node lease for this Kubelet nodeLeaseController lease.Controller - // Generates pod events. + // pleg observes the state of the container runtime and notifies the kubelet of changes to containers, which + // notifies the podWorkers to reconcile the state of the pod (for instance, if a container dies and needs to + // be restarted). pleg pleg.PodLifecycleEventGenerator - // Evented PLEG + // eventedPleg supplements the pleg to deliver edge-driven container changes with low latency. eventedPleg pleg.PodLifecycleEventGenerator // Store kubecontainer.PodStatus for all pods. @@ -1823,13 +1886,13 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType if kubetypes.IsStaticPod(pod) { deleted := false if mirrorPod != nil { - if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { + if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) { // The mirror pod is semantically different from the static pod. Remove // it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) podFullName := kubecontainer.GetPodFullName(pod) var err error - deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) + deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) if deleted { klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod)) } else if err != nil { @@ -1843,7 +1906,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) } else { klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.podManager.CreateMirrorPod(pod); err != nil { + if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod)) } } @@ -2134,6 +2197,15 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus // Get pods which should be resynchronized. Currently, the following pod should be resynchronized: // - pod whose work is ready. // - internal modules that request sync of a pod. +// +// This method does not return orphaned pods (those known only to the pod worker that may have +// been deleted from configuration). Those pods are synced by HandlePodCleanups as a consequence +// of driving the state machine to completion. +// +// TODO: Consider synchronizing all pods which have not recently been acted on to be resilient +// to bugs that might prevent updates from being delivered (such as the previous bug with +// orphaned pods). Instead of asking the work queue for pending work, consider asking the +// PodWorker which pods should be synced. func (kl *Kubelet) getPodsToSync() []*v1.Pod { allPods := kl.podManager.GetPods() podUIDs := kl.workQueue.GetWork() @@ -2442,32 +2514,6 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle handler.HandlePodSyncs([]*v1.Pod{pod}) } -// dispatchWork starts the asynchronous sync of the pod in a pod worker. -// If the pod has completed termination, dispatchWork will perform no action. -func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { - // Run the sync in an async worker. - kl.podWorkers.UpdatePod(UpdatePodOptions{ - Pod: pod, - MirrorPod: mirrorPod, - UpdateType: syncType, - StartTime: start, - }) - // Note the number of containers for new pods. - if syncType == kubetypes.SyncPodCreate { - metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) - } -} - -// TODO: handle mirror pods in a separate component (issue #17251) -func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) { - // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the - // corresponding static pod. Send update to the pod worker if the static - // pod exists. - if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok { - kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) - } -} - // HandlePodAdditions is the callback in SyncHandler for pods being added from // a config source. func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { @@ -2485,8 +2531,18 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // the apiserver and no action (other than cleanup) is required. 
kl.podManager.AddPod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) continue } @@ -2530,8 +2586,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodCreate, + StartTime: start, + }) } } @@ -2541,12 +2601,21 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.UpdatePod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) - continue + + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) + + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) } } @@ -2555,11 +2624,23 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { - kl.podManager.DeletePod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) + kl.podManager.RemovePod(pod) + + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) continue } + // Deletion is allowed to fail because the periodic cleanup routine // will trigger deletion again. if err := kl.deletePod(pod); err != nil { @@ -2569,7 +2650,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { } // HandlePodReconcile is the callback in the SyncHandler interface for pods -// that should be reconciled. +// that should be reconciled. Pods are reconciled when only the status of the +// pod is updated in the API. func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { @@ -2577,13 +2659,37 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // to the pod manager. kl.podManager.UpdatePod(pod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Static pods should be reconciled the same way as regular pods + } + + // TODO: reconcile being calculated in the config manager is questionable, and avoiding + // extra syncs may no longer be necessary. 
Reevaluate whether Reconcile and Sync can be + // merged (after resolving the next two TODOs). + // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation. + // TODO: this should be unnecessary today - determine what is the cause for this to + // be different than Sync, or if there is a better place for it. For instance, we have + // needsReconcile in kubelet/config, here, and in status_manager. if status.NeedToReconcilePodReadiness(pod) { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } // After an evicted pod is synced, all dead containers in the pod can be removed. + // TODO: this is questionable - status read is async and during eviction we already + // expect to not have some container info. The pod worker knows whether a pod has + // been evicted, so if this is about minimizing the time to react to an eviction we + // can do better. If it's about preserving pod status info we can also do better. if eviction.PodIsEvicted(pod.Status) { if podStatus, err := kl.podCache.Get(pod.UID); err == nil { kl.containerDeletor.deleteContainersInPod("", podStatus, true) @@ -2597,8 +2703,24 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Syncing a mirror pod is a programmer error since the intent of sync is to + // batch notify all pending work. We should make it impossible to double sync, + // but for now log a programmer error to prevent accidental introduction. + klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 21fe0253cc7..68d8e938250 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -984,23 +984,6 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po kl.statusManager.RemoveOrphanedStatuses(podUIDs) } -// deleteOrphanedMirrorPods checks whether pod killer has done with orphaned mirror pod. 
-// If pod killing is done, podManager.DeleteMirrorPod() is called to delete mirror pod -// from the API server -func (kl *Kubelet) deleteOrphanedMirrorPods() { - mirrorPods := kl.podManager.GetOrphanedMirrorPodNames() - for _, podFullname := range mirrorPods { - if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) { - _, err := kl.podManager.DeleteMirrorPod(podFullname, nil) - if err != nil { - klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname) - } else { - klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname) - } - } - } -} - // HandlePodCleanups performs a series of cleanup work, including terminating // pod workers, killing unwanted pods, and removing orphaned volumes/pod // directories. No config changes are sent to pod workers while this method @@ -1031,7 +1014,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { } } - allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods() + allPods, mirrorPods, orphanedMirrorPodFullnames := kl.podManager.GetPodsAndMirrorPods() // Pod phase progresses monotonically. Once a pod has reached a final state, // it should never leave regardless of the restart policy. The statuses @@ -1127,7 +1110,16 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // Remove any orphaned mirror pods (mirror pods are tracked by name via the // pod worker) klog.V(3).InfoS("Clean up orphaned mirror pods") - kl.deleteOrphanedMirrorPods() + for _, podFullname := range orphanedMirrorPodFullnames { + if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) { + _, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil) + if err != nil { + klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname) + } else { + klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname) + } + } + } // After pruning pod workers for terminated pods get the list of active pods for // metrics and to determine restarts. @@ -1156,10 +1148,14 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID) isStatic := kubetypes.IsStaticPod(desiredPod) - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(desiredPod) + if pod == nil || wasMirror { + klog.V(2).InfoS("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID) + continue + } kl.podWorkers.UpdatePod(UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, - Pod: desiredPod, + Pod: pod, MirrorPod: mirrorPod, }) @@ -1246,7 +1242,6 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // Cleanup any backoff entries. kl.backOff.GC() - return nil } @@ -1352,15 +1347,26 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con return fmt.Errorf("pod %q cannot be found - no logs available", name) } - podUID := pod.UID - if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok { + // TODO: this should be using the podWorker's pod store as authoritative, since + // the mirrorPod might still exist, the pod may have been force deleted but + // is still terminating (users should be able to view logs of force deleted static pods + // based on full name). 
+ var podUID types.UID + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + return fmt.Errorf("mirror pod %q does not have a corresponding pod", name) + } podUID = mirrorPod.UID + } else { + podUID = pod.UID } + podStatus, found := kl.statusManager.GetPodStatus(podUID) if !found { // If there is no cached status, use the status from the - // apiserver. This is useful if kubelet has recently been - // restarted. + // config source (apiserver). This is useful if kubelet + // has recently been restarted. podStatus = pod.Status } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5c3a1a4aa68..5b62ca1289c 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -260,7 +260,8 @@ func newTestKubeletWithImageList( kubelet.secretManager = secretManager configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) kubelet.configMapManager = configMapManager - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient) + kubelet.mirrorPodClient = fakeMirrorClient + kubelet.podManager = kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir()) @@ -339,7 +340,7 @@ func newTestKubeletWithImageList( } // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, - killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.podManager.GetMirrorPodByPod, kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) + killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) kubelet.evictionManager = evictionManager kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) @@ -597,7 +598,11 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { }, } for _, pod := range pods { - kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now()) + kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodSync, + StartTime: time.Now(), + }) if !got { t.Errorf("Should not skip completed pod %q", pod.Name) } @@ -651,7 +656,11 @@ func TestDispatchWorkOfActivePod(t *testing.T) { } for _, pod := range pods { - kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now()) + kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodSync, + StartTime: time.Now(), + }) if !got { t.Errorf("Should not skip active pod %q", pod.Name) } @@ -2521,9 +2530,9 @@ func TestHandlePodResourcesResize(t *testing.T) { testPod2.UID: true, testPod3.UID: true, } - defer kubelet.podManager.DeletePod(testPod3) - defer kubelet.podManager.DeletePod(testPod2) - defer kubelet.podManager.DeletePod(testPod1) + defer kubelet.podManager.RemovePod(testPod3) + defer kubelet.podManager.RemovePod(testPod2) + defer kubelet.podManager.RemovePod(testPod1) tests := []struct { name string diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 69457e6c983..e3cc4f76080 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -43,8 +43,6 @@ import ( // pod. When a static pod gets deleted, the associated orphaned mirror pod // will also be removed. 
type Manager interface { - // GetPods returns the regular pods bound to the kubelet and their spec. - GetPods() []*v1.Pod // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as // whether the pod was found. GetPodByFullName(podFullName string) (*v1.Pod, bool) @@ -60,8 +58,18 @@ type Manager interface { // GetMirrorPodByPod returns the mirror pod for the given static pod and // whether it was known to the pod manager. GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool) - // GetPodsAndMirrorPods returns the both regular and mirror pods. - GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) + // GetPodAndMirrorPod returns the complement of the given pod - for a static + // pod, return it and its mirror pod (if known); for a mirror pod, return it + // and its static pod (if known), with wasMirror set to true. + GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) + + // GetPods returns the regular pods bound to the kubelet and their spec. + GetPods() []*v1.Pod + + // GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and + // the pod fullnames of any orphaned mirror pods. + GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) + // SetPods replaces the internal pods with the new pods. // It is currently only used for testing. SetPods(pods []*v1.Pod) @@ -69,12 +77,11 @@ type Manager interface { AddPod(pod *v1.Pod) // UpdatePod updates the given pod in the manager. UpdatePod(pod *v1.Pod) - // DeletePod deletes the given pod from the manager. For mirror pods, + // RemovePod deletes the given pod from the manager. For mirror pods, // this means deleting the mappings related to mirror pods. For non- // mirror pods, this means deleting from indexes for all non-mirror pods. - DeletePod(pod *v1.Pod) - // GetOrphanedMirrorPodNames returns names of orphaned mirror pods - GetOrphanedMirrorPodNames() []string + RemovePod(pod *v1.Pod) + // TranslatePodUID returns the actual UID of a pod. If the UID belongs to // a mirror pod, returns the UID of its static pod. Otherwise, returns the // original UID. @@ -86,17 +93,12 @@ type Manager interface { // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod // UIDs and mirror pod UIDs to static pod UIDs. GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) - // IsMirrorPodOf returns true if mirrorPod is a correct representation of - // pod; false otherwise. - IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool - - MirrorClient } // basicManager is a functional Manager. // // All fields in basicManager are read-only and are updated calling SetPods, -// AddPod, UpdatePod, or DeletePod. +// AddPod, UpdatePod, or RemovePod. type basicManager struct { // Protects all internal maps. lock sync.RWMutex @@ -112,15 +114,11 @@ type basicManager struct { // Mirror pod UID to pod UID map. translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID - - // A mirror pod client to create/delete mirror pods. - MirrorClient } // NewBasicPodManager returns a functional Manager.
-func NewBasicPodManager(client MirrorClient) Manager { +func NewBasicPodManager() Manager { pm := &basicManager{} - pm.MirrorClient = client pm.SetPods(nil) return pm } @@ -191,7 +189,7 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { } } -func (pm *basicManager) DeletePod(pod *v1.Pod) { +func (pm *basicManager) RemovePod(pod *v1.Pod) { updateMetrics(pod, nil) pm.lock.Lock() defer pm.lock.Unlock() @@ -214,12 +212,18 @@ func (pm *basicManager) GetPods() []*v1.Pod { return podsMapToPods(pm.podByUID) } -func (pm *basicManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) { +func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) { pm.lock.RLock() defer pm.lock.RUnlock() - pods := podsMapToPods(pm.podByUID) - mirrorPods := mirrorPodsMapToMirrorPods(pm.mirrorPodByUID) - return pods, mirrorPods + allPods = podsMapToPods(pm.podByUID) + allMirrorPods = mirrorPodsMapToMirrorPods(pm.mirrorPodByUID) + + for podFullName := range pm.mirrorPodByFullName { + if _, ok := pm.podByFullName[podFullName]; !ok { + orphanedMirrorPodFullnames = append(orphanedMirrorPodFullnames, podFullName) + } + } + return allPods, allMirrorPods, orphanedMirrorPodFullnames } func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) { @@ -280,19 +284,8 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.Resolved return podToMirror, mirrorToPod } -func (pm *basicManager) GetOrphanedMirrorPodNames() []string { - pm.lock.RLock() - defer pm.lock.RUnlock() - var podFullNames []string - for podFullName := range pm.mirrorPodByFullName { - if _, ok := pm.podByFullName[podFullName]; !ok { - podFullNames = append(podFullNames, podFullName) - } - } - return podFullNames -} - -func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { +// IsMirrorPodOf returns true if pod and mirrorPod are associated with each other. +func IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { // Check name and namespace first. if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace { return false @@ -333,3 +326,15 @@ func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) { pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)] return pod, ok } + +func (pm *basicManager) GetPodAndMirrorPod(aPod *v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) { + pm.lock.RLock() + defer pm.lock.RUnlock() + + fullName := kubecontainer.GetPodFullName(aPod) + if kubetypes.IsMirrorPod(aPod) { + return pm.podByFullName[fullName], aPod, true + } + return aPod, pm.mirrorPodByFullName[fullName], false + +} diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 3703ca4626a..8c688ed836c 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -20,7 +20,7 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" @@ -30,7 +30,7 @@ import ( // Stub out mirror client for testing purpose. 
func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() - manager := NewBasicPodManager(fakeMirrorClient).(*basicManager) + manager := NewBasicPodManager().(*basicManager) return manager, fakeMirrorClient } @@ -111,7 +111,7 @@ func TestGetSetPods(t *testing.T) { } -func TestDeletePods(t *testing.T) { +func TestRemovePods(t *testing.T) { mirrorPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: types.UID("mirror-pod-uid"), @@ -147,14 +147,14 @@ func TestDeletePods(t *testing.T) { podManager, _ := newTestManager() podManager.SetPods(updates) - podManager.DeletePod(staticPod) + podManager.RemovePod(staticPod) actualPods := podManager.GetPods() if len(actualPods) == len(expectedPods) { - t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods)) + t.Fatalf("Run RemovePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods)) } - orphanedMirrorPodNames := podManager.GetOrphanedMirrorPodNames() + _, _, orphanedMirrorPodNames := podManager.GetPodsAndMirrorPods() expectedOrphanedMirrorPodNameNum := 1 if len(orphanedMirrorPodNames) != expectedOrphanedMirrorPodNameNum { t.Fatalf("Run getOrphanedMirrorPodNames() error, expected %d orphaned mirror pods, got %d orphaned mirror pods; ", expectedOrphanedMirrorPodNameNum, len(orphanedMirrorPodNames)) diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 2de02e970e6..643705192dd 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -64,47 +64,6 @@ func (mr *MockManagerMockRecorder) AddPod(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPod", reflect.TypeOf((*MockManager)(nil).AddPod), arg0) } -// CreateMirrorPod mocks base method. -func (m *MockManager) CreateMirrorPod(arg0 *v1.Pod) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateMirrorPod", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateMirrorPod indicates an expected call of CreateMirrorPod. -func (mr *MockManagerMockRecorder) CreateMirrorPod(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateMirrorPod", reflect.TypeOf((*MockManager)(nil).CreateMirrorPod), arg0) -} - -// DeleteMirrorPod mocks base method. -func (m *MockManager) DeleteMirrorPod(arg0 string, arg1 *types.UID) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteMirrorPod", arg0, arg1) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DeleteMirrorPod indicates an expected call of DeleteMirrorPod. -func (mr *MockManagerMockRecorder) DeleteMirrorPod(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMirrorPod", reflect.TypeOf((*MockManager)(nil).DeleteMirrorPod), arg0, arg1) -} - -// DeletePod mocks base method. -func (m *MockManager) DeletePod(arg0 *v1.Pod) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "DeletePod", arg0) -} - -// DeletePod indicates an expected call of DeletePod. -func (mr *MockManagerMockRecorder) DeletePod(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePod", reflect.TypeOf((*MockManager)(nil).DeletePod), arg0) -} - // GetMirrorPodByPod mocks base method. 
func (m *MockManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) { m.ctrl.T.Helper() @@ -120,18 +79,20 @@ func (mr *MockManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockManager)(nil).GetMirrorPodByPod), arg0) } -// GetOrphanedMirrorPodNames mocks base method. -func (m *MockManager) GetOrphanedMirrorPodNames() []string { +// GetPodAndMirrorPod mocks base method. +func (m *MockManager) GetPodAndMirrorPod(arg0 *v1.Pod) (*v1.Pod, *v1.Pod, bool) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOrphanedMirrorPodNames") - ret0, _ := ret[0].([]string) - return ret0 + ret := m.ctrl.Call(m, "GetPodAndMirrorPod", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(*v1.Pod) + ret2, _ := ret[2].(bool) + return ret0, ret1, ret2 } -// GetOrphanedMirrorPodNames indicates an expected call of GetOrphanedMirrorPodNames. -func (mr *MockManagerMockRecorder) GetOrphanedMirrorPodNames() *gomock.Call { +// GetPodAndMirrorPod indicates an expected call of GetPodAndMirrorPod. +func (mr *MockManagerMockRecorder) GetPodAndMirrorPod(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrphanedMirrorPodNames", reflect.TypeOf((*MockManager)(nil).GetOrphanedMirrorPodNames)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodAndMirrorPod", reflect.TypeOf((*MockManager)(nil).GetPodAndMirrorPod), arg0) } // GetPodByFullName mocks base method. @@ -209,12 +170,13 @@ func (mr *MockManagerMockRecorder) GetPods() *gomock.Call { } // GetPodsAndMirrorPods mocks base method. -func (m *MockManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) { +func (m *MockManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod, []string) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetPodsAndMirrorPods") ret0, _ := ret[0].([]*v1.Pod) ret1, _ := ret[1].([]*v1.Pod) - return ret0, ret1 + ret2, _ := ret[2].([]string) + return ret0, ret1, ret2 } // GetPodsAndMirrorPods indicates an expected call of GetPodsAndMirrorPods. @@ -238,18 +200,16 @@ func (mr *MockManagerMockRecorder) GetUIDTranslations() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockManager)(nil).GetUIDTranslations)) } -// IsMirrorPodOf mocks base method. -func (m *MockManager) IsMirrorPodOf(arg0, arg1 *v1.Pod) bool { +// RemovePod mocks base method. +func (m *MockManager) RemovePod(arg0 *v1.Pod) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsMirrorPodOf", arg0, arg1) - ret0, _ := ret[0].(bool) - return ret0 + m.ctrl.Call(m, "RemovePod", arg0) } -// IsMirrorPodOf indicates an expected call of IsMirrorPodOf. -func (mr *MockManagerMockRecorder) IsMirrorPodOf(arg0, arg1 interface{}) *gomock.Call { +// RemovePod indicates an expected call of RemovePod. +func (mr *MockManagerMockRecorder) RemovePod(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsMirrorPodOf", reflect.TypeOf((*MockManager)(nil).IsMirrorPodOf), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemovePod", reflect.TypeOf((*MockManager)(nil).RemovePod), arg0) } // SetPods mocks base method. 
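A note for reviewers before the pod_workers.go hunk: the GetPodAndMirrorPod contract introduced in pkg/kubelet/pod/pod_manager.go is easy to misread, so here is a minimal, self-contained Go sketch of it. The names below (pod, manager, getPodAndMirrorPod) are toy stand-ins for *v1.Pod and basicManager rather than kubelet code, and the full-name key imitates kubecontainer.GetPodFullName's name_namespace format.

```go
package main

import "fmt"

// pod is a toy stand-in for *v1.Pod; the kubelet derives isMirror from the
// config-source annotation via kubetypes.IsMirrorPod.
type pod struct {
	fullName string
	isMirror bool
}

// manager is a toy stand-in for basicManager's two full-name indexes.
type manager struct {
	podByFullName       map[string]*pod
	mirrorPodByFullName map[string]*pod
}

// getPodAndMirrorPod mirrors the new contract: given either half of a
// static/mirror pair, return both halves plus whether the input was the mirror.
func (m *manager) getPodAndMirrorPod(aPod *pod) (p, mirror *pod, wasMirror bool) {
	if aPod.isMirror {
		return m.podByFullName[aPod.fullName], aPod, true
	}
	return aPod, m.mirrorPodByFullName[aPod.fullName], false
}

func main() {
	static := &pod{fullName: "etcd_kube-system"}
	mirror := &pod{fullName: "etcd_kube-system", isMirror: true}
	m := &manager{
		podByFullName:       map[string]*pod{static.fullName: static},
		mirrorPodByFullName: map[string]*pod{mirror.fullName: mirror},
	}

	// Handlers always dispatch the static pod to the pod workers; a nil first
	// return for a mirror input is the "skipping" case logged in the handlers.
	p, mp, wasMirror := m.getPodAndMirrorPod(mirror)
	fmt.Println(p == static, mp == mirror, wasMirror) // true true true
}
```

This single lookup is what lets HandlePodAdditions/Updates/Removes/Syncs drop dispatchWork and handleMirrorPod and call podWorkers.UpdatePod directly.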
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index e5805dbcf78..82e7cb93c2c 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -1181,6 +1181,12 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update status.startedAt = p.clock.Now() status.mergeLastUpdate(update.Options) + // If we are admitting the pod and it is new, record the count of containers + // TODO: We should probably move this into syncPod and add an execution count + // to the syncPod arguments, and this should be recorded on the first sync. + // Leaving it here complicates a particularly important loop. + metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers))) + return ctx, update, true, true, true } diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index ec10f49a55c..d071392a1c3 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -106,7 +106,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { } func newTestManager() *manager { - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go index 199a52a7237..6de9687e183 100644 --- a/pkg/kubelet/prober/scale_test.go +++ b/pkg/kubelet/prober/scale_test.go @@ -87,7 +87,7 @@ func TestTCPPortExhaustion(t *testing.T) { } else { testRootDir = tempDir } - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() m := NewManager( status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir), diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index c95b8f5f2a0..e585e15e7d1 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -160,7 +160,7 @@ func TestDoProbe(t *testing.T) { } else { testRootDir = tempDir } - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 8b63d368d07..b11442ae902 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -129,7 +129,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod)) klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.podManager.CreateMirrorPod(pod); err != nil { + if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 453cf9acff3..f8d0d972585 100644 --- a/pkg/kubelet/runonce_test.go +++ 
b/pkg/kubelet/runonce_test.go @@ -72,8 +72,7 @@ func TestRunOnce(t *testing.T) { }, nil).AnyTimes() fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() - podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() fakeRuntime := &containertest.FakeRuntime{} podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() basePath, err := utiltesting.MkTmpdir("kubelet") @@ -87,6 +86,7 @@ func TestRunOnce(t *testing.T) { cadvisor: cadvisor, nodeLister: testNodeLister{}, statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath), + mirrorPodClient: podtest.NewFakeMirrorClient(), podManager: podManager, podWorkers: &fakePodWorkers{}, os: &containertest.FakeOS{}, @@ -134,8 +134,7 @@ func TestRunOnce(t *testing.T) { fakeKillPodFunc := func(pod *v1.Pod, evict bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { return nil } - fakeMirrodPodFunc := func(*v1.Pod) (*v1.Pod, bool) { return nil, false } - evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, fakeMirrodPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation()) + evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation()) kb.evictionManager = evictionManager kb.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) diff --git a/pkg/kubelet/stats/provider.go b/pkg/kubelet/stats/provider.go index 1241111236a..09f82c609f5 100644 --- a/pkg/kubelet/stats/provider.go +++ b/pkg/kubelet/stats/provider.go @@ -27,18 +27,24 @@ import ( statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit" "k8s.io/kubernetes/pkg/kubelet/status" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) +// PodManager is the subset of methods the stats provider needs to observe the actual state of the kubelet. +// See k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID +} + // NewCRIStatsProvider returns a Provider that provides the node stats // from cAdvisor and the container stats from CRI. func NewCRIStatsProvider( cadvisor cadvisor.Interface, resourceAnalyzer stats.ResourceAnalyzer, - podManager kubepod.Manager, + podManager PodManager, runtimeCache kubecontainer.RuntimeCache, runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, @@ -54,7 +60,7 @@ func NewCRIStatsProvider( func NewCadvisorStatsProvider( cadvisor cadvisor.Interface, resourceAnalyzer stats.ResourceAnalyzer, - podManager kubepod.Manager, + podManager PodManager, runtimeCache kubecontainer.RuntimeCache, imageService kubecontainer.ImageService, statusProvider status.PodStatusProvider, @@ -67,7 +73,7 @@ func NewCadvisorStatsProvider( // cAdvisor and the container stats using the containerStatsProvider.
func newStatsProvider( cadvisor cadvisor.Interface, - podManager kubepod.Manager, + podManager PodManager, runtimeCache kubecontainer.RuntimeCache, containerStatsProvider containerStatsProvider, ) *Provider { @@ -82,7 +88,7 @@ func newStatsProvider( // Provider provides the stats of the node and the pod-managed containers. type Provider struct { cadvisor cadvisor.Interface - podManager kubepod.Manager + podManager PodManager runtimeCache kubecontainer.RuntimeCache containerStatsProvider } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 52c93d3db66..70e2dc03812 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -40,7 +40,6 @@ import ( "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" - kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/status/state" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" statusutil "k8s.io/kubernetes/pkg/util/pod" @@ -70,7 +69,7 @@ type versionedPodStatus struct { // All methods are thread-safe. type manager struct { kubeClient clientset.Interface - podManager kubepod.Manager + podManager PodManager // Map from pod UID to sync status of the corresponding pod. podStatuses map[types.UID]versionedPodStatus podStatusesLock sync.RWMutex @@ -87,8 +86,18 @@ type manager struct { stateFileDirectory string } -// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components -// that need to introspect status. +// PodManager is the subset of methods the status manager needs to observe the actual state of the kubelet. +// See k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + GetPodByUID(types.UID) (*v1.Pod, bool) + GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool) + TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID + GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) +} + +// PodStatusProvider knows how to provide status for a pod. It is intended to be used by other components +// that need to introspect the authoritative status of a pod. The PodStatusProvider represents the actual +// status of a running pod as the kubelet sees it. type PodStatusProvider interface { // GetPodStatus returns the cached status for the provided pod UID, as well as whether it // was a cache hit. @@ -149,7 +158,7 @@ type Manager interface { const syncPeriod = 10 * time.Second // NewManager returns a functional Manager.
-func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager { +func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager { return &manager{ kubeClient: kubeClient, podManager: podManager, diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index de4d19b65b4..437b45b26a5 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -45,12 +45,17 @@ import ( "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util" ) +type mutablePodManager interface { + AddPod(*v1.Pod) + UpdatePod(*v1.Pod) + RemovePod(*v1.Pod) +} + // Generate new instance of test pod with the same initial value. func getTestPod() *v1.Pod { return &v1.Pod{ @@ -85,8 +90,8 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) - podManager.AddPod(getTestPod()) + podManager := kubepod.NewBasicPodManager() + podManager.(mutablePodManager).AddPod(getTestPod()) podStartupLatencyTracker := util.NewPodStartupLatencyTracker() testRootDir := "" if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil { @@ -329,10 +334,10 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) { syncer := newTestManager(&fake.Clientset{}) pod := getTestPod() pod.UID = "first" - syncer.podManager.AddPod(pod) + syncer.podManager.(mutablePodManager).AddPod(pod) differentPod := getTestPod() differentPod.UID = "second" - syncer.podManager.AddPod(differentPod) + syncer.podManager.(mutablePodManager).AddPod(differentPod) syncer.kubeClient = fake.NewSimpleClientset(pod) syncer.SetPodStatus(differentPod, getRandomPodStatus()) verifyActions(t, syncer, []core.Action{getAction()}) @@ -532,7 +537,7 @@ func TestStaticPod(t *testing.T) { m := newTestManager(client) t.Logf("Create the static pod") - m.podManager.AddPod(staticPod) + m.podManager.(mutablePodManager).AddPod(staticPod) assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod") status := getRandomPodStatus() @@ -550,7 +555,7 @@ func TestStaticPod(t *testing.T) { assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions()) t.Logf("Create the mirror pod") - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID)) @@ -567,10 +572,10 @@ func TestStaticPod(t *testing.T) { verifyActions(t, m, []core.Action{}) t.Logf("Change mirror pod identity.") - m.podManager.DeletePod(mirrorPod) + m.podManager.(mutablePodManager).RemovePod(mirrorPod) mirrorPod.UID = "new-mirror-pod" mirrorPod.Status = v1.PodStatus{} - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) t.Logf("Should not update to mirror pod, 
because UID has changed.") assert.Equal(t, m.syncBatch(true), 1) @@ -981,7 +986,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := util.NewPodStartupLatencyTracker() syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager) @@ -1068,7 +1073,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := util.NewPodStartupLatencyTracker() syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager) @@ -1132,7 +1137,7 @@ func TestSetContainerReadiness(t *testing.T) { m := newTestManager(&fake.Clientset{}) // Add test pod because the container spec has been changed. - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) t.Log("Setting readiness before status should fail.") m.SetContainerReadiness(pod.UID, cID1, true) @@ -1216,7 +1221,7 @@ func TestSetContainerStartup(t *testing.T) { m := newTestManager(&fake.Clientset{}) // Add test pod because the container spec has been changed. - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) t.Log("Setting startup before status should fail.") m.SetContainerStartup(pod.UID, cID1, true) @@ -1280,11 +1285,11 @@ func TestSyncBatchCleanupVersions(t *testing.T) { t.Logf("Non-orphaned pods should not be removed.") m.SetPodStatus(testPod, getRandomPodStatus()) - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) staticPod := mirrorPod staticPod.UID = "static-uid" staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} - m.podManager.AddPod(staticPod) + m.podManager.(mutablePodManager).AddPod(staticPod) m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 m.testSyncBatch() @@ -1312,7 +1317,7 @@ func TestReconcilePodStatus(t *testing.T) { testPod.Status = podStatus t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") - syncer.podManager.UpdatePod(testPod) + syncer.podManager.(mutablePodManager).UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { t.Fatalf("Pod status is the same, a reconciliation is not needed") } @@ -1327,7 +1332,7 @@ func TestReconcilePodStatus(t *testing.T) { t.Logf("Syncbatch should do nothing, as a reconciliation is not required") normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy() testPod.Status.StartTime = &normalizedStartTime - syncer.podManager.UpdatePod(testPod) + syncer.podManager.(mutablePodManager).UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed") } @@ -1337,7 +1342,7 @@ func TestReconcilePodStatus(t *testing.T) { t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") changedPodStatus := getRandomPodStatus() - syncer.podManager.UpdatePod(testPod) + syncer.podManager.(mutablePodManager).UpdatePod(testPod) if 
!syncer.needsReconcile(testPod.UID, changedPodStatus) { t.Fatalf("Pod status is different, a reconciliation is needed") } @@ -1360,7 +1365,7 @@ func TestDeletePodBeforeFinished(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) status := getRandomPodStatus() status.Phase = v1.PodFailed m.SetPodStatus(pod, status) @@ -1374,7 +1379,7 @@ func TestDeletePodFinished(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) status := getRandomPodStatus() status.Phase = v1.PodFailed m.TerminatePod(pod) @@ -1395,8 +1400,8 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(mirrorPod) m := newTestManager(client) - m.podManager.AddPod(staticPod) - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(staticPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) t.Logf("Verify setup.") assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod") assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") diff --git a/pkg/kubelet/status/testing/mock_pod_status_provider.go b/pkg/kubelet/status/testing/mock_pod_status_provider.go index d88464f9746..fecd2e7f307 100644 --- a/pkg/kubelet/status/testing/mock_pod_status_provider.go +++ b/pkg/kubelet/status/testing/mock_pod_status_provider.go @@ -27,8 +27,91 @@ import ( v1 "k8s.io/api/core/v1" types "k8s.io/apimachinery/pkg/types" container "k8s.io/kubernetes/pkg/kubelet/container" + types0 "k8s.io/kubernetes/pkg/kubelet/types" ) +// MockPodManager is a mock of PodManager interface. +type MockPodManager struct { + ctrl *gomock.Controller + recorder *MockPodManagerMockRecorder +} + +// MockPodManagerMockRecorder is the mock recorder for MockPodManager. +type MockPodManagerMockRecorder struct { + mock *MockPodManager +} + +// NewMockPodManager creates a new mock instance. +func NewMockPodManager(ctrl *gomock.Controller) *MockPodManager { + mock := &MockPodManager{ctrl: ctrl} + mock.recorder = &MockPodManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPodManager) EXPECT() *MockPodManagerMockRecorder { + return m.recorder +} + +// GetMirrorPodByPod mocks base method. +func (m *MockPodManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMirrorPodByPod", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetMirrorPodByPod indicates an expected call of GetMirrorPodByPod. +func (mr *MockPodManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockPodManager)(nil).GetMirrorPodByPod), arg0) +} + +// GetPodByUID mocks base method. +func (m *MockPodManager) GetPodByUID(arg0 types.UID) (*v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPodByUID", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetPodByUID indicates an expected call of GetPodByUID. 
+func (mr *MockPodManagerMockRecorder) GetPodByUID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByUID", reflect.TypeOf((*MockPodManager)(nil).GetPodByUID), arg0) +} + +// GetUIDTranslations mocks base method. +func (m *MockPodManager) GetUIDTranslations() (map[types0.ResolvedPodUID]types0.MirrorPodUID, map[types0.MirrorPodUID]types0.ResolvedPodUID) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUIDTranslations") + ret0, _ := ret[0].(map[types0.ResolvedPodUID]types0.MirrorPodUID) + ret1, _ := ret[1].(map[types0.MirrorPodUID]types0.ResolvedPodUID) + return ret0, ret1 +} + +// GetUIDTranslations indicates an expected call of GetUIDTranslations. +func (mr *MockPodManagerMockRecorder) GetUIDTranslations() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockPodManager)(nil).GetUIDTranslations)) +} + +// TranslatePodUID mocks base method. +func (m *MockPodManager) TranslatePodUID(uid types.UID) types0.ResolvedPodUID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TranslatePodUID", uid) + ret0, _ := ret[0].(types0.ResolvedPodUID) + return ret0 +} + +// TranslatePodUID indicates an expected call of TranslatePodUID. +func (mr *MockPodManagerMockRecorder) TranslatePodUID(uid interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TranslatePodUID", reflect.TypeOf((*MockPodManager)(nil).TranslatePodUID), uid) +} + // MockPodStatusProvider is a mock of PodStatusProvider interface. type MockPodStatusProvider struct { ctrl *gomock.Controller diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index d66acf63af3..8aab267d2c1 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -40,7 +40,6 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" @@ -70,12 +69,19 @@ type DesiredStateOfWorldPopulator interface { HasAddedPods() bool } -// podStateProvider can determine if a pod is going to be terminated. -type podStateProvider interface { +// PodStateProvider can determine if a pod is going to be terminated. +type PodStateProvider interface { ShouldPodContainersBeTerminating(types.UID) bool ShouldPodRuntimeBeRemoved(types.UID) bool } +// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. +// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + GetPodByUID(types.UID) (*v1.Pod, bool) + GetPods() []*v1.Pod +} + // NewDesiredStateOfWorldPopulator returns a new instance of // DesiredStateOfWorldPopulator. 
// @@ -90,8 +96,8 @@ type podStateProvider interface { func NewDesiredStateOfWorldPopulator( kubeClient clientset.Interface, loopSleepDuration time.Duration, - podManager pod.Manager, - podStateProvider podStateProvider, + podManager PodManager, + podStateProvider PodStateProvider, desiredStateOfWorld cache.DesiredStateOfWorld, actualStateOfWorld cache.ActualStateOfWorld, kubeContainerRuntime kubecontainer.Runtime, @@ -121,8 +127,8 @@ func NewDesiredStateOfWorldPopulator( type desiredStateOfWorldPopulator struct { kubeClient clientset.Interface loopSleepDuration time.Duration - podManager pod.Manager - podStateProvider podStateProvider + podManager PodManager + podStateProvider PodStateProvider desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld pods processedPods diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 01e48b335fa..2b4d8f812d6 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -17,10 +17,11 @@ limitations under the License. package populator import ( - "k8s.io/klog/v2/ktesting" "testing" "time" + "k8s.io/klog/v2/ktesting" + "fmt" "github.com/stretchr/testify/require" @@ -37,7 +38,6 @@ import ( "k8s.io/kubernetes/pkg/features" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" @@ -288,7 +288,7 @@ func TestFindAndAddNewPods_WithReprocessPodAndVolumeRetrievalError(t *testing.T) if !dswp.podPreviouslyProcessed(podName) { t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName) } - fakePodManager.DeletePod(pod) + fakePodManager.RemovePod(pod) } func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) { @@ -324,17 +324,22 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) { } } +type mutablePodManager interface { + GetPodByName(string, string) (*v1.Pod, bool) + RemovePod(*v1.Pod) +} + func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t) podName := util.GetUniquePodName(pod) //let the pod be terminated - podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name) + podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name) if !exist { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - dswp.podManager.DeletePod(pod) + dswp.podManager.(mutablePodManager).RemovePod(pod) dswp.findAndRemoveDeletedPods() @@ -390,7 +395,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { podName := util.GetUniquePodName(pod) //let the pod be terminated - podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name) + podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name) if !exist { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } @@ -452,12 +457,12 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) { podName := util.GetUniquePodName(pod) //let the pod be terminated - podGet, exist := 
dswp.podManager.GetPodByName(pod.Namespace, pod.Name) + podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name) if !exist { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - dswp.podManager.DeletePod(pod) + dswp.podManager.(mutablePodManager).RemovePod(pod) fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} // Add the volume to ASW by reconciling. @@ -754,7 +759,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - fakePodManager.DeletePod(pod) + fakePodManager.RemovePod(pod) fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} //pod is added to fakePodManager but pod state knows the pod is removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted @@ -1613,8 +1618,7 @@ func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolu return true, pv, nil }) - fakePodManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient()) + fakePodManager := kubepod.NewBasicPodManager() seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr, seLinuxTranslator) diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 915685bd67d..7e5d00c6d12 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -39,7 +39,6 @@ import ( csitrans "k8s.io/csi-translation-lib" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics" "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator" @@ -151,11 +150,18 @@ type VolumeManager interface { } -// podStateProvider can determine if a pod is going to be terminated -type podStateProvider interface { +// PodStateProvider can determine if a pod is going to be terminated. +type PodStateProvider interface { ShouldPodContainersBeTerminating(k8stypes.UID) bool ShouldPodRuntimeBeRemoved(k8stypes.UID) bool } +// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. +// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + GetPodByUID(k8stypes.UID) (*v1.Pod, bool) + GetPods() []*v1.Pod +} + // NewVolumeManager returns a new concrete instance implementing the // VolumeManager interface. 
// @@ -167,8 +173,8 @@ type podStateProvider interface { func NewVolumeManager( controllerAttachDetachEnabled bool, nodeName k8stypes.NodeName, - podManager pod.Manager, - podStateProvider podStateProvider, + podManager PodManager, + podStateProvider PodStateProvider, kubeClient clientset.Interface, volumePluginMgr *volume.VolumePluginMgr, kubeContainerRuntime container.Runtime, diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 1b5ce483ac7..cd8b591656f 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" @@ -88,7 +87,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, pv, claim := createObjects(test.pvMode, test.podMode) kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -144,7 +143,7 @@ func TestWaitForAttachAndMountError(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -220,7 +219,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem) claim.Status = v1.PersistentVolumeClaimStatus{ @@ -265,7 +264,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem) diff --git a/test/e2e/common/node/init_container.go b/test/e2e/common/node/init_container.go index a87230e8436..11ebc72a8ef 100644 --- a/test/e2e/common/node/init_container.go +++ b/test/e2e/common/node/init_container.go @@ -18,6 +18,7 @@ package node import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -394,10 +395,10 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { case *v1.Pod: for _, status := range t.Status.ContainerStatuses { if status.State.Waiting == nil { - return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status) + return false, fmt.Errorf("container %q should not be out of waiting: %s", status.Name, toDebugJSON(status)) } if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status) + return false, fmt.Errorf("container %q should have reason PodInitializing: %s", status.Name, toDebugJSON(status)) } } if len(t.Status.InitContainerStatuses) != 2 { @@ -405,14 +406,14 @@ 
var _ = SIGDescribe("InitContainer [NodeConformance]", func() { } status := t.Status.InitContainerStatuses[1] if status.State.Waiting == nil { - return false, fmt.Errorf("second init container should not be out of waiting: %#v", status) + return false, fmt.Errorf("second init container should not be out of waiting: %s", toDebugJSON(status)) } if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status) + return false, fmt.Errorf("second init container should have reason PodInitializing: %s", toDebugJSON(status)) } status = t.Status.InitContainerStatuses[0] if status.State.Terminated != nil && status.State.Terminated.ExitCode == 0 { - return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status) + return false, fmt.Errorf("first init container should have exitCode != 0: %s", toDebugJSON(status)) } // continue until we see an attempt to restart the pod return status.LastTerminationState.Terminated != nil, nil @@ -518,10 +519,10 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { case *v1.Pod: for _, status := range t.Status.ContainerStatuses { if status.State.Waiting == nil { - return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status) + return false, fmt.Errorf("container %q should not be out of waiting: %s", status.Name, toDebugJSON(status)) } if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status) + return false, fmt.Errorf("container %q should have reason PodInitializing: %s", status.Name, toDebugJSON(status)) } } if len(t.Status.InitContainerStatuses) != 2 { @@ -530,19 +531,19 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { status := t.Status.InitContainerStatuses[0] if status.State.Terminated == nil { if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status) + return false, fmt.Errorf("second init container should have reason PodInitializing: %s", toDebugJSON(status)) } return false, nil } if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 { - return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status) + return false, fmt.Errorf("first init container should have exitCode != 0: %s", toDebugJSON(status)) } status = t.Status.InitContainerStatuses[1] if status.State.Terminated == nil { return false, nil } if status.State.Terminated.ExitCode == 0 { - return false, fmt.Errorf("second init container should have failed: %#v", status) + return false, fmt.Errorf("second init container should have failed: %s", toDebugJSON(status)) } return true, nil default: @@ -566,3 +567,13 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { gomega.Expect(endPod.Status.ContainerStatuses[0].State.Waiting).ToNot(gomega.BeNil()) }) }) + +// toDebugJSON converts an object to its JSON representation for debug logging +// purposes instead of dumping the raw struct with %#v. +func toDebugJSON(obj interface{}) string { + m, err := json.Marshal(obj) + if err != nil { + return fmt.Sprintf("<error: %v>", err) + } + return string(m) +}
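
A note on the dominant pattern in this change: instead of importing the wide kubepod.Manager, each consumer (status manager, volume manager, DSW populator) now declares the narrow PodManager subset it actually calls, and Go's structural typing lets kubepod.NewBasicPodManager() satisfy every one of those subsets without adapters. The following is a minimal, self-contained sketch of the idiom; podStore and Reporter are invented stand-ins for the kubelet types, not names from this patch.

package main

import "fmt"

// Pod stands in for *v1.Pod in this sketch.
type Pod struct {
	UID  string
	Name string
}

// podStore plays the role of the concrete pod manager: a wide method set,
// including mutators its consumers should never see.
type podStore struct {
	pods map[string]*Pod
}

func (s *podStore) AddPod(p *Pod) { s.pods[p.UID] = p }

func (s *podStore) GetPodByUID(uid string) (*Pod, bool) {
	p, ok := s.pods[uid]
	return p, ok
}

func (s *podStore) GetPods() []*Pod {
	out := make([]*Pod, 0, len(s.pods))
	for _, p := range s.pods {
		out = append(out, p)
	}
	return out
}

// PodManager is declared by the consumer and names only what it uses,
// mirroring the PodManager subsets added to status and volumemanager.
type PodManager interface {
	GetPodByUID(uid string) (*Pod, bool)
	GetPods() []*Pod
}

// Reporter is a stand-in consumer; through pm it cannot call AddPod.
type Reporter struct{ pm PodManager }

func (r *Reporter) Report() {
	for _, p := range r.pm.GetPods() {
		fmt.Println("observed pod:", p.Name)
	}
}

func main() {
	store := &podStore{pods: map[string]*Pod{}}
	store.AddPod(&Pod{UID: "u1", Name: "static-web"})
	// Structural typing: the wide concrete type satisfies the narrow
	// consumer interface with no adapter or registration step.
	r := &Reporter{pm: store}
	r.Report()
}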
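The tests compensate for the narrowed interfaces by asserting the manager back to a package-private interface that re-exposes the mutators, which is what the mutablePodManager declarations in status_manager_test.go and desired_state_of_world_populator_test.go do. A toy sketch of that downcast; Store, Reader, and mutableStore are invented names for illustration.

package main

import "fmt"

type Pod struct{ Name string }

// Store has mutators that the production-facing interface hides.
type Store struct{ pods []*Pod }

func (s *Store) AddPod(p *Pod)    { s.pods = append(s.pods, p) }
func (s *Store) GetPods() []*Pod  { return s.pods }

// Reader is the narrow interface handed to production code.
type Reader interface{ GetPods() []*Pod }

// mutableStore lives in the test package, like mutablePodManager: it names
// only the mutators the fixtures need.
type mutableStore interface{ AddPod(*Pod) }

func main() {
	var r Reader = &Store{}
	// Tests seed fixtures by asserting the interface back to its mutable
	// subset; this panics if the concrete type ever loses AddPod, which is
	// exactly the early failure a test wants.
	r.(mutableStore).AddPod(&Pod{Name: "fixture"})
	fmt.Println(len(r.GetPods()), "pod(s) seeded")
}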
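The new MockPodManager in mock_pod_status_provider.go follows the standard gomock shape (mock struct, recorder, EXPECT). No caller is shown in this patch, so the following usage is an assumption about how a consumer test would wire it with the github.com/golang/mock/gomock controller; the test name is hypothetical.

package status_test

import (
	"testing"

	"github.com/golang/mock/gomock"
	"k8s.io/apimachinery/pkg/types"

	statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
)

func TestPodManagerMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	pm := statustest.NewMockPodManager(ctrl)
	// Stub a lookup for a UID the test treats as unknown; gomock fails the
	// test if the expected call never happens.
	pm.EXPECT().GetPodByUID(types.UID("missing")).Return(nil, false)

	if _, found := pm.GetPodByUID(types.UID("missing")); found {
		t.Fatal("expected the pod to be absent")
	}
}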
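Finally, the %#v-to-toDebugJSON swap in the e2e failure messages trades an exhaustive struct dump (every zero-valued field and pointer address in v1.ContainerStatus) for compact JSON that honors omitempty tags. A standalone sketch of the difference; waiting and state are trimmed stand-ins for the v1 container status types, and toDebugJSON is reproduced from the patch.

package main

import (
	"encoding/json"
	"fmt"
)

// waiting mimics a small slice of v1.ContainerStateWaiting.
type waiting struct {
	Reason  string `json:"reason,omitempty"`
	Message string `json:"message,omitempty"`
}

type state struct {
	Waiting *waiting `json:"waiting,omitempty"`
}

func toDebugJSON(obj interface{}) string {
	m, err := json.Marshal(obj)
	if err != nil {
		return fmt.Sprintf("<error: %v>", err)
	}
	return string(m)
}

func main() {
	s := state{Waiting: &waiting{Reason: "PodInitializing"}}
	fmt.Printf("with %%#v: %#v\n", s) // full struct with pointer noise
	fmt.Println("with JSON:", toDebugJSON(s)) // {"waiting":{"reason":"PodInitializing"}}
}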