From 02960a8253ecf89d0487bb8aacdf499c80331064 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 18:09:48 -0500 Subject: [PATCH 1/9] kubelet: Remove unused mirrorPodFunc in eviction Not referenced --- pkg/kubelet/eviction/eviction_manager.go | 4 ---- pkg/kubelet/eviction/eviction_manager_test.go | 6 ------ pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/kubelet_test.go | 2 +- pkg/kubelet/runonce_test.go | 3 +-- 5 files changed, 3 insertions(+), 14 deletions(-) diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index 2c23d4241d7..e47b37a0d05 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -66,8 +66,6 @@ type managerImpl struct { config Config // the function to invoke to kill a pod killPodFunc KillPodFunc - // the function to get the mirror pod by a given static pod - mirrorPodFunc MirrorPodFunc // the interface that knows how to do image gc imageGC ImageGC // the interface that knows how to do container gc @@ -112,7 +110,6 @@ func NewManager( summaryProvider stats.SummaryProvider, config Config, killPodFunc KillPodFunc, - mirrorPodFunc MirrorPodFunc, imageGC ImageGC, containerGC ContainerGC, recorder record.EventRecorder, @@ -123,7 +120,6 @@ func NewManager( manager := &managerImpl{ clock: clock, killPodFunc: killPodFunc, - mirrorPodFunc: mirrorPodFunc, imageGC: imageGC, containerGC: containerGC, config: config, diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go index 5ef5c148879..81562c671d3 100644 --- a/pkg/kubelet/eviction/eviction_manager_test.go +++ b/pkg/kubelet/eviction/eviction_manager_test.go @@ -1451,11 +1451,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) { activePodsFunc := func() []*v1.Pod { return pods } - mirrorPodFunc := func(staticPod *v1.Pod) (*v1.Pod, bool) { - mirrorPod := staticPod.DeepCopy() - mirrorPod.Annotations[kubelettypes.ConfigSourceAnnotationKey] = kubelettypes.ApiserverSource - return mirrorPod, true - } fakeClock := testingclock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} @@ -1490,7 +1485,6 @@ func TestStaticCriticalPodsAreNotEvicted(t *testing.T) { manager := &managerImpl{ clock: fakeClock, killPodFunc: podKiller.killPodNow, - mirrorPodFunc: mirrorPodFunc, imageGC: diskGC, containerGC: diskGC, config: config, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index be1f0176333..a2a74aba453 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -837,7 +837,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, - killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) + killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, kubeCfg.LocalStorageCapacityIsolation) klet.evictionManager = evictionManager klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5c3a1a4aa68..c8cc4a02df5 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -339,7 +339,7 @@ func newTestKubeletWithImageList( } // setup eviction manager evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, - killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.podManager.GetMirrorPodByPod, kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) + killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation()) kubelet.evictionManager = evictionManager kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 453cf9acff3..5bb062b6561 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -134,8 +134,7 @@ func TestRunOnce(t *testing.T) { fakeKillPodFunc := func(pod *v1.Pod, evict bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error { return nil } - fakeMirrodPodFunc := func(*v1.Pod) (*v1.Pod, bool) { return nil, false } - evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, fakeMirrodPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation()) + evictionManager, evictionAdmitHandler := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, nil, nil, kb.recorder, nodeRef, kb.clock, kb.supportLocalStorageCapacityIsolation()) kb.evictionManager = evictionManager kb.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler) From f8086f2dac1e01fb37e137f3d38ddc5aa3bbb7e0 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 20:37:31 -0500 Subject: [PATCH 2/9] kubelet: Convert IsMirrorOf to a function Shrinks the PodManager interface by one method, no abstraction is necessary here. --- pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/pod/pod_manager.go | 6 ++---- pkg/kubelet/pod/testing/mock_manager.go | 14 -------------- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a2a74aba453..f38bbb8f263 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1823,7 +1823,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType if kubetypes.IsStaticPod(pod) { deleted := false if mirrorPod != nil { - if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { + if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) { // The mirror pod is semantically different from the static pod. Remove // it. The mirror pod will get recreated later. klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 69457e6c983..c33c507a494 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -86,9 +86,6 @@ type Manager interface { // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod // UIDs and mirror pod UIDs to static pod UIDs. GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) - // IsMirrorPodOf returns true if mirrorPod is a correct representation of - // pod; false otherwise. - IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool MirrorClient } @@ -292,7 +289,8 @@ func (pm *basicManager) GetOrphanedMirrorPodNames() []string { return podFullNames } -func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { +// IsMirrorPodOf returns true if pod and mirrorPod are associated with each other. +func IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { // Check name and namespace first. if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace { return false diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 2de02e970e6..c4cf0dcf553 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -238,20 +238,6 @@ func (mr *MockManagerMockRecorder) GetUIDTranslations() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockManager)(nil).GetUIDTranslations)) } -// IsMirrorPodOf mocks base method. -func (m *MockManager) IsMirrorPodOf(arg0, arg1 *v1.Pod) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsMirrorPodOf", arg0, arg1) - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsMirrorPodOf indicates an expected call of IsMirrorPodOf. -func (mr *MockManagerMockRecorder) IsMirrorPodOf(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsMirrorPodOf", reflect.TypeOf((*MockManager)(nil).IsMirrorPodOf), arg0, arg1) -} - // SetPods mocks base method. func (m *MockManager) SetPods(arg0 []*v1.Pod) { m.ctrl.T.Helper() From e7207c85467efa02547d6fc7f8925ebfac88bc56 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 20:40:19 -0500 Subject: [PATCH 3/9] kubelet: Merge orphaned mirror pod names into GetPodsAndMirrorPods There is only one caller and both sets of data are part of the resync operation between kubelet's desired state and the actual state of the pod workers. Reduces the size of the interface so that it is easier to create another pod manager. --- pkg/kubelet/kubelet_pods.go | 30 +++++++++------------- pkg/kubelet/pod/pod_manager.go | 33 ++++++++++--------------- pkg/kubelet/pod/pod_manager_test.go | 4 +-- pkg/kubelet/pod/testing/mock_manager.go | 19 +++----------- 4 files changed, 29 insertions(+), 57 deletions(-) diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 7d53bdcf01b..fe6e24a4939 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -980,23 +980,6 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po kl.statusManager.RemoveOrphanedStatuses(podUIDs) } -// deleteOrphanedMirrorPods checks whether pod killer has done with orphaned mirror pod. -// If pod killing is done, podManager.DeleteMirrorPod() is called to delete mirror pod -// from the API server -func (kl *Kubelet) deleteOrphanedMirrorPods() { - mirrorPods := kl.podManager.GetOrphanedMirrorPodNames() - for _, podFullname := range mirrorPods { - if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) { - _, err := kl.podManager.DeleteMirrorPod(podFullname, nil) - if err != nil { - klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname) - } else { - klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname) - } - } - } -} - // HandlePodCleanups performs a series of cleanup work, including terminating // pod workers, killing unwanted pods, and removing orphaned volumes/pod // directories. No config changes are sent to pod workers while this method @@ -1027,7 +1010,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { } } - allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods() + allPods, mirrorPods, orphanedMirrorPodFullnames := kl.podManager.GetPodsAndMirrorPods() // Pod phase progresses monotonically. Once a pod has reached a final state, // it should never leave regardless of the restart policy. The statuses @@ -1123,7 +1106,16 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // Remove any orphaned mirror pods (mirror pods are tracked by name via the // pod worker) klog.V(3).InfoS("Clean up orphaned mirror pods") - kl.deleteOrphanedMirrorPods() + for _, podFullname := range orphanedMirrorPodFullnames { + if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) { + _, err := kl.podManager.DeleteMirrorPod(podFullname, nil) + if err != nil { + klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname) + } else { + klog.V(3).InfoS("Deleted mirror pod", "podName", podFullname) + } + } + } // After pruning pod workers for terminated pods get the list of active pods for // metrics and to determine restarts. diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index c33c507a494..88d6864e4fc 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -60,8 +60,9 @@ type Manager interface { // GetMirrorPodByPod returns the mirror pod for the given static pod and // whether it was known to the pod manager. GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool) - // GetPodsAndMirrorPods returns the both regular and mirror pods. - GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) + // GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and + // the pod fullnames of any orphaned mirror pods. + GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) // SetPods replaces the internal pods with the new pods. // It is currently only used for testing. SetPods(pods []*v1.Pod) @@ -73,8 +74,6 @@ type Manager interface { // this means deleting the mappings related to mirror pods. For non- // mirror pods, this means deleting from indexes for all non-mirror pods. DeletePod(pod *v1.Pod) - // GetOrphanedMirrorPodNames returns names of orphaned mirror pods - GetOrphanedMirrorPodNames() []string // TranslatePodUID returns the actual UID of a pod. If the UID belongs to // a mirror pod, returns the UID of its static pod. Otherwise, returns the // original UID. @@ -211,12 +210,18 @@ func (pm *basicManager) GetPods() []*v1.Pod { return podsMapToPods(pm.podByUID) } -func (pm *basicManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) { +func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) { pm.lock.RLock() defer pm.lock.RUnlock() - pods := podsMapToPods(pm.podByUID) - mirrorPods := mirrorPodsMapToMirrorPods(pm.mirrorPodByUID) - return pods, mirrorPods + allPods = podsMapToPods(pm.podByUID) + allMirrorPods = mirrorPodsMapToMirrorPods(pm.mirrorPodByUID) + + for podFullName := range pm.mirrorPodByFullName { + if _, ok := pm.podByFullName[podFullName]; !ok { + orphanedMirrorPodFullnames = append(orphanedMirrorPodFullnames, podFullName) + } + } + return allPods, allMirrorPods, orphanedMirrorPodFullnames } func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) { @@ -277,18 +282,6 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.Resolved return podToMirror, mirrorToPod } -func (pm *basicManager) GetOrphanedMirrorPodNames() []string { - pm.lock.RLock() - defer pm.lock.RUnlock() - var podFullNames []string - for podFullName := range pm.mirrorPodByFullName { - if _, ok := pm.podByFullName[podFullName]; !ok { - podFullNames = append(podFullNames, podFullName) - } - } - return podFullNames -} - // IsMirrorPodOf returns true if pod and mirrorPod are associated with each other. func IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool { // Check name and namespace first. diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 3703ca4626a..67cb855d2ba 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -20,7 +20,7 @@ import ( "reflect" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" @@ -154,7 +154,7 @@ func TestDeletePods(t *testing.T) { t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods)) } - orphanedMirrorPodNames := podManager.GetOrphanedMirrorPodNames() + _, _, orphanedMirrorPodNames := podManager.GetPodsAndMirrorPods() expectedOrphanedMirrorPodNameNum := 1 if len(orphanedMirrorPodNames) != expectedOrphanedMirrorPodNameNum { t.Fatalf("Run getOrphanedMirrorPodNames() error, expected %d orphaned mirror pods, got %d orphaned mirror pods; ", expectedOrphanedMirrorPodNameNum, len(orphanedMirrorPodNames)) diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index c4cf0dcf553..bcf210ad816 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -120,20 +120,6 @@ func (mr *MockManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockManager)(nil).GetMirrorPodByPod), arg0) } -// GetOrphanedMirrorPodNames mocks base method. -func (m *MockManager) GetOrphanedMirrorPodNames() []string { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOrphanedMirrorPodNames") - ret0, _ := ret[0].([]string) - return ret0 -} - -// GetOrphanedMirrorPodNames indicates an expected call of GetOrphanedMirrorPodNames. -func (mr *MockManagerMockRecorder) GetOrphanedMirrorPodNames() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrphanedMirrorPodNames", reflect.TypeOf((*MockManager)(nil).GetOrphanedMirrorPodNames)) -} - // GetPodByFullName mocks base method. func (m *MockManager) GetPodByFullName(arg0 string) (*v1.Pod, bool) { m.ctrl.T.Helper() @@ -209,12 +195,13 @@ func (mr *MockManagerMockRecorder) GetPods() *gomock.Call { } // GetPodsAndMirrorPods mocks base method. -func (m *MockManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod) { +func (m *MockManager) GetPodsAndMirrorPods() ([]*v1.Pod, []*v1.Pod, []string) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetPodsAndMirrorPods") ret0, _ := ret[0].([]*v1.Pod) ret1, _ := ret[1].([]*v1.Pod) - return ret0, ret1 + ret2, _ := ret[2].([]string) + return ret0, ret1, ret2 } // GetPodsAndMirrorPods indicates an expected call of GetPodsAndMirrorPods. From 80b1aca580b7a7a2d06b7ef236b251be336dc033 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 20:42:24 -0500 Subject: [PATCH 4/9] kubelet: Remove dispatchWork and inline calls to UpdatePod The HandlePod* methods are all structurally similar, but accrued subtle differences. In general the only point for Handle is to process admission and to update the pod worker with the desired state of the kubelet's config (so that pod worker can make it the actual state). Add a new GetPodAndMirrorPod() method that handles when the config pod is ambiguous (pod or mirror pod) and inline the structure. Add comments on questionable additions in the config methods for future improvement. Move the metric observation of container count closer to where pods are actually started (in the pod worker). A future change can likely move it to syncPod. --- pkg/kubelet/kubelet.go | 134 ++++++++++++++++-------- pkg/kubelet/kubelet_pods.go | 28 +++-- pkg/kubelet/kubelet_test.go | 12 ++- pkg/kubelet/pod/pod_manager.go | 16 +++ pkg/kubelet/pod/testing/mock_manager.go | 16 +++ pkg/kubelet/pod_workers.go | 6 ++ 6 files changed, 161 insertions(+), 51 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f38bbb8f263..ed3a44b7d4a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2442,32 +2442,6 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle handler.HandlePodSyncs([]*v1.Pod{pod}) } -// dispatchWork starts the asynchronous sync of the pod in a pod worker. -// If the pod has completed termination, dispatchWork will perform no action. -func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { - // Run the sync in an async worker. - kl.podWorkers.UpdatePod(UpdatePodOptions{ - Pod: pod, - MirrorPod: mirrorPod, - UpdateType: syncType, - StartTime: start, - }) - // Note the number of containers for new pods. - if syncType == kubetypes.SyncPodCreate { - metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) - } -} - -// TODO: handle mirror pods in a separate component (issue #17251) -func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) { - // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the - // corresponding static pod. Send update to the pod worker if the static - // pod exists. - if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok { - kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) - } -} - // HandlePodAdditions is the callback in SyncHandler for pods being added from // a config source. func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { @@ -2485,8 +2459,18 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // the apiserver and no action (other than cleanup) is required. kl.podManager.AddPod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) continue } @@ -2530,8 +2514,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodCreate, + StartTime: start, + }) } } @@ -2541,12 +2529,21 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.UpdatePod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) - continue + + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) + + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) } } @@ -2556,10 +2553,22 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.DeletePod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) + + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) continue } + // Deletion is allowed to fail because the periodic cleanup routine // will trigger deletion again. if err := kl.deletePod(pod); err != nil { @@ -2569,7 +2578,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { } // HandlePodReconcile is the callback in the SyncHandler interface for pods -// that should be reconciled. +// that should be reconciled. Pods are reconciled when only the status of the +// pod is updated in the API. func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { @@ -2577,13 +2587,37 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // to the pod manager. kl.podManager.UpdatePod(pod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Static pods should be reconciled the same way as regular pods + } + + // TODO: reconcile being calculated in the config manager is questionable, and avoiding + // extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be + // merged (after resolving the next two TODOs). + // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation. + // TODO: this should be unnecessary today - determine what is the cause for this to + // be different than Sync, or if there is a better place for it. For instance, we have + // needsReconcile in kubelet/config, here, and in status_manager. if status.NeedToReconcilePodReadiness(pod) { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } // After an evicted pod is synced, all dead containers in the pod can be removed. + // TODO: this is questionable - status read is async and during eviction we already + // expect to not have some container info. The pod worker knows whether a pod has + // been evicted, so if this is about minimizing the time to react to an eviction we + // can do better. If it's about preserving pod status info we can also do better. if eviction.PodIsEvicted(pod.Status) { if podStatus, err := kl.podCache.Get(pod.UID); err == nil { kl.containerDeletor.deleteContainersInPod("", podStatus, true) @@ -2597,8 +2631,24 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Syncing a mirror pod is a programmer error since the intent of sync is to + // batch notify all pending work. We should make it impossible to double sync, + // but for now log a programmer error to prevent accidental introduction. + klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index fe6e24a4939..e87da0a13fb 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1144,10 +1144,14 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID) isStatic := kubetypes.IsStaticPod(desiredPod) - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(desiredPod) + if pod == nil || wasMirror { + klog.V(2).InfoS("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID) + continue + } kl.podWorkers.UpdatePod(UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, - Pod: desiredPod, + Pod: pod, MirrorPod: mirrorPod, }) @@ -1234,7 +1238,6 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // Cleanup any backoff entries. kl.backOff.GC() - return nil } @@ -1340,15 +1343,26 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con return fmt.Errorf("pod %q cannot be found - no logs available", name) } - podUID := pod.UID - if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok { + // TODO: this should be using the podWorker's pod store as authoritative, since + // the mirrorPod might still exist, the pod may have been force deleted but + // is still terminating (users should be able to view logs of force deleted static pods + // based on full name). + var podUID types.UID + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + return fmt.Errorf("mirror pod %q does not have a corresponding pod", name) + } podUID = mirrorPod.UID + } else { + podUID = pod.UID } + podStatus, found := kl.statusManager.GetPodStatus(podUID) if !found { // If there is no cached status, use the status from the - // apiserver. This is useful if kubelet has recently been - // restarted. + // config source (apiserver). This is useful if kubelet + // has recently been restarted. podStatus = pod.Status } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index c8cc4a02df5..6608a447c7e 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -597,7 +597,11 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { }, } for _, pod := range pods { - kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now()) + kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodSync, + StartTime: time.Now(), + }) if !got { t.Errorf("Should not skip completed pod %q", pod.Name) } @@ -651,7 +655,11 @@ func TestDispatchWorkOfActivePod(t *testing.T) { } for _, pod := range pods { - kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now()) + kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodSync, + StartTime: time.Now(), + }) if !got { t.Errorf("Should not skip active pod %q", pod.Name) } diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 88d6864e4fc..5426ffc34ad 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -60,6 +60,10 @@ type Manager interface { // GetMirrorPodByPod returns the mirror pod for the given static pod and // whether it was known to the pod manager. GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool) + // GetPodAndMirrorPod returns the complement for a pod - if a pod was provided + // and a mirror pod can be found, return it. If a mirror pod is provided and + // the pod can be found, return it and true for wasMirror. + GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) // GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and // the pod fullnames of any orphaned mirror pods. GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) @@ -324,3 +328,15 @@ func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) { pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)] return pod, ok } + +func (pm *basicManager) GetPodAndMirrorPod(aPod *v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) { + pm.lock.RLock() + defer pm.lock.RUnlock() + + fullName := kubecontainer.GetPodFullName(aPod) + if kubetypes.IsMirrorPod(aPod) { + return pm.podByFullName[fullName], aPod, true + } + return aPod, pm.mirrorPodByFullName[fullName], false + +} diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index bcf210ad816..7c4afa0a958 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -120,6 +120,22 @@ func (mr *MockManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockManager)(nil).GetMirrorPodByPod), arg0) } +// GetPodAndMirrorPod mocks base method. +func (m *MockManager) GetPodAndMirrorPod(arg0 *v1.Pod) (*v1.Pod, *v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPodAndMirrorPod", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(*v1.Pod) + ret2, _ := ret[2].(bool) + return ret0, ret1, ret2 +} + +// GetPodAndMirrorPod indicates an expected call of GetPodAndMirrorPod. +func (mr *MockManagerMockRecorder) GetPodAndMirrorPod(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodAndMirrorPod", reflect.TypeOf((*MockManager)(nil).GetPodAndMirrorPod), arg0) +} + // GetPodByFullName mocks base method. func (m *MockManager) GetPodByFullName(arg0 string) (*v1.Pod, bool) { m.ctrl.T.Helper() diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index e5805dbcf78..82e7cb93c2c 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -1181,6 +1181,12 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update status.startedAt = p.clock.Now() status.mergeLastUpdate(update.Options) + // If we are admitting the pod and it is new, record the count of containers + // TODO: We should probably move this into syncPod and add an execution count + // to the syncPod arguments, and this should be recorded on the first sync. + // Leaving it here complicates a particularly important loop. + metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers))) + return ctx, update, true, true, true } From bb568844b67edf5624cdd8d9a9db5d34e2cc5d79 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 20:54:04 -0500 Subject: [PATCH 5/9] kubelet: Separate the MirrorClient from the PodManager The two are not coupled except accidentally. Separate them and update callsites. This will reduce the scope of PodManager interface to make exposing the pod worker cleaner. --- pkg/kubelet/kubelet.go | 12 ++++---- pkg/kubelet/kubelet_pods.go | 2 +- pkg/kubelet/kubelet_test.go | 3 +- pkg/kubelet/pod/pod_manager.go | 8 +---- pkg/kubelet/pod/pod_manager_test.go | 2 +- pkg/kubelet/pod/testing/mock_manager.go | 29 ------------------- pkg/kubelet/prober/common_test.go | 2 +- pkg/kubelet/prober/scale_test.go | 2 +- pkg/kubelet/prober/worker_test.go | 2 +- pkg/kubelet/runonce.go | 2 +- pkg/kubelet/runonce_test.go | 4 +-- pkg/kubelet/status/status_manager_test.go | 5 ++-- .../desired_state_of_world_populator_test.go | 4 +-- .../volumemanager/volume_manager_test.go | 9 +++--- 14 files changed, 25 insertions(+), 61 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ed3a44b7d4a..4274bb85aed 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -611,9 +611,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.startupManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() - // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. - mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister) - klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient) + klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister) + klet.podManager = kubepod.NewBasicPodManager() klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir()) @@ -957,6 +956,9 @@ type Kubelet struct { // podManager is a facade that abstracts away the various sources of pods // this Kubelet services. podManager kubepod.Manager + // mirrorPodClient is used to create and delete mirror pods in the API for static + // pods. + mirrorPodClient kubepod.MirrorClient // Needed to observe and respond to situations that could impact node stability evictionManager eviction.Manager @@ -1829,7 +1831,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) podFullName := kubecontainer.GetPodFullName(pod) var err error - deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) + deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) if deleted { klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod)) } else if err != nil { @@ -1843,7 +1845,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) } else { klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.podManager.CreateMirrorPod(pod); err != nil { + if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod)) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index e87da0a13fb..dcdc1e4cc01 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1108,7 +1108,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { klog.V(3).InfoS("Clean up orphaned mirror pods") for _, podFullname := range orphanedMirrorPodFullnames { if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) { - _, err := kl.podManager.DeleteMirrorPod(podFullname, nil) + _, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil) if err != nil { klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname) } else { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 6608a447c7e..123b57e336b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -260,7 +260,8 @@ func newTestKubeletWithImageList( kubelet.secretManager = secretManager configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) kubelet.configMapManager = configMapManager - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient) + kubelet.mirrorPodClient = fakeMirrorClient + kubelet.podManager = kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir()) diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 5426ffc34ad..ec47750c229 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -89,8 +89,6 @@ type Manager interface { // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod // UIDs and mirror pod UIDs to static pod UIDs. GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) - - MirrorClient } // basicManager is a functional Manager. @@ -112,15 +110,11 @@ type basicManager struct { // Mirror pod UID to pod UID map. translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID - - // A mirror pod client to create/delete mirror pods. - MirrorClient } // NewBasicPodManager returns a functional Manager. -func NewBasicPodManager(client MirrorClient) Manager { +func NewBasicPodManager() Manager { pm := &basicManager{} - pm.MirrorClient = client pm.SetPods(nil) return pm } diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 67cb855d2ba..3571604118c 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -30,7 +30,7 @@ import ( // Stub out mirror client for testing purpose. func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() - manager := NewBasicPodManager(fakeMirrorClient).(*basicManager) + manager := NewBasicPodManager().(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 7c4afa0a958..2d68384b410 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -64,35 +64,6 @@ func (mr *MockManagerMockRecorder) AddPod(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPod", reflect.TypeOf((*MockManager)(nil).AddPod), arg0) } -// CreateMirrorPod mocks base method. -func (m *MockManager) CreateMirrorPod(arg0 *v1.Pod) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateMirrorPod", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateMirrorPod indicates an expected call of CreateMirrorPod. -func (mr *MockManagerMockRecorder) CreateMirrorPod(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateMirrorPod", reflect.TypeOf((*MockManager)(nil).CreateMirrorPod), arg0) -} - -// DeleteMirrorPod mocks base method. -func (m *MockManager) DeleteMirrorPod(arg0 string, arg1 *types.UID) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteMirrorPod", arg0, arg1) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DeleteMirrorPod indicates an expected call of DeleteMirrorPod. -func (mr *MockManagerMockRecorder) DeleteMirrorPod(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMirrorPod", reflect.TypeOf((*MockManager)(nil).DeleteMirrorPod), arg0, arg1) -} - // DeletePod mocks base method. func (m *MockManager) DeletePod(arg0 *v1.Pod) { m.ctrl.T.Helper() diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index ec10f49a55c..d071392a1c3 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -106,7 +106,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { } func newTestManager() *manager { - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go index 199a52a7237..6de9687e183 100644 --- a/pkg/kubelet/prober/scale_test.go +++ b/pkg/kubelet/prober/scale_test.go @@ -87,7 +87,7 @@ func TestTCPPortExhaustion(t *testing.T) { } else { testRootDir = tempDir } - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() m := NewManager( status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir), diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index c95b8f5f2a0..e585e15e7d1 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -160,7 +160,7 @@ func TestDoProbe(t *testing.T) { } else { testRootDir = tempDir } - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 8b63d368d07..b11442ae902 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -129,7 +129,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod)) klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.podManager.CreateMirrorPod(pod); err != nil { + if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 5bb062b6561..f8d0d972585 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -72,8 +72,7 @@ func TestRunOnce(t *testing.T) { }, nil).AnyTimes() fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() - podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() fakeRuntime := &containertest.FakeRuntime{} podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() basePath, err := utiltesting.MkTmpdir("kubelet") @@ -87,6 +86,7 @@ func TestRunOnce(t *testing.T) { cadvisor: cadvisor, nodeLister: testNodeLister{}, statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath), + mirrorPodClient: podtest.NewFakeMirrorClient(), podManager: podManager, podWorkers: &fakePodWorkers{}, os: &containertest.FakeOS{}, diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index de4d19b65b4..dd4e03867e8 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -45,7 +45,6 @@ import ( "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util" @@ -85,7 +84,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podManager.AddPod(getTestPod()) podStartupLatencyTracker := util.NewPodStartupLatencyTracker() testRootDir := "" @@ -981,7 +980,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := util.NewPodStartupLatencyTracker() syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 01e48b335fa..077ed1eca18 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/pkg/features" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" @@ -1613,8 +1612,7 @@ func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolu return true, pv, nil }) - fakePodManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient()) + fakePodManager := kubepod.NewBasicPodManager() seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr, seLinuxTranslator) diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 1b5ce483ac7..cd8b591656f 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" @@ -88,7 +87,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, pv, claim := createObjects(test.pvMode, test.podMode) kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -144,7 +143,7 @@ func TestWaitForAttachAndMountError(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -220,7 +219,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem) claim.Status = v1.PersistentVolumeClaimStatus{ @@ -265,7 +264,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem) From 8bd94dfa7623e595ecb760a34a5b1b15d5a9bc82 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 20:55:01 -0500 Subject: [PATCH 6/9] kubelet: Organize and document kubelet pod-related members Clearly describe core pod related component responsibilities in the kubelet members. Organize the PodManager interface for clarity. --- pkg/kubelet/kubelet.go | 130 +++++++++++++++++++++++++-------- pkg/kubelet/pod/pod_manager.go | 8 +- 2 files changed, 106 insertions(+), 32 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 4274bb85aed..82ce5706174 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -935,7 +935,11 @@ type Kubelet struct { runtimeCache kubecontainer.RuntimeCache kubeClient clientset.Interface heartbeatClient clientset.Interface - rootDirectory string + // mirrorPodClient is used to create and delete mirror pods in the API for static + // pods. + mirrorPodClient kubepod.MirrorClient + + rootDirectory string lastObservedNodeAddressesMux sync.RWMutex lastObservedNodeAddresses []v1.NodeAddress @@ -943,9 +947,90 @@ type Kubelet struct { // onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional. onRepeatedHeartbeatFailure func() - // podWorkers handle syncing Pods in response to events. + // podManager stores the desired set of admitted pods and mirror pods that the kubelet should be + // running. The actual set of running pods is stored on the podWorkers. The manager is populated + // by the kubelet config loops which abstracts receiving configuration from many different sources + // (api for regular pods, local filesystem or http for static pods). The manager may be consulted + // by other components that need to see the set of desired pods. Note that not all desired pods are + // running, and not all running pods are in the podManager - for instance, force deleting a pod + // from the apiserver will remove it from the podManager, but the pod may still be terminating and + // tracked by the podWorkers. Components that need to know the actual consumed resources of the + // node or are driven by podWorkers and the sync*Pod methods (status, volume, stats) should also + // consult the podWorkers when reconciling. + // + // TODO: review all kubelet components that need the actual set of pods (vs the desired set) + // and update them to use podWorkers instead of podManager. This may introduce latency in some + // methods, but avoids race conditions and correctly accounts for terminating pods that have + // been force deleted or static pods that have been updated. + // https://github.com/kubernetes/kubernetes/issues/116970 + podManager kubepod.Manager + + // podWorkers is responsible for driving the lifecycle state machine of each pod. The worker is + // notified of config changes, updates, periodic reconciliation, container runtime updates, and + // evictions of all desired pods and will invoke reconciliation methods per pod in separate + // goroutines. The podWorkers are authoritative in the kubelet for what pods are actually being + // run and their current state: + // + // * syncing: pod should be running (syncPod) + // * terminating: pod should be stopped (syncTerminatingPod) + // * terminated: pod should have all resources cleaned up (syncTerminatedPod) + // + // and invoke the handler methods that correspond to each state. Components within the + // kubelet that need to know the phase of the pod in order to correctly set up or tear down + // resources must consult the podWorkers. + // + // Once a pod has been accepted by the pod workers, no other pod with that same UID (and + // name+namespace, for static pods) will be started until the first pod has fully terminated + // and been cleaned up by SyncKnownPods. This means a pod may be desired (in API), admitted + // (in pod manager), and requested (by invoking UpdatePod) but not start for an arbitrarily + // long interval because a prior pod is still terminating. + // + // As an event-driven (by UpdatePod) controller, the podWorkers must periodically be resynced + // by the kubelet invoking SyncKnownPods with the desired state (admitted pods in podManager). + // Since the podManager may be unaware of some running pods due to force deletion, the + // podWorkers are responsible for triggering a sync of pods that are no longer desired but + // must still run to completion. podWorkers PodWorkers + // evictionManager observes the state of the node for situations that could impact node stability + // and evicts pods (sets to phase Failed with reason Evicted) to reduce resource pressure. The + // eviction manager acts on the actual state of the node and considers the podWorker to be + // authoritative. + evictionManager eviction.Manager + + // probeManager tracks the set of running pods and ensures any user-defined periodic checks are + // run to introspect the state of each pod. The probe manager acts on the actual state of the node + // and is notified of pods by the podWorker. The probe manager is the authoritative source of the + // most recent probe status and is responsible for notifying the status manager, which + // synthesizes them into the overall pod status. + probeManager prober.Manager + + // secretManager caches the set of secrets used by running pods on this node. The podWorkers + // notify the secretManager when pods are started and terminated, and the secretManager must + // then keep the needed secrets up-to-date as they change. + secretManager secret.Manager + + // configMapManager caches the set of config maps used by running pods on this node. The + // podWorkers notify the configMapManager when pods are started and terminated, and the + // configMapManager must then keep the needed config maps up-to-date as they change. + configMapManager configmap.Manager + + // volumeManager observes the set of running pods and is responsible for attaching, mounting, + // unmounting, and detaching as those pods move through their lifecycle. It periodically + // synchronizes the set of known volumes to the set of actually desired volumes and cleans up + // any orphaned volumes. The volume manager considers the podWorker to be authoritative for + // which pods are running. + volumeManager volumemanager.VolumeManager + + // statusManager receives updated pod status updates from the podWorker and updates the API + // status of those pods to match. The statusManager is authoritative for the synthesized + // status of the pod from the kubelet's perspective (other components own the individual + // elements of status) and should be consulted by components in preference to assembling + // that status themselves. Note that the status manager is downstream of the pod worker + // and components that need to check whether a pod is still running should instead directly + // consult the pod worker. + statusManager status.Manager + // resyncInterval is the interval between periodic full reconciliations of // pods on this node. resyncInterval time.Duration @@ -953,16 +1038,6 @@ type Kubelet struct { // sourcesReady records the sources seen by the kubelet, it is thread-safe. sourcesReady config.SourcesReady - // podManager is a facade that abstracts away the various sources of pods - // this Kubelet services. - podManager kubepod.Manager - // mirrorPodClient is used to create and delete mirror pods in the API for static - // pods. - mirrorPodClient kubepod.MirrorClient - - // Needed to observe and respond to situations that could impact node stability - evictionManager eviction.Manager - // Optional, defaults to /logs/ from /var/log logServer http.Handler // Optional, defaults to simple Docker implementation @@ -1003,8 +1078,6 @@ type Kubelet struct { // Volume plugins. volumePluginMgr *volume.VolumePluginMgr - // Handles container probing. - probeManager prober.Manager // Manages container health check results. livenessManager proberesults.Manager readinessManager proberesults.Manager @@ -1026,12 +1099,6 @@ type Kubelet struct { // Manager for container logs. containerLogManager logs.ContainerLogManager - // Secret manager. - secretManager secret.Manager - - // ConfigMap manager. - configMapManager configmap.Manager - // Cached MachineInfo returned by cadvisor. machineInfoLock sync.RWMutex machineInfo *cadvisorapi.MachineInfo @@ -1039,14 +1106,6 @@ type Kubelet struct { // Handles certificate rotations. serverCertificateManager certificate.Manager - // Syncs pods statuses with apiserver; also used as a cache of statuses. - statusManager status.Manager - - // VolumeManager runs a set of asynchronous loops that figure out which - // volumes need to be attached/mounted/unmounted/detached based on the pods - // scheduled on this node and makes it so. - volumeManager volumemanager.VolumeManager - // Cloud provider interface. cloud cloudprovider.Interface // Handles requests to cloud provider with timeout @@ -1112,10 +1171,12 @@ type Kubelet struct { // nodeLeaseController claims and renews the node lease for this Kubelet nodeLeaseController lease.Controller - // Generates pod events. + // pleg observes the state of the container runtime and notifies the kubelet of changes to containers, which + // notifies the podWorkers to reconcile the state of the pod (for instance, if a container dies and needs to + // be restarted). pleg pleg.PodLifecycleEventGenerator - // Evented PLEG + // eventedPleg supplements the pleg to deliver edge-driven container changes with low-latency. eventedPleg pleg.PodLifecycleEventGenerator // Store kubecontainer.PodStatus for all pods. @@ -2136,6 +2197,15 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus // Get pods which should be resynchronized. Currently, the following pod should be resynchronized: // - pod whose work is ready. // - internal modules that request sync of a pod. +// +// This method does not return orphaned pods (those known only to the pod worker that may have +// been deleted from configuration). Those pods are synced by HandlePodCleanups as a consequence +// of driving the state machine to completion. +// +// TODO: Consider synchronizing all pods which have not recently been acted on to be resilient +// to bugs that might prevent updates from being delivered (such as the previous bug with +// orphaned pods). Instead of asking the work queue for pending work, consider asking the +// PodWorker which pods should be synced. func (kl *Kubelet) getPodsToSync() []*v1.Pod { allPods := kl.podManager.GetPods() podUIDs := kl.workQueue.GetWork() diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index ec47750c229..4212f02e6be 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -43,8 +43,6 @@ import ( // pod. When a static pod gets deleted, the associated orphaned mirror pod // will also be removed. type Manager interface { - // GetPods returns the regular pods bound to the kubelet and their spec. - GetPods() []*v1.Pod // GetPodByFullName returns the (non-mirror) pod that matches full name, as well as // whether the pod was found. GetPodByFullName(podFullName string) (*v1.Pod, bool) @@ -64,9 +62,14 @@ type Manager interface { // and a mirror pod can be found, return it. If a mirror pod is provided and // the pod can be found, return it and true for wasMirror. GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) + + // GetPods returns the regular pods bound to the kubelet and their spec. + GetPods() []*v1.Pod + // GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and // the pod fullnames of any orphaned mirror pods. GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) + // SetPods replaces the internal pods with the new pods. // It is currently only used for testing. SetPods(pods []*v1.Pod) @@ -78,6 +81,7 @@ type Manager interface { // this means deleting the mappings related to mirror pods. For non- // mirror pods, this means deleting from indexes for all non-mirror pods. DeletePod(pod *v1.Pod) + // TranslatePodUID returns the actual UID of a pod. If the UID belongs to // a mirror pod, returns the UID of its static pod. Otherwise, returns the // original UID. From 166256f73e8add34dcd7fb703de5ca24d75b1d4b Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 21:28:37 -0500 Subject: [PATCH 7/9] kubelet: Reduce the interface pod.Manager consumers accept Every component that uses a pod.Manager should use a stub interface (like we do for podWorker) that explicitly describes what methods they use. This will allow podWorker to implement the minimum set of manager interfaces. --- pkg/kubelet/stats/provider.go | 16 ++-- pkg/kubelet/status/status_manager.go | 19 +++-- pkg/kubelet/status/status_manager_test.go | 44 +++++----- .../testing/mock_pod_status_provider.go | 83 +++++++++++++++++++ .../desired_state_of_world_populator.go | 20 +++-- .../desired_state_of_world_populator_test.go | 15 ++-- pkg/kubelet/volumemanager/volume_manager.go | 14 +++- 7 files changed, 166 insertions(+), 45 deletions(-) diff --git a/pkg/kubelet/stats/provider.go b/pkg/kubelet/stats/provider.go index 1241111236a..09f82c609f5 100644 --- a/pkg/kubelet/stats/provider.go +++ b/pkg/kubelet/stats/provider.go @@ -27,18 +27,24 @@ import ( statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit" "k8s.io/kubernetes/pkg/kubelet/status" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) +// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. +// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID +} + // NewCRIStatsProvider returns a Provider that provides the node stats // from cAdvisor and the container stats from CRI. func NewCRIStatsProvider( cadvisor cadvisor.Interface, resourceAnalyzer stats.ResourceAnalyzer, - podManager kubepod.Manager, + podManager PodManager, runtimeCache kubecontainer.RuntimeCache, runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, @@ -54,7 +60,7 @@ func NewCRIStatsProvider( func NewCadvisorStatsProvider( cadvisor cadvisor.Interface, resourceAnalyzer stats.ResourceAnalyzer, - podManager kubepod.Manager, + podManager PodManager, runtimeCache kubecontainer.RuntimeCache, imageService kubecontainer.ImageService, statusProvider status.PodStatusProvider, @@ -67,7 +73,7 @@ func NewCadvisorStatsProvider( // cAdvisor and the container stats using the containerStatsProvider. func newStatsProvider( cadvisor cadvisor.Interface, - podManager kubepod.Manager, + podManager PodManager, runtimeCache kubecontainer.RuntimeCache, containerStatsProvider containerStatsProvider, ) *Provider { @@ -82,7 +88,7 @@ func newStatsProvider( // Provider provides the stats of the node and the pod-managed containers. type Provider struct { cadvisor cadvisor.Interface - podManager kubepod.Manager + podManager PodManager runtimeCache kubecontainer.RuntimeCache containerStatsProvider } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 52c93d3db66..70e2dc03812 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -40,7 +40,6 @@ import ( "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" - kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/status/state" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" statusutil "k8s.io/kubernetes/pkg/util/pod" @@ -70,7 +69,7 @@ type versionedPodStatus struct { // All methods are thread-safe. type manager struct { kubeClient clientset.Interface - podManager kubepod.Manager + podManager PodManager // Map from pod UID to sync status of the corresponding pod. podStatuses map[types.UID]versionedPodStatus podStatusesLock sync.RWMutex @@ -87,8 +86,18 @@ type manager struct { stateFileDirectory string } -// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components -// that need to introspect status. +// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. +// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + GetPodByUID(types.UID) (*v1.Pod, bool) + GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool) + TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID + GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) +} + +// PodStatusProvider knows how to provide status for a pod. It is intended to be used by other components +// that need to introspect the authoritative status of a pod. The PodStatusProvider represents the actual +// status of a running pod as the kubelet sees it. type PodStatusProvider interface { // GetPodStatus returns the cached status for the provided pod UID, as well as whether it // was a cache hit. @@ -149,7 +158,7 @@ type Manager interface { const syncPeriod = 10 * time.Second // NewManager returns a functional Manager. -func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager { +func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager { return &manager{ kubeClient: kubeClient, podManager: podManager, diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index dd4e03867e8..cad6bd7c4fb 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -50,6 +50,12 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util" ) +type mutablePodManager interface { + AddPod(*v1.Pod) + UpdatePod(*v1.Pod) + DeletePod(*v1.Pod) +} + // Generate new instance of test pod with the same initial value. func getTestPod() *v1.Pod { return &v1.Pod{ @@ -85,7 +91,7 @@ func (m *manager) testSyncBatch() { func newTestManager(kubeClient clientset.Interface) *manager { podManager := kubepod.NewBasicPodManager() - podManager.AddPod(getTestPod()) + podManager.(mutablePodManager).AddPod(getTestPod()) podStartupLatencyTracker := util.NewPodStartupLatencyTracker() testRootDir := "" if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil { @@ -328,10 +334,10 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) { syncer := newTestManager(&fake.Clientset{}) pod := getTestPod() pod.UID = "first" - syncer.podManager.AddPod(pod) + syncer.podManager.(mutablePodManager).AddPod(pod) differentPod := getTestPod() differentPod.UID = "second" - syncer.podManager.AddPod(differentPod) + syncer.podManager.(mutablePodManager).AddPod(differentPod) syncer.kubeClient = fake.NewSimpleClientset(pod) syncer.SetPodStatus(differentPod, getRandomPodStatus()) verifyActions(t, syncer, []core.Action{getAction()}) @@ -531,7 +537,7 @@ func TestStaticPod(t *testing.T) { m := newTestManager(client) t.Logf("Create the static pod") - m.podManager.AddPod(staticPod) + m.podManager.(mutablePodManager).AddPod(staticPod) assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod") status := getRandomPodStatus() @@ -549,7 +555,7 @@ func TestStaticPod(t *testing.T) { assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions()) t.Logf("Create the mirror pod") - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID)) @@ -566,10 +572,10 @@ func TestStaticPod(t *testing.T) { verifyActions(t, m, []core.Action{}) t.Logf("Change mirror pod identity.") - m.podManager.DeletePod(mirrorPod) + m.podManager.(mutablePodManager).DeletePod(mirrorPod) mirrorPod.UID = "new-mirror-pod" mirrorPod.Status = v1.PodStatus{} - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) t.Logf("Should not update to mirror pod, because UID has changed.") assert.Equal(t, m.syncBatch(true), 1) @@ -1067,7 +1073,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := util.NewPodStartupLatencyTracker() syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager) @@ -1131,7 +1137,7 @@ func TestSetContainerReadiness(t *testing.T) { m := newTestManager(&fake.Clientset{}) // Add test pod because the container spec has been changed. - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) t.Log("Setting readiness before status should fail.") m.SetContainerReadiness(pod.UID, cID1, true) @@ -1215,7 +1221,7 @@ func TestSetContainerStartup(t *testing.T) { m := newTestManager(&fake.Clientset{}) // Add test pod because the container spec has been changed. - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) t.Log("Setting startup before status should fail.") m.SetContainerStartup(pod.UID, cID1, true) @@ -1279,11 +1285,11 @@ func TestSyncBatchCleanupVersions(t *testing.T) { t.Logf("Non-orphaned pods should not be removed.") m.SetPodStatus(testPod, getRandomPodStatus()) - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) staticPod := mirrorPod staticPod.UID = "static-uid" staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"} - m.podManager.AddPod(staticPod) + m.podManager.(mutablePodManager).AddPod(staticPod) m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 m.testSyncBatch() @@ -1311,7 +1317,7 @@ func TestReconcilePodStatus(t *testing.T) { testPod.Status = podStatus t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") - syncer.podManager.UpdatePod(testPod) + syncer.podManager.(mutablePodManager).UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { t.Fatalf("Pod status is the same, a reconciliation is not needed") } @@ -1326,7 +1332,7 @@ func TestReconcilePodStatus(t *testing.T) { t.Logf("Syncbatch should do nothing, as a reconciliation is not required") normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy() testPod.Status.StartTime = &normalizedStartTime - syncer.podManager.UpdatePod(testPod) + syncer.podManager.(mutablePodManager).UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed") } @@ -1336,7 +1342,7 @@ func TestReconcilePodStatus(t *testing.T) { t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") changedPodStatus := getRandomPodStatus() - syncer.podManager.UpdatePod(testPod) + syncer.podManager.(mutablePodManager).UpdatePod(testPod) if !syncer.needsReconcile(testPod.UID, changedPodStatus) { t.Fatalf("Pod status is different, a reconciliation is needed") } @@ -1359,7 +1365,7 @@ func TestDeletePodBeforeFinished(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) status := getRandomPodStatus() status.Phase = v1.PodFailed m.SetPodStatus(pod, status) @@ -1373,7 +1379,7 @@ func TestDeletePodFinished(t *testing.T) { pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(pod) m := newTestManager(client) - m.podManager.AddPod(pod) + m.podManager.(mutablePodManager).AddPod(pod) status := getRandomPodStatus() status.Phase = v1.PodFailed m.TerminatePod(pod) @@ -1394,8 +1400,8 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()} client := fake.NewSimpleClientset(mirrorPod) m := newTestManager(client) - m.podManager.AddPod(staticPod) - m.podManager.AddPod(mirrorPod) + m.podManager.(mutablePodManager).AddPod(staticPod) + m.podManager.(mutablePodManager).AddPod(mirrorPod) t.Logf("Verify setup.") assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod") assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") diff --git a/pkg/kubelet/status/testing/mock_pod_status_provider.go b/pkg/kubelet/status/testing/mock_pod_status_provider.go index d88464f9746..fecd2e7f307 100644 --- a/pkg/kubelet/status/testing/mock_pod_status_provider.go +++ b/pkg/kubelet/status/testing/mock_pod_status_provider.go @@ -27,8 +27,91 @@ import ( v1 "k8s.io/api/core/v1" types "k8s.io/apimachinery/pkg/types" container "k8s.io/kubernetes/pkg/kubelet/container" + types0 "k8s.io/kubernetes/pkg/kubelet/types" ) +// MockPodManager is a mock of PodManager interface. +type MockPodManager struct { + ctrl *gomock.Controller + recorder *MockPodManagerMockRecorder +} + +// MockPodManagerMockRecorder is the mock recorder for MockPodManager. +type MockPodManagerMockRecorder struct { + mock *MockPodManager +} + +// NewMockPodManager creates a new mock instance. +func NewMockPodManager(ctrl *gomock.Controller) *MockPodManager { + mock := &MockPodManager{ctrl: ctrl} + mock.recorder = &MockPodManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPodManager) EXPECT() *MockPodManagerMockRecorder { + return m.recorder +} + +// GetMirrorPodByPod mocks base method. +func (m *MockPodManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMirrorPodByPod", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetMirrorPodByPod indicates an expected call of GetMirrorPodByPod. +func (mr *MockPodManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockPodManager)(nil).GetMirrorPodByPod), arg0) +} + +// GetPodByUID mocks base method. +func (m *MockPodManager) GetPodByUID(arg0 types.UID) (*v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPodByUID", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// GetPodByUID indicates an expected call of GetPodByUID. +func (mr *MockPodManagerMockRecorder) GetPodByUID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByUID", reflect.TypeOf((*MockPodManager)(nil).GetPodByUID), arg0) +} + +// GetUIDTranslations mocks base method. +func (m *MockPodManager) GetUIDTranslations() (map[types0.ResolvedPodUID]types0.MirrorPodUID, map[types0.MirrorPodUID]types0.ResolvedPodUID) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUIDTranslations") + ret0, _ := ret[0].(map[types0.ResolvedPodUID]types0.MirrorPodUID) + ret1, _ := ret[1].(map[types0.MirrorPodUID]types0.ResolvedPodUID) + return ret0, ret1 +} + +// GetUIDTranslations indicates an expected call of GetUIDTranslations. +func (mr *MockPodManagerMockRecorder) GetUIDTranslations() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockPodManager)(nil).GetUIDTranslations)) +} + +// TranslatePodUID mocks base method. +func (m *MockPodManager) TranslatePodUID(uid types.UID) types0.ResolvedPodUID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TranslatePodUID", uid) + ret0, _ := ret[0].(types0.ResolvedPodUID) + return ret0 +} + +// TranslatePodUID indicates an expected call of TranslatePodUID. +func (mr *MockPodManagerMockRecorder) TranslatePodUID(uid interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TranslatePodUID", reflect.TypeOf((*MockPodManager)(nil).TranslatePodUID), uid) +} + // MockPodStatusProvider is a mock of PodStatusProvider interface. type MockPodStatusProvider struct { ctrl *gomock.Controller diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index d66acf63af3..8aab267d2c1 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -40,7 +40,6 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" @@ -70,12 +69,19 @@ type DesiredStateOfWorldPopulator interface { HasAddedPods() bool } -// podStateProvider can determine if a pod is going to be terminated. -type podStateProvider interface { +// PodStateProvider can determine if a pod is going to be terminated. +type PodStateProvider interface { ShouldPodContainersBeTerminating(types.UID) bool ShouldPodRuntimeBeRemoved(types.UID) bool } +// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. +// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + GetPodByUID(types.UID) (*v1.Pod, bool) + GetPods() []*v1.Pod +} + // NewDesiredStateOfWorldPopulator returns a new instance of // DesiredStateOfWorldPopulator. // @@ -90,8 +96,8 @@ type podStateProvider interface { func NewDesiredStateOfWorldPopulator( kubeClient clientset.Interface, loopSleepDuration time.Duration, - podManager pod.Manager, - podStateProvider podStateProvider, + podManager PodManager, + podStateProvider PodStateProvider, desiredStateOfWorld cache.DesiredStateOfWorld, actualStateOfWorld cache.ActualStateOfWorld, kubeContainerRuntime kubecontainer.Runtime, @@ -121,8 +127,8 @@ func NewDesiredStateOfWorldPopulator( type desiredStateOfWorldPopulator struct { kubeClient clientset.Interface loopSleepDuration time.Duration - podManager pod.Manager - podStateProvider podStateProvider + podManager PodManager + podStateProvider PodStateProvider desiredStateOfWorld cache.DesiredStateOfWorld actualStateOfWorld cache.ActualStateOfWorld pods processedPods diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 077ed1eca18..d87bad0bafc 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -323,17 +323,22 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) { } } +type mutablePodManager interface { + GetPodByName(string, string) (*v1.Pod, bool) + DeletePod(*v1.Pod) +} + func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t) podName := util.GetUniquePodName(pod) //let the pod be terminated - podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name) + podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name) if !exist { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - dswp.podManager.DeletePod(pod) + dswp.podManager.(mutablePodManager).DeletePod(pod) dswp.findAndRemoveDeletedPods() @@ -389,7 +394,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) { podName := util.GetUniquePodName(pod) //let the pod be terminated - podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name) + podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name) if !exist { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } @@ -451,12 +456,12 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) { podName := util.GetUniquePodName(pod) //let the pod be terminated - podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name) + podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name) if !exist { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - dswp.podManager.DeletePod(pod) + dswp.podManager.(mutablePodManager).DeletePod(pod) fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} // Add the volume to ASW by reconciling. diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 915685bd67d..7e5d00c6d12 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -39,7 +39,6 @@ import ( csitrans "k8s.io/csi-translation-lib" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics" "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator" @@ -151,11 +150,18 @@ type VolumeManager interface { } // podStateProvider can determine if a pod is going to be terminated -type podStateProvider interface { +type PodStateProvider interface { ShouldPodContainersBeTerminating(k8stypes.UID) bool ShouldPodRuntimeBeRemoved(k8stypes.UID) bool } +// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet. +// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc. +type PodManager interface { + GetPodByUID(k8stypes.UID) (*v1.Pod, bool) + GetPods() []*v1.Pod +} + // NewVolumeManager returns a new concrete instance implementing the // VolumeManager interface. // @@ -167,8 +173,8 @@ type podStateProvider interface { func NewVolumeManager( controllerAttachDetachEnabled bool, nodeName k8stypes.NodeName, - podManager pod.Manager, - podStateProvider podStateProvider, + podManager PodManager, + podStateProvider PodStateProvider, kubeClient clientset.Interface, volumePluginMgr *volume.VolumePluginMgr, kubeContainerRuntime container.Runtime, From 1f16d711857a3115c65946510eeca6e2d1778659 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 14 Apr 2023 15:49:32 -0400 Subject: [PATCH 8/9] kubelet: Rename PodManager DeletePod to RemovePod RemovePod is more consistent within the kubelet to be the opposite of AddPod, and the pod is not being deleted just "removed" from tracking. --- pkg/kubelet/kubelet.go | 2 +- pkg/kubelet/kubelet_test.go | 6 ++--- pkg/kubelet/pod/pod_manager.go | 8 +++---- pkg/kubelet/pod/pod_manager_test.go | 6 ++--- pkg/kubelet/pod/testing/mock_manager.go | 24 +++++++++---------- pkg/kubelet/status/status_manager_test.go | 4 ++-- .../desired_state_of_world_populator_test.go | 13 +++++----- 7 files changed, 32 insertions(+), 31 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 82ce5706174..709e35015fb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2624,7 +2624,7 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { - kl.podManager.DeletePod(pod) + kl.podManager.RemovePod(pod) pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 123b57e336b..5b62ca1289c 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -2530,9 +2530,9 @@ func TestHandlePodResourcesResize(t *testing.T) { testPod2.UID: true, testPod3.UID: true, } - defer kubelet.podManager.DeletePod(testPod3) - defer kubelet.podManager.DeletePod(testPod2) - defer kubelet.podManager.DeletePod(testPod1) + defer kubelet.podManager.RemovePod(testPod3) + defer kubelet.podManager.RemovePod(testPod2) + defer kubelet.podManager.RemovePod(testPod1) tests := []struct { name string diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 4212f02e6be..e3cc4f76080 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -77,10 +77,10 @@ type Manager interface { AddPod(pod *v1.Pod) // UpdatePod updates the given pod in the manager. UpdatePod(pod *v1.Pod) - // DeletePod deletes the given pod from the manager. For mirror pods, + // RemovePod deletes the given pod from the manager. For mirror pods, // this means deleting the mappings related to mirror pods. For non- // mirror pods, this means deleting from indexes for all non-mirror pods. - DeletePod(pod *v1.Pod) + RemovePod(pod *v1.Pod) // TranslatePodUID returns the actual UID of a pod. If the UID belongs to // a mirror pod, returns the UID of its static pod. Otherwise, returns the @@ -98,7 +98,7 @@ type Manager interface { // basicManager is a functional Manager. // // All fields in basicManager are read-only and are updated calling SetPods, -// AddPod, UpdatePod, or DeletePod. +// AddPod, UpdatePod, or RemovePod. type basicManager struct { // Protects all internal maps. lock sync.RWMutex @@ -189,7 +189,7 @@ func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { } } -func (pm *basicManager) DeletePod(pod *v1.Pod) { +func (pm *basicManager) RemovePod(pod *v1.Pod) { updateMetrics(pod, nil) pm.lock.Lock() defer pm.lock.Unlock() diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 3571604118c..8c688ed836c 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -111,7 +111,7 @@ func TestGetSetPods(t *testing.T) { } -func TestDeletePods(t *testing.T) { +func TestRemovePods(t *testing.T) { mirrorPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: types.UID("mirror-pod-uid"), @@ -147,11 +147,11 @@ func TestDeletePods(t *testing.T) { podManager, _ := newTestManager() podManager.SetPods(updates) - podManager.DeletePod(staticPod) + podManager.RemovePod(staticPod) actualPods := podManager.GetPods() if len(actualPods) == len(expectedPods) { - t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods)) + t.Fatalf("Run RemovePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods)) } _, _, orphanedMirrorPodNames := podManager.GetPodsAndMirrorPods() diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 2d68384b410..643705192dd 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -64,18 +64,6 @@ func (mr *MockManagerMockRecorder) AddPod(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPod", reflect.TypeOf((*MockManager)(nil).AddPod), arg0) } -// DeletePod mocks base method. -func (m *MockManager) DeletePod(arg0 *v1.Pod) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "DeletePod", arg0) -} - -// DeletePod indicates an expected call of DeletePod. -func (mr *MockManagerMockRecorder) DeletePod(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePod", reflect.TypeOf((*MockManager)(nil).DeletePod), arg0) -} - // GetMirrorPodByPod mocks base method. func (m *MockManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) { m.ctrl.T.Helper() @@ -212,6 +200,18 @@ func (mr *MockManagerMockRecorder) GetUIDTranslations() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockManager)(nil).GetUIDTranslations)) } +// RemovePod mocks base method. +func (m *MockManager) RemovePod(arg0 *v1.Pod) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RemovePod", arg0) +} + +// RemovePod indicates an expected call of RemovePod. +func (mr *MockManagerMockRecorder) RemovePod(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemovePod", reflect.TypeOf((*MockManager)(nil).RemovePod), arg0) +} + // SetPods mocks base method. func (m *MockManager) SetPods(arg0 []*v1.Pod) { m.ctrl.T.Helper() diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index cad6bd7c4fb..437b45b26a5 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -53,7 +53,7 @@ import ( type mutablePodManager interface { AddPod(*v1.Pod) UpdatePod(*v1.Pod) - DeletePod(*v1.Pod) + RemovePod(*v1.Pod) } // Generate new instance of test pod with the same initial value. @@ -572,7 +572,7 @@ func TestStaticPod(t *testing.T) { verifyActions(t, m, []core.Action{}) t.Logf("Change mirror pod identity.") - m.podManager.(mutablePodManager).DeletePod(mirrorPod) + m.podManager.(mutablePodManager).RemovePod(mirrorPod) mirrorPod.UID = "new-mirror-pod" mirrorPod.Status = v1.PodStatus{} m.podManager.(mutablePodManager).AddPod(mirrorPod) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index d87bad0bafc..2b4d8f812d6 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -17,10 +17,11 @@ limitations under the License. package populator import ( - "k8s.io/klog/v2/ktesting" "testing" "time" + "k8s.io/klog/v2/ktesting" + "fmt" "github.com/stretchr/testify/require" @@ -287,7 +288,7 @@ func TestFindAndAddNewPods_WithReprocessPodAndVolumeRetrievalError(t *testing.T) if !dswp.podPreviouslyProcessed(podName) { t.Fatalf("Failed to record that the volumes for the specified pod: %s have been processed by the populator", podName) } - fakePodManager.DeletePod(pod) + fakePodManager.RemovePod(pod) } func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) { @@ -325,7 +326,7 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) { type mutablePodManager interface { GetPodByName(string, string) (*v1.Pod, bool) - DeletePod(*v1.Pod) + RemovePod(*v1.Pod) } func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { @@ -338,7 +339,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - dswp.podManager.(mutablePodManager).DeletePod(pod) + dswp.podManager.(mutablePodManager).RemovePod(pod) dswp.findAndRemoveDeletedPods() @@ -461,7 +462,7 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) { t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - dswp.podManager.(mutablePodManager).DeletePod(pod) + dswp.podManager.(mutablePodManager).RemovePod(pod) fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} // Add the volume to ASW by reconciling. @@ -758,7 +759,7 @@ func TestFindAndAddNewPods_FindAndRemoveDeletedPods_Valid_Block_VolumeDevices(t t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace) } podGet.Status.Phase = v1.PodFailed - fakePodManager.DeletePod(pod) + fakePodManager.RemovePod(pod) fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}} //pod is added to fakePodManager but pod state knows the pod is removed, so here findAndRemoveDeletedPods() will remove the pod and volumes it is mounted From 6ac1bae28154b6f81023d6e783d91728687f7aff Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 12 May 2023 12:48:32 -0400 Subject: [PATCH 9/9] test: Improve debug output of init container tests When certain status conditions are not expected, we need to see the nested objects, but %#v doesn't handle pointers well. Output as simple encoded JSON. --- test/e2e/common/node/init_container.go | 31 +++++++++++++++++--------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/test/e2e/common/node/init_container.go b/test/e2e/common/node/init_container.go index a87230e8436..11ebc72a8ef 100644 --- a/test/e2e/common/node/init_container.go +++ b/test/e2e/common/node/init_container.go @@ -18,6 +18,7 @@ package node import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -394,10 +395,10 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { case *v1.Pod: for _, status := range t.Status.ContainerStatuses { if status.State.Waiting == nil { - return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status) + return false, fmt.Errorf("container %q should not be out of waiting: %s", status.Name, toDebugJSON(status)) } if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status) + return false, fmt.Errorf("container %q should have reason PodInitializing: %s", status.Name, toDebugJSON(status)) } } if len(t.Status.InitContainerStatuses) != 2 { @@ -405,14 +406,14 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { } status := t.Status.InitContainerStatuses[1] if status.State.Waiting == nil { - return false, fmt.Errorf("second init container should not be out of waiting: %#v", status) + return false, fmt.Errorf("second init container should not be out of waiting: %s", toDebugJSON(status)) } if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status) + return false, fmt.Errorf("second init container should have reason PodInitializing: %s", toDebugJSON(status)) } status = t.Status.InitContainerStatuses[0] if status.State.Terminated != nil && status.State.Terminated.ExitCode == 0 { - return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status) + return false, fmt.Errorf("first init container should have exitCode != 0: %s", toDebugJSON(status)) } // continue until we see an attempt to restart the pod return status.LastTerminationState.Terminated != nil, nil @@ -518,10 +519,10 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { case *v1.Pod: for _, status := range t.Status.ContainerStatuses { if status.State.Waiting == nil { - return false, fmt.Errorf("container %q should not be out of waiting: %#v", status.Name, status) + return false, fmt.Errorf("container %q should not be out of waiting: %s", status.Name, toDebugJSON(status)) } if status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("container %q should have reason PodInitializing: %#v", status.Name, status) + return false, fmt.Errorf("container %q should have reason PodInitializing: %s", status.Name, toDebugJSON(status)) } } if len(t.Status.InitContainerStatuses) != 2 { @@ -530,19 +531,19 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { status := t.Status.InitContainerStatuses[0] if status.State.Terminated == nil { if status.State.Waiting != nil && status.State.Waiting.Reason != "PodInitializing" { - return false, fmt.Errorf("second init container should have reason PodInitializing: %#v", status) + return false, fmt.Errorf("second init container should have reason PodInitializing: %s", toDebugJSON(status)) } return false, nil } if status.State.Terminated != nil && status.State.Terminated.ExitCode != 0 { - return false, fmt.Errorf("first init container should have exitCode != 0: %#v", status) + return false, fmt.Errorf("first init container should have exitCode != 0: %s", toDebugJSON(status)) } status = t.Status.InitContainerStatuses[1] if status.State.Terminated == nil { return false, nil } if status.State.Terminated.ExitCode == 0 { - return false, fmt.Errorf("second init container should have failed: %#v", status) + return false, fmt.Errorf("second init container should have failed: %s", toDebugJSON(status)) } return true, nil default: @@ -566,3 +567,13 @@ var _ = SIGDescribe("InitContainer [NodeConformance]", func() { gomega.Expect(endPod.Status.ContainerStatuses[0].State.Waiting).ToNot(gomega.BeNil()) }) }) + +// toDebugJSON converts an object to its JSON representation for debug logging +// purposes instead of using a struct. +func toDebugJSON(obj interface{}) string { + m, err := json.Marshal(obj) + if err != nil { + return fmt.Sprintf("", err) + } + return string(m) +}