Merge pull request #117371 from smarterclayton/minimal_podmanager

kubelet: Don't reference the pod manager interface directly from components
This commit is contained in:
Kubernetes Prow Robot 2023-05-16 14:34:33 -07:00 committed by GitHub
commit bdbfbffef3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 525 additions and 299 deletions

View File

@ -66,8 +66,6 @@ type managerImpl struct {
config Config
// the function to invoke to kill a pod
killPodFunc KillPodFunc
// the function to get the mirror pod by a given static pod
mirrorPodFunc MirrorPodFunc
// the interface that knows how to do image gc
imageGC ImageGC
// the interface that knows how to do container gc
@ -112,7 +110,6 @@ func NewManager(
summaryProvider stats.SummaryProvider,
config Config,
killPodFunc KillPodFunc,
mirrorPodFunc MirrorPodFunc,
imageGC ImageGC,
containerGC ContainerGC,
recorder record.EventRecorder,
@ -123,7 +120,6 @@ func NewManager(
manager := &managerImpl{
clock: clock,
killPodFunc: killPodFunc,
mirrorPodFunc: mirrorPodFunc,
imageGC: imageGC,
containerGC: containerGC,
config: config,

View File

@ -1451,11 +1451,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) {
activePodsFunc := func() []*v1.Pod {
return pods
}
mirrorPodFunc := func(staticPod *v1.Pod) (*v1.Pod, bool) {
mirrorPod := staticPod.DeepCopy()
mirrorPod.Annotations[kubelettypes.ConfigSourceAnnotationKey] = kubelettypes.ApiserverSource
return mirrorPod, true
}
fakeClock := testingclock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
@ -1490,7 +1485,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) {
manager := &managerImpl{
clock: fakeClock,
killPodFunc: podKiller.killPodNow,
mirrorPodFunc: mirrorPodFunc,
imageGC: diskGC,
containerGC: diskGC,
config: config,

View File

@ -611,9 +611,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient)
klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager()
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
@ -837,7 +836,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig,
killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation)
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
@ -936,7 +935,11 @@ type Kubelet struct {
runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface
heartbeatClient clientset.Interface
rootDirectory string
// mirrorPodClient is used to create and delete mirror pods in the API for static
// pods.
mirrorPodClient kubepod.MirrorClient
rootDirectory string
lastObservedNodeAddressesMux sync.RWMutex
lastObservedNodeAddresses []v1.NodeAddress
@ -944,9 +947,90 @@ type Kubelet struct {
// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
onRepeatedHeartbeatFailure func()
// podWorkers handle syncing Pods in response to events.
// podManager stores the desired set of admitted pods and mirror pods that the kubelet should be
// running. The actual set of running pods is stored on the podWorkers. The manager is populated
// by the kubelet config loops which abstracts receiving configuration from many different sources
// (api for regular pods, local filesystem or http for static pods). The manager may be consulted
// by other components that need to see the set of desired pods. Note that not all desired pods are
// running, and not all running pods are in the podManager - for instance, force deleting a pod
// from the apiserver will remove it from the podManager, but the pod may still be terminating and
// tracked by the podWorkers. Components that need to know the actual consumed resources of the
// node or are driven by podWorkers and the sync*Pod methods (status, volume, stats) should also
// consult the podWorkers when reconciling.
//
// TODO: review all kubelet components that need the actual set of pods (vs the desired set)
// and update them to use podWorkers instead of podManager. This may introduce latency in some
// methods, but avoids race conditions and correctly accounts for terminating pods that have
// been force deleted or static pods that have been updated.
// https://github.com/kubernetes/kubernetes/issues/116970
podManager kubepod.Manager
// podWorkers is responsible for driving the lifecycle state machine of each pod. The worker is
// notified of config changes, updates, periodic reconciliation, container runtime updates, and
// evictions of all desired pods and will invoke reconciliation methods per pod in separate
// goroutines. The podWorkers are authoritative in the kubelet for what pods are actually being
// run and their current state:
//
// * syncing: pod should be running (syncPod)
// * terminating: pod should be stopped (syncTerminatingPod)
// * terminated: pod should have all resources cleaned up (syncTerminatedPod)
//
// and invoke the handler methods that correspond to each state. Components within the
// kubelet that need to know the phase of the pod in order to correctly set up or tear down
// resources must consult the podWorkers.
//
// Once a pod has been accepted by the pod workers, no other pod with that same UID (and
// name+namespace, for static pods) will be started until the first pod has fully terminated
// and been cleaned up by SyncKnownPods. This means a pod may be desired (in API), admitted
// (in pod manager), and requested (by invoking UpdatePod) but not start for an arbitrarily
// long interval because a prior pod is still terminating.
//
// As an event-driven (by UpdatePod) controller, the podWorkers must periodically be resynced
// by the kubelet invoking SyncKnownPods with the desired state (admitted pods in podManager).
// Since the podManager may be unaware of some running pods due to force deletion, the
// podWorkers are responsible for triggering a sync of pods that are no longer desired but
// must still run to completion.
podWorkers PodWorkers
// evictionManager observes the state of the node for situations that could impact node stability
// and evicts pods (sets to phase Failed with reason Evicted) to reduce resource pressure. The
// eviction manager acts on the actual state of the node and considers the podWorker to be
// authoritative.
evictionManager eviction.Manager
// probeManager tracks the set of running pods and ensures any user-defined periodic checks are
// run to introspect the state of each pod. The probe manager acts on the actual state of the node
// and is notified of pods by the podWorker. The probe manager is the authoritative source of the
// most recent probe status and is responsible for notifying the status manager, which
// synthesizes them into the overall pod status.
probeManager prober.Manager
// secretManager caches the set of secrets used by running pods on this node. The podWorkers
// notify the secretManager when pods are started and terminated, and the secretManager must
// then keep the needed secrets up-to-date as they change.
secretManager secret.Manager
// configMapManager caches the set of config maps used by running pods on this node. The
// podWorkers notify the configMapManager when pods are started and terminated, and the
// configMapManager must then keep the needed config maps up-to-date as they change.
configMapManager configmap.Manager
// volumeManager observes the set of running pods and is responsible for attaching, mounting,
// unmounting, and detaching as those pods move through their lifecycle. It periodically
// synchronizes the set of known volumes to the set of actually desired volumes and cleans up
// any orphaned volumes. The volume manager considers the podWorker to be authoritative for
// which pods are running.
volumeManager volumemanager.VolumeManager
// statusManager receives updated pod status updates from the podWorker and updates the API
// status of those pods to match. The statusManager is authoritative for the synthesized
// status of the pod from the kubelet's perspective (other components own the individual
// elements of status) and should be consulted by components in preference to assembling
// that status themselves. Note that the status manager is downstream of the pod worker
// and components that need to check whether a pod is still running should instead directly
// consult the pod worker.
statusManager status.Manager
// resyncInterval is the interval between periodic full reconciliations of
// pods on this node.
resyncInterval time.Duration
@ -954,13 +1038,6 @@ type Kubelet struct {
// sourcesReady records the sources seen by the kubelet, it is thread-safe.
sourcesReady config.SourcesReady
// podManager is a facade that abstracts away the various sources of pods
// this Kubelet services.
podManager kubepod.Manager
// Needed to observe and respond to situations that could impact node stability
evictionManager eviction.Manager
// Optional, defaults to /logs/ from /var/log
logServer http.Handler
// Optional, defaults to simple Docker implementation
@ -1001,8 +1078,6 @@ type Kubelet struct {
// Volume plugins.
volumePluginMgr *volume.VolumePluginMgr
// Handles container probing.
probeManager prober.Manager
// Manages container health check results.
livenessManager proberesults.Manager
readinessManager proberesults.Manager
@ -1024,12 +1099,6 @@ type Kubelet struct {
// Manager for container logs.
containerLogManager logs.ContainerLogManager
// Secret manager.
secretManager secret.Manager
// ConfigMap manager.
configMapManager configmap.Manager
// Cached MachineInfo returned by cadvisor.
machineInfoLock sync.RWMutex
machineInfo *cadvisorapi.MachineInfo
@ -1037,14 +1106,6 @@ type Kubelet struct {
// Handles certificate rotations.
serverCertificateManager certificate.Manager
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager status.Manager
// VolumeManager runs a set of asynchronous loops that figure out which
// volumes need to be attached/mounted/unmounted/detached based on the pods
// scheduled on this node and makes it so.
volumeManager volumemanager.VolumeManager
// Cloud provider interface.
cloud cloudprovider.Interface
// Handles requests to cloud provider with timeout
@ -1110,10 +1171,12 @@ type Kubelet struct {
// nodeLeaseController claims and renews the node lease for this Kubelet
nodeLeaseController lease.Controller
// Generates pod events.
// pleg observes the state of the container runtime and notifies the kubelet of changes to containers, which
// notifies the podWorkers to reconcile the state of the pod (for instance, if a container dies and needs to
// be restarted).
pleg pleg.PodLifecycleEventGenerator
// Evented PLEG
// eventedPleg supplements the pleg to deliver edge-driven container changes with low-latency.
eventedPleg pleg.PodLifecycleEventGenerator
// Store kubecontainer.PodStatus for all pods.
@ -1823,13 +1886,13 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
if kubetypes.IsStaticPod(pod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
@ -1843,7 +1906,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
@ -2134,6 +2197,15 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// - pod whose work is ready.
// - internal modules that request sync of a pod.
//
// This method does not return orphaned pods (those known only to the pod worker that may have
// been deleted from configuration). Those pods are synced by HandlePodCleanups as a consequence
// of driving the state machine to completion.
//
// TODO: Consider synchronizing all pods which have not recently been acted on to be resilient
// to bugs that might prevent updates from being delivered (such as the previous bug with
// orphaned pods). Instead of asking the work queue for pending work, consider asking the
// PodWorker which pods should be synced.
func (kl *Kubelet) getPodsToSync() []*v1.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
@ -2442,32 +2514,6 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle
handler.HandlePodSyncs([]*v1.Pod{pod})
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
// TODO: handle mirror pods in a separate component (issue #17251)
func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
// corresponding static pod. Send update to the pod worker if the static
// pod exists.
if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
}
}
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
@ -2485,8 +2531,18 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}
@ -2530,8 +2586,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
}
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate,
StartTime: start,
})
}
}
@ -2541,12 +2601,21 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.UpdatePod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
}
}
@ -2555,11 +2624,23 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
if kubetypes.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
kl.podManager.RemovePod(pod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
@ -2569,7 +2650,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
}
// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled.
// that should be reconciled. Pods are reconciled when only the status of the
// pod is updated in the API.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
@ -2577,13 +2659,37 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
// to the pod manager.
kl.podManager.UpdatePod(pod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
// Static pods should be reconciled the same way as regular pods
}
// TODO: reconcile being calculated in the config manager is questionable, and avoiding
// extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be
// merged (after resolving the next two TODOs).
// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
// TODO: this should be unnecessary today - determine what is the cause for this to
// be different than Sync, or if there is a better place for it. For instance, we have
// needsReconcile in kubelet/config, here, and in status_manager.
if status.NeedToReconcilePodReadiness(pod) {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
}
// After an evicted pod is synced, all dead containers in the pod can be removed.
// TODO: this is questionable - status read is async and during eviction we already
// expect to not have some container info. The pod worker knows whether a pod has
// been evicted, so if this is about minimizing the time to react to an eviction we
// can do better. If it's about preserving pod status info we can also do better.
if eviction.PodIsEvicted(pod.Status) {
if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
kl.containerDeletor.deleteContainersInPod("", podStatus, true)
@ -2597,8 +2703,24 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
// Syncing a mirror pod is a programmer error since the intent of sync is to
// batch notify all pending work. We should make it impossible to double sync,
// but for now log a programmer error to prevent accidental introduction.
klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
}
}

View File

@ -984,23 +984,6 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}
// deleteOrphanedMirrorPods checks whether pod killer has done with orphaned mirror pod.
// If pod killing is done, podManager.DeleteMirrorPod() is called to delete mirror pod
// from the API server
func (kl *Kubelet) deleteOrphanedMirrorPods() {
mirrorPods := kl.podManager.GetOrphanedMirrorPodNames()
for _, podFullname := range mirrorPods {
if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) {
_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {
klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname)
}
}
}
}
// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories. No config changes are sent to pod workers while this method
@ -1031,7 +1014,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
}
}
allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
allPods, mirrorPods, orphanedMirrorPodFullnames := kl.podManager.GetPodsAndMirrorPods()
// Pod phase progresses monotonically. Once a pod has reached a final state,
// it should never leave regardless of the restart policy. The statuses
@ -1127,7 +1110,16 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// Remove any orphaned mirror pods (mirror pods are tracked by name via the
// pod worker)
klog.V(3).InfoS("Clean up orphaned mirror pods")
kl.deleteOrphanedMirrorPods()
for _, podFullname := range orphanedMirrorPodFullnames {
if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) {
_, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil)
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {
klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname)
}
}
}
// After pruning pod workers for terminated pods get the list of active pods for
// metrics and to determine restarts.
@ -1156,10 +1148,14 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID)
isStatic := kubetypes.IsStaticPod(desiredPod)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(desiredPod)
if pod == nil || wasMirror {
klog.V(2).InfoS("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
Pod: desiredPod,
Pod: pod,
MirrorPod: mirrorPod,
})
@ -1246,7 +1242,6 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// Cleanup any backoff entries.
kl.backOff.GC()
return nil
}
@ -1352,15 +1347,26 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
return fmt.Errorf("pod %q cannot be found - no logs available", name)
}
podUID := pod.UID
if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
// TODO: this should be using the podWorker's pod store as authoritative, since
// the mirrorPod might still exist, the pod may have been force deleted but
// is still terminating (users should be able to view logs of force deleted static pods
// based on full name).
var podUID types.UID
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
return fmt.Errorf("mirror pod %q does not have a corresponding pod", name)
}
podUID = mirrorPod.UID
} else {
podUID = pod.UID
}
podStatus, found := kl.statusManager.GetPodStatus(podUID)
if !found {
// If there is no cached status, use the status from the
// apiserver. This is useful if kubelet has recently been
// restarted.
// config source (apiserver). This is useful if kubelet
// has recently been restarted.
podStatus = pod.Status
}

View File

@ -260,7 +260,8 @@ func newTestKubeletWithImageList(
kubelet.secretManager = secretManager
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
kubelet.mirrorPodClient = fakeMirrorClient
kubelet.podManager = kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())
@ -339,7 +340,7 @@ func newTestKubeletWithImageList(
}
// setup eviction manager
evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{},
killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.podManager.GetMirrorPodByPod, kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())
killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())
kubelet.evictionManager = evictionManager
kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
@ -597,7 +598,11 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
},
}
for _, pod := range pods {
kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now())
kubelet.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodSync,
StartTime: time.Now(),
})
if !got {
t.Errorf("Should not skip completed pod %q", pod.Name)
}
@ -651,7 +656,11 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
}
for _, pod := range pods {
kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now())
kubelet.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodSync,
StartTime: time.Now(),
})
if !got {
t.Errorf("Should not skip active pod %q", pod.Name)
}
@ -2521,9 +2530,9 @@ func TestHandlePodResourcesResize(t *testing.T) {
testPod2.UID: true,
testPod3.UID: true,
}
defer kubelet.podManager.DeletePod(testPod3)
defer kubelet.podManager.DeletePod(testPod2)
defer kubelet.podManager.DeletePod(testPod1)
defer kubelet.podManager.RemovePod(testPod3)
defer kubelet.podManager.RemovePod(testPod2)
defer kubelet.podManager.RemovePod(testPod1)
tests := []struct {
name string

View File

@ -43,8 +43,6 @@ import (
// pod. When a static pod gets deleted, the associated orphaned mirror pod
// will also be removed.
type Manager interface {
// GetPods returns the regular pods bound to the kubelet and their spec.
GetPods() []*v1.Pod
// GetPodByFullName returns the (non-mirror) pod that matches full name, as well as
// whether the pod was found.
GetPodByFullName(podFullName string) (*v1.Pod, bool)
@ -60,8 +58,18 @@ type Manager interface {
// GetMirrorPodByPod returns the mirror pod for the given static pod and
// whether it was known to the pod manager.
GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
// GetPodsAndMirrorPods returns the both regular and mirror pods.
GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod)
// GetPodAndMirrorPod returns the complement for a pod - if a pod was provided
// and a mirror pod can be found, return it. If a mirror pod is provided and
// the pod can be found, return it and true for wasMirror.
GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool)
// GetPods returns the regular pods bound to the kubelet and their spec.
GetPods() []*v1.Pod
// GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and
// the pod fullnames of any orphaned mirror pods.
GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)
// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
SetPods(pods []*v1.Pod)
@ -69,12 +77,11 @@ type Manager interface {
AddPod(pod *v1.Pod)
// UpdatePod updates the given pod in the manager.
UpdatePod(pod *v1.Pod)
// DeletePod deletes the given pod from the manager. For mirror pods,
// RemovePod deletes the given pod from the manager. For mirror pods,
// this means deleting the mappings related to mirror pods. For non-
// mirror pods, this means deleting from indexes for all non-mirror pods.
DeletePod(pod *v1.Pod)
// GetOrphanedMirrorPodNames returns names of orphaned mirror pods
GetOrphanedMirrorPodNames() []string
RemovePod(pod *v1.Pod)
// TranslatePodUID returns the actual UID of a pod. If the UID belongs to
// a mirror pod, returns the UID of its static pod. Otherwise, returns the
// original UID.
@ -86,17 +93,12 @@ type Manager interface {
// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
// UIDs and mirror pod UIDs to static pod UIDs.
GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
// IsMirrorPodOf returns true if mirrorPod is a correct representation of
// pod; false otherwise.
IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool
MirrorClient
}
// basicManager is a functional Manager.
//
// All fields in basicManager are read-only and are updated calling SetPods,
// AddPod, UpdatePod, or DeletePod.
// AddPod, UpdatePod, or RemovePod.
type basicManager struct {
// Protects all internal maps.
lock sync.RWMutex
@ -112,15 +114,11 @@ type basicManager struct {
// Mirror pod UID to pod UID map.
translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
// A mirror pod client to create/delete mirror pods.
MirrorClient
}
// NewBasicPodManager returns a functional Manager.
func NewBasicPodManager(client MirrorClient) Manager {
func NewBasicPodManager() Manager {
pm := &basicManager{}
pm.MirrorClient = client
pm.SetPods(nil)
return pm
}
@ -191,7 +189,7 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
}
}
func (pm *basicManager) DeletePod(pod *v1.Pod) {
func (pm *basicManager) RemovePod(pod *v1.Pod) {
updateMetrics(pod, nil)
pm.lock.Lock()
defer pm.lock.Unlock()
@ -214,12 +212,18 @@ func (pm *basicManager) GetPods() []*v1.Pod {
return podsMapToPods(pm.podByUID)
}
func (pm *basicManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) {
func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) {
pm.lock.RLock()
defer pm.lock.RUnlock()
pods := podsMapToPods(pm.podByUID)
mirrorPods := mirrorPodsMapToMirrorPods(pm.mirrorPodByUID)
return pods, mirrorPods
allPods = podsMapToPods(pm.podByUID)
allMirrorPods = mirrorPodsMapToMirrorPods(pm.mirrorPodByUID)
for podFullName := range pm.mirrorPodByFullName {
if _, ok := pm.podByFullName[podFullName]; !ok {
orphanedMirrorPodFullnames = append(orphanedMirrorPodFullnames, podFullName)
}
}
return allPods, allMirrorPods, orphanedMirrorPodFullnames
}
func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
@ -280,19 +284,8 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.Resolved
return podToMirror, mirrorToPod
}
func (pm *basicManager) GetOrphanedMirrorPodNames() []string {
pm.lock.RLock()
defer pm.lock.RUnlock()
var podFullNames []string
for podFullName := range pm.mirrorPodByFullName {
if _, ok := pm.podByFullName[podFullName]; !ok {
podFullNames = append(podFullNames, podFullName)
}
}
return podFullNames
}
func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool {
// IsMirrorPodOf returns true if pod and mirrorPod are associated with each other.
func IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool {
// Check name and namespace first.
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {
return false
@ -333,3 +326,15 @@ func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) {
pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)]
return pod, ok
}
func (pm *basicManager) GetPodAndMirrorPod(aPod *v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
fullName := kubecontainer.GetPodFullName(aPod)
if kubetypes.IsMirrorPod(aPod) {
return pm.podByFullName[fullName], aPod, true
}
return aPod, pm.mirrorPodByFullName[fullName], false
}

View File

@ -20,7 +20,7 @@ import (
"reflect"
"testing"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
@ -30,7 +30,7 @@ import (
// Stub out mirror client for testing purpose.
func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
fakeMirrorClient := podtest.NewFakeMirrorClient()
manager := NewBasicPodManager(fakeMirrorClient).(*basicManager)
manager := NewBasicPodManager().(*basicManager)
return manager, fakeMirrorClient
}
@ -111,7 +111,7 @@ func TestGetSetPods(t *testing.T) {
}
func TestDeletePods(t *testing.T) {
func TestRemovePods(t *testing.T) {
mirrorPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID("mirror-pod-uid"),
@ -147,14 +147,14 @@ func TestDeletePods(t *testing.T) {
podManager, _ := newTestManager()
podManager.SetPods(updates)
podManager.DeletePod(staticPod)
podManager.RemovePod(staticPod)
actualPods := podManager.GetPods()
if len(actualPods) == len(expectedPods) {
t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods))
t.Fatalf("Run RemovePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods))
}
orphanedMirrorPodNames := podManager.GetOrphanedMirrorPodNames()
_, _, orphanedMirrorPodNames := podManager.GetPodsAndMirrorPods()
expectedOrphanedMirrorPodNameNum := 1
if len(orphanedMirrorPodNames) != expectedOrphanedMirrorPodNameNum {
t.Fatalf("Run getOrphanedMirrorPodNames() error, expected %d orphaned mirror pods, got %d orphaned mirror pods; ", expectedOrphanedMirrorPodNameNum, len(orphanedMirrorPodNames))

View File

@ -64,47 +64,6 @@ func (mr *MockManagerMockRecorder) AddPod(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPod", reflect.TypeOf((*MockManager)(nil).AddPod), arg0)
}
// CreateMirrorPod mocks base method.
func (m *MockManager) CreateMirrorPod(arg0 *v1.Pod) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateMirrorPod", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CreateMirrorPod indicates an expected call of CreateMirrorPod.
func (mr *MockManagerMockRecorder) CreateMirrorPod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateMirrorPod", reflect.TypeOf((*MockManager)(nil).CreateMirrorPod), arg0)
}
// DeleteMirrorPod mocks base method.
func (m *MockManager) DeleteMirrorPod(arg0 string, arg1 *types.UID) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteMirrorPod", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DeleteMirrorPod indicates an expected call of DeleteMirrorPod.
func (mr *MockManagerMockRecorder) DeleteMirrorPod(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMirrorPod", reflect.TypeOf((*MockManager)(nil).DeleteMirrorPod), arg0, arg1)
}
// DeletePod mocks base method.
func (m *MockManager) DeletePod(arg0 *v1.Pod) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "DeletePod", arg0)
}
// DeletePod indicates an expected call of DeletePod.
func (mr *MockManagerMockRecorder) DeletePod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePod", reflect.TypeOf((*MockManager)(nil).DeletePod), arg0)
}
// GetMirrorPodByPod mocks base method.
func (m *MockManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) {
m.ctrl.T.Helper()
@ -120,18 +79,20 @@ func (mr *MockManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.C
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockManager)(nil).GetMirrorPodByPod), arg0)
}
// GetOrphanedMirrorPodNames mocks base method.
func (m *MockManager) GetOrphanedMirrorPodNames() []string {
// GetPodAndMirrorPod mocks base method.
func (m *MockManager) GetPodAndMirrorPod(arg0 *v1.Pod) (*v1.Pod, *v1.Pod, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetOrphanedMirrorPodNames")
ret0, _ := ret[0].([]string)
return ret0
ret := m.ctrl.Call(m, "GetPodAndMirrorPod", arg0)
ret0, _ := ret[0].(*v1.Pod)
ret1, _ := ret[1].(*v1.Pod)
ret2, _ := ret[2].(bool)
return ret0, ret1, ret2
}
// GetOrphanedMirrorPodNames indicates an expected call of GetOrphanedMirrorPodNames.
func (mr *MockManagerMockRecorder) GetOrphanedMirrorPodNames() *gomock.Call {
// GetPodAndMirrorPod indicates an expected call of GetPodAndMirrorPod.
func (mr *MockManagerMockRecorder) GetPodAndMirrorPod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrphanedMirrorPodNames", reflect.TypeOf((*MockManager)(nil).GetOrphanedMirrorPodNames))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodAndMirrorPod", reflect.TypeOf((*MockManager)(nil).GetPodAndMirrorPod), arg0)
}
// GetPodByFullName mocks base method.
@ -209,12 +170,13 @@ func (mr *MockManagerMockRecorder) GetPods() *gomock.Call {
}
// GetPodsAndMirrorPods mocks base method.
func (m *MockManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) {
func (m *MockManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod, []string) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPodsAndMirrorPods")
ret0, _ := ret[0].([]*v1.Pod)
ret1, _ := ret[1].([]*v1.Pod)
return ret0, ret1
ret2, _ := ret[2].([]string)
return ret0, ret1, ret2
}
// GetPodsAndMirrorPods indicates an expected call of GetPodsAndMirrorPods.
@ -238,18 +200,16 @@ func (mr *MockManagerMockRecorder) GetUIDTranslations() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockManager)(nil).GetUIDTranslations))
}
// IsMirrorPodOf mocks base method.
func (m *MockManager) IsMirrorPodOf(arg0, arg1 *v1.Pod) bool {
// RemovePod mocks base method.
func (m *MockManager) RemovePod(arg0 *v1.Pod) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsMirrorPodOf", arg0, arg1)
ret0, _ := ret[0].(bool)
return ret0
m.ctrl.Call(m, "RemovePod", arg0)
}
// IsMirrorPodOf indicates an expected call of IsMirrorPodOf.
func (mr *MockManagerMockRecorder) IsMirrorPodOf(arg0, arg1 interface{}) *gomock.Call {
// RemovePod indicates an expected call of RemovePod.
func (mr *MockManagerMockRecorder) RemovePod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsMirrorPodOf", reflect.TypeOf((*MockManager)(nil).IsMirrorPodOf), arg0, arg1)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemovePod", reflect.TypeOf((*MockManager)(nil).RemovePod), arg0)
}
// SetPods mocks base method.

View File

@ -1181,6 +1181,12 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
status.startedAt = p.clock.Now()
status.mergeLastUpdate(update.Options)
// If we are admitting the pod and it is new, record the count of containers
// TODO: We should probably move this into syncPod and add an execution count
// to the syncPod arguments, and this should be recorded on the first sync.
// Leaving it here complicates a particularly important loop.
metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers)))
return ctx, update, true, true, true
}

View File

@ -106,7 +106,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
}
func newTestManager() *manager {
podManager := kubepod.NewBasicPodManager(nil)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())

View File

@ -87,7 +87,7 @@ func TestTCPPortExhaustion(t *testing.T) {
} else {
testRootDir = tempDir
}
podManager := kubepod.NewBasicPodManager(nil)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),

View File

@ -160,7 +160,7 @@ func TestDoProbe(t *testing.T) {
} else {
testRootDir = tempDir
}
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
resultsManager(m, probeType).Remove(testContainerID)
}
}

View File

@ -129,7 +129,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura
klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)

View File

@ -72,8 +72,7 @@ func TestRunOnce(t *testing.T) {
}, nil).AnyTimes()
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
fakeRuntime := &containertest.FakeRuntime{}
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
basePath, err := utiltesting.MkTmpdir("kubelet")
@ -87,6 +86,7 @@ func TestRunOnce(t *testing.T) {
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath),
mirrorPodClient: podtest.NewFakeMirrorClient(),
podManager: podManager,
podWorkers: &fakePodWorkers{},
os: &containertest.FakeOS{},
@ -134,8 +134,7 @@ func TestRunOnce(t *testing.T) {
fakeKillPodFunc := func(pod *v1.Pod, evict bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
return nil
}
fakeMirrodPodFunc := func(*v1.Pod) (*v1.Pod, bool) { return nil, false }
evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, fakeMirrodPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation())
evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation())
kb.evictionManager = evictionManager
kb.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

View File

@ -27,18 +27,24 @@ import (
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
}
// NewCRIStatsProvider returns a Provider that provides the node stats
// from cAdvisor and the container stats from CRI.
func NewCRIStatsProvider(
cadvisor cadvisor.Interface,
resourceAnalyzer stats.ResourceAnalyzer,
podManager kubepod.Manager,
podManager PodManager,
runtimeCache kubecontainer.RuntimeCache,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
@ -54,7 +60,7 @@ func NewCRIStatsProvider(
func NewCadvisorStatsProvider(
cadvisor cadvisor.Interface,
resourceAnalyzer stats.ResourceAnalyzer,
podManager kubepod.Manager,
podManager PodManager,
runtimeCache kubecontainer.RuntimeCache,
imageService kubecontainer.ImageService,
statusProvider status.PodStatusProvider,
@ -67,7 +73,7 @@ func NewCadvisorStatsProvider(
// cAdvisor and the container stats using the containerStatsProvider.
func newStatsProvider(
cadvisor cadvisor.Interface,
podManager kubepod.Manager,
podManager PodManager,
runtimeCache kubecontainer.RuntimeCache,
containerStatsProvider containerStatsProvider,
) *Provider {
@ -82,7 +88,7 @@ func newStatsProvider(
// Provider provides the stats of the node and the pod-managed containers.
type Provider struct {
cadvisor cadvisor.Interface
podManager kubepod.Manager
podManager PodManager
runtimeCache kubecontainer.RuntimeCache
containerStatsProvider
}

View File

@ -40,7 +40,6 @@ import (
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status/state"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
statusutil "k8s.io/kubernetes/pkg/util/pod"
@ -70,7 +69,7 @@ type versionedPodStatus struct {
// All methods are thread-safe.
type manager struct {
kubeClient clientset.Interface
podManager kubepod.Manager
podManager PodManager
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
@ -87,8 +86,18 @@ type manager struct {
stateFileDirectory string
}
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
// that need to introspect status.
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
GetPodByUID(types.UID) (*v1.Pod, bool)
GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
}
// PodStatusProvider knows how to provide status for a pod. It is intended to be used by other components
// that need to introspect the authoritative status of a pod. The PodStatusProvider represents the actual
// status of a running pod as the kubelet sees it.
type PodStatusProvider interface {
// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
// was a cache hit.
@ -149,7 +158,7 @@ type Manager interface {
const syncPeriod = 10 * time.Second
// NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager {
func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,

View File

@ -45,12 +45,17 @@ import (
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
)
type mutablePodManager interface {
AddPod(*v1.Pod)
UpdatePod(*v1.Pod)
RemovePod(*v1.Pod)
}
// Generate new instance of test pod with the same initial value.
func getTestPod() *v1.Pod {
return &v1.Pod{
@ -85,8 +90,8 @@ func (m *manager) testSyncBatch() {
}
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager.AddPod(getTestPod())
podManager := kubepod.NewBasicPodManager()
podManager.(mutablePodManager).AddPod(getTestPod())
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
testRootDir := ""
if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
@ -329,10 +334,10 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) {
syncer := newTestManager(&fake.Clientset{})
pod := getTestPod()
pod.UID = "first"
syncer.podManager.AddPod(pod)
syncer.podManager.(mutablePodManager).AddPod(pod)
differentPod := getTestPod()
differentPod.UID = "second"
syncer.podManager.AddPod(differentPod)
syncer.podManager.(mutablePodManager).AddPod(differentPod)
syncer.kubeClient = fake.NewSimpleClientset(pod)
syncer.SetPodStatus(differentPod, getRandomPodStatus())
verifyActions(t, syncer, []core.Action{getAction()})
@ -532,7 +537,7 @@ func TestStaticPod(t *testing.T) {
m := newTestManager(client)
t.Logf("Create the static pod")
m.podManager.AddPod(staticPod)
m.podManager.(mutablePodManager).AddPod(staticPod)
assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod")
status := getRandomPodStatus()
@ -550,7 +555,7 @@ func TestStaticPod(t *testing.T) {
assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
t.Logf("Create the mirror pod")
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID))
@ -567,10 +572,10 @@ func TestStaticPod(t *testing.T) {
verifyActions(t, m, []core.Action{})
t.Logf("Change mirror pod identity.")
m.podManager.DeletePod(mirrorPod)
m.podManager.(mutablePodManager).RemovePod(mirrorPod)
mirrorPod.UID = "new-mirror-pod"
mirrorPod.Status = v1.PodStatus{}
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
t.Logf("Should not update to mirror pod, because UID has changed.")
assert.Equal(t, m.syncBatch(true), 1)
@ -981,7 +986,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
@ -1068,7 +1073,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
@ -1132,7 +1137,7 @@ func TestSetContainerReadiness(t *testing.T) {
m := newTestManager(&fake.Clientset{})
// Add test pod because the container spec has been changed.
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
t.Log("Setting readiness before status should fail.")
m.SetContainerReadiness(pod.UID, cID1, true)
@ -1216,7 +1221,7 @@ func TestSetContainerStartup(t *testing.T) {
m := newTestManager(&fake.Clientset{})
// Add test pod because the container spec has been changed.
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
t.Log("Setting startup before status should fail.")
m.SetContainerStartup(pod.UID, cID1, true)
@ -1280,11 +1285,11 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
t.Logf("Non-orphaned pods should not be removed.")
m.SetPodStatus(testPod, getRandomPodStatus())
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
staticPod := mirrorPod
staticPod.UID = "static-uid"
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
m.podManager.AddPod(staticPod)
m.podManager.(mutablePodManager).AddPod(staticPod)
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.testSyncBatch()
@ -1312,7 +1317,7 @@ func TestReconcilePodStatus(t *testing.T) {
testPod.Status = podStatus
t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing")
syncer.podManager.UpdatePod(testPod)
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) {
t.Fatalf("Pod status is the same, a reconciliation is not needed")
}
@ -1327,7 +1332,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("Syncbatch should do nothing, as a reconciliation is not required")
normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy()
testPod.Status.StartTime = &normalizedStartTime
syncer.podManager.UpdatePod(testPod)
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) {
t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
}
@ -1337,7 +1342,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
changedPodStatus := getRandomPodStatus()
syncer.podManager.UpdatePod(testPod)
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if !syncer.needsReconcile(testPod.UID, changedPodStatus) {
t.Fatalf("Pod status is different, a reconciliation is needed")
}
@ -1360,7 +1365,7 @@ func TestDeletePodBeforeFinished(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
status := getRandomPodStatus()
status.Phase = v1.PodFailed
m.SetPodStatus(pod, status)
@ -1374,7 +1379,7 @@ func TestDeletePodFinished(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
status := getRandomPodStatus()
status.Phase = v1.PodFailed
m.TerminatePod(pod)
@ -1395,8 +1400,8 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(mirrorPod)
m := newTestManager(client)
m.podManager.AddPod(staticPod)
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(staticPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
t.Logf("Verify setup.")
assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod")
assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")

View File

@ -27,8 +27,91 @@ import (
v1 "k8s.io/api/core/v1"
types "k8s.io/apimachinery/pkg/types"
container "k8s.io/kubernetes/pkg/kubelet/container"
types0 "k8s.io/kubernetes/pkg/kubelet/types"
)
// MockPodManager is a mock of PodManager interface.
type MockPodManager struct {
ctrl *gomock.Controller
recorder *MockPodManagerMockRecorder
}
// MockPodManagerMockRecorder is the mock recorder for MockPodManager.
type MockPodManagerMockRecorder struct {
mock *MockPodManager
}
// NewMockPodManager creates a new mock instance.
func NewMockPodManager(ctrl *gomock.Controller) *MockPodManager {
mock := &MockPodManager{ctrl: ctrl}
mock.recorder = &MockPodManagerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPodManager) EXPECT() *MockPodManagerMockRecorder {
return m.recorder
}
// GetMirrorPodByPod mocks base method.
func (m *MockPodManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetMirrorPodByPod", arg0)
ret0, _ := ret[0].(*v1.Pod)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetMirrorPodByPod indicates an expected call of GetMirrorPodByPod.
func (mr *MockPodManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockPodManager)(nil).GetMirrorPodByPod), arg0)
}
// GetPodByUID mocks base method.
func (m *MockPodManager) GetPodByUID(arg0 types.UID) (*v1.Pod, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPodByUID", arg0)
ret0, _ := ret[0].(*v1.Pod)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetPodByUID indicates an expected call of GetPodByUID.
func (mr *MockPodManagerMockRecorder) GetPodByUID(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByUID", reflect.TypeOf((*MockPodManager)(nil).GetPodByUID), arg0)
}
// GetUIDTranslations mocks base method.
func (m *MockPodManager) GetUIDTranslations() (map[types0.ResolvedPodUID]types0.MirrorPodUID, map[types0.MirrorPodUID]types0.ResolvedPodUID) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetUIDTranslations")
ret0, _ := ret[0].(map[types0.ResolvedPodUID]types0.MirrorPodUID)
ret1, _ := ret[1].(map[types0.MirrorPodUID]types0.ResolvedPodUID)
return ret0, ret1
}
// GetUIDTranslations indicates an expected call of GetUIDTranslations.
func (mr *MockPodManagerMockRecorder) GetUIDTranslations() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockPodManager)(nil).GetUIDTranslations))
}
// TranslatePodUID mocks base method.
func (m *MockPodManager) TranslatePodUID(uid types.UID) types0.ResolvedPodUID {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TranslatePodUID", uid)
ret0, _ := ret[0].(types0.ResolvedPodUID)
return ret0
}
// TranslatePodUID indicates an expected call of TranslatePodUID.
func (mr *MockPodManagerMockRecorder) TranslatePodUID(uid interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TranslatePodUID", reflect.TypeOf((*MockPodManager)(nil).TranslatePodUID), uid)
}
// MockPodStatusProvider is a mock of PodStatusProvider interface.
type MockPodStatusProvider struct {
ctrl *gomock.Controller

View File

@ -40,7 +40,6 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -70,12 +69,19 @@ type DesiredStateOfWorldPopulator interface {
HasAddedPods() bool
}
// podStateProvider can determine if a pod is going to be terminated.
type podStateProvider interface {
// PodStateProvider can determine if a pod is going to be terminated.
type PodStateProvider interface {
ShouldPodContainersBeTerminating(types.UID) bool
ShouldPodRuntimeBeRemoved(types.UID) bool
}
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
GetPodByUID(types.UID) (*v1.Pod, bool)
GetPods() []*v1.Pod
}
// NewDesiredStateOfWorldPopulator returns a new instance of
// DesiredStateOfWorldPopulator.
//
@ -90,8 +96,8 @@ type podStateProvider interface {
func NewDesiredStateOfWorldPopulator(
kubeClient clientset.Interface,
loopSleepDuration time.Duration,
podManager pod.Manager,
podStateProvider podStateProvider,
podManager PodManager,
podStateProvider PodStateProvider,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
kubeContainerRuntime kubecontainer.Runtime,
@ -121,8 +127,8 @@ func NewDesiredStateOfWorldPopulator(
type desiredStateOfWorldPopulator struct {
kubeClient clientset.Interface
loopSleepDuration time.Duration
podManager pod.Manager
podStateProvider podStateProvider
podManager PodManager
podStateProvider PodStateProvider
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
pods processedPods

View File

@ -17,10 +17,11 @@ limitations under the License.
package populator
import (
"k8s.io/klog/v2/ktesting"
"testing"
"time"
"k8s.io/klog/v2/ktesting"
"fmt"
"github.com/stretchr/testify/require"
@ -37,7 +38,6 @@ import (
"k8s.io/kubernetes/pkg/features"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -288,7 +288,7 @@ func TestFindAndAddNewPods_WithReprocessPodAndVolumeRetrievalError(t *testing.T)
if !dswp.podPreviouslyProcessed(podName) {
t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName)
}
fakePodManager.DeletePod(pod)
fakePodManager.RemovePod(pod)
}
func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) {
@ -324,17 +324,22 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) {
}
}
type mutablePodManager interface {
GetPodByName(string, string) (*v1.Pod, bool)
RemovePod(*v1.Pod)
}
func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)
//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
dswp.podManager.DeletePod(pod)
dswp.podManager.(mutablePodManager).RemovePod(pod)
dswp.findAndRemoveDeletedPods()
@ -390,7 +395,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
podName := util.GetUniquePodName(pod)
//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
@ -452,12 +457,12 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) {
podName := util.GetUniquePodName(pod)
//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
dswp.podManager.DeletePod(pod)
dswp.podManager.(mutablePodManager).RemovePod(pod)
fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}}
// Add the volume to ASW by reconciling.
@ -754,7 +759,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
fakePodManager.DeletePod(pod)
fakePodManager.RemovePod(pod)
fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}}
//pod is added to fakePodManager but pod state knows the pod is removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted
@ -1613,8 +1618,7 @@ func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolu
return true, pv, nil
})
fakePodManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient())
fakePodManager := kubepod.NewBasicPodManager()
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr, seLinuxTranslator)

View File

@ -39,7 +39,6 @@ import (
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
@ -151,11 +150,18 @@ type VolumeManager interface {
}
// podStateProvider can determine if a pod is going to be terminated
type podStateProvider interface {
type PodStateProvider interface {
ShouldPodContainersBeTerminating(k8stypes.UID) bool
ShouldPodRuntimeBeRemoved(k8stypes.UID) bool
}
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
GetPodByUID(k8stypes.UID) (*v1.Pod, bool)
GetPods() []*v1.Pod
}
// NewVolumeManager returns a new concrete instance implementing the
// VolumeManager interface.
//
@ -167,8 +173,8 @@ type podStateProvider interface {
func NewVolumeManager(
controllerAttachDetachEnabled bool,
nodeName k8stypes.NodeName,
podManager pod.Manager,
podStateProvider podStateProvider,
podManager PodManager,
podStateProvider PodStateProvider,
kubeClient clientset.Interface,
volumePluginMgr *volume.VolumePluginMgr,
kubeContainerRuntime container.Runtime,

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
@ -88,7 +87,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@ -144,7 +143,7 @@ func TestWaitForAttachAndMountError(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -220,7 +219,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
claim.Status = v1.PersistentVolumeClaimStatus{
@ -265,7 +264,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)

View File

@ -18,6 +18,7 @@ package node
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
@ -394,10 +395,10 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() {
case *v1.Pod:
for _, status := range t.Status.ContainerStatuses {
if status.State.Waiting == nil {
return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status)
return false, fmt.Errorf("container %q should not be out of waiting: %s", status.Name, toDebugJSON(status))
}
if status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status)
return false, fmt.Errorf("container %q should have reason PodInitializing: %s", status.Name, toDebugJSON(status))
}
}
if len(t.Status.InitContainerStatuses) != 2 {
@ -405,14 +406,14 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() {
}
status := t.Status.InitContainerStatuses[1]
if status.State.Waiting == nil {
return false, fmt.Errorf("second init container should not be out of waiting: %#v", status)
return false, fmt.Errorf("second init container should not be out of waiting: %s", toDebugJSON(status))
}
if status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status)
return false, fmt.Errorf("second init container should have reason PodInitializing: %s", toDebugJSON(status))
}
status = t.Status.InitContainerStatuses[0]
if status.State.Terminated != nil && status.State.Terminated.ExitCode == 0 {
return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status)
return false, fmt.Errorf("first init container should have exitCode != 0: %s", toDebugJSON(status))
}
// continue until we see an attempt to restart the pod
return status.LastTerminationState.Terminated != nil, nil
@ -518,10 +519,10 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() {
case *v1.Pod:
for _, status := range t.Status.ContainerStatuses {
if status.State.Waiting == nil {
return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status)
return false, fmt.Errorf("container %q should not be out of waiting: %s", status.Name, toDebugJSON(status))
}
if status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status)
return false, fmt.Errorf("container %q should have reason PodInitializing: %s", status.Name, toDebugJSON(status))
}
}
if len(t.Status.InitContainerStatuses) != 2 {
@ -530,19 +531,19 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() {
status := t.Status.InitContainerStatuses[0]
if status.State.Terminated == nil {
if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" {
return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status)
return false, fmt.Errorf("second init container should have reason PodInitializing: %s", toDebugJSON(status))
}
return false, nil
}
if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 {
return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status)
return false, fmt.Errorf("first init container should have exitCode != 0: %s", toDebugJSON(status))
}
status = t.Status.InitContainerStatuses[1]
if status.State.Terminated == nil {
return false, nil
}
if status.State.Terminated.ExitCode == 0 {
return false, fmt.Errorf("second init container should have failed: %#v", status)
return false, fmt.Errorf("second init container should have failed: %s", toDebugJSON(status))
}
return true, nil
default:
@ -566,3 +567,13 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() {
gomega.Expect(endPod.Status.ContainerStatuses[0].State.Waiting).ToNot(gomega.BeNil())
})
})
// toDebugJSON converts an object to its JSON representation for debug logging
// purposes instead of using a struct.
func toDebugJSON(obj interface{}) string {
m, err := json.Marshal(obj)
if err != nil {
return fmt.Sprintf("<error: %v>", err)
}
return string(m)
}