From bb568844b67edf5624cdd8d9a9db5d34e2cc5d79 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 23 Jan 2023 20:54:04 -0500 Subject: [PATCH] kubelet: Separate the MirrorClient from the PodManager The two are not coupled except accidentally. Separate them and update callsites. This will reduce the scope of PodManager interface to make exposing the pod worker cleaner. --- pkg/kubelet/kubelet.go | 12 ++++---- pkg/kubelet/kubelet_pods.go | 2 +- pkg/kubelet/kubelet_test.go | 3 +- pkg/kubelet/pod/pod_manager.go | 8 +---- pkg/kubelet/pod/pod_manager_test.go | 2 +- pkg/kubelet/pod/testing/mock_manager.go | 29 ------------------- pkg/kubelet/prober/common_test.go | 2 +- pkg/kubelet/prober/scale_test.go | 2 +- pkg/kubelet/prober/worker_test.go | 2 +- pkg/kubelet/runonce.go | 2 +- pkg/kubelet/runonce_test.go | 4 +-- pkg/kubelet/status/status_manager_test.go | 5 ++-- .../desired_state_of_world_populator_test.go | 4 +-- .../volumemanager/volume_manager_test.go | 9 +++--- 14 files changed, 25 insertions(+), 61 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ed3a44b7d4a..4274bb85aed 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -611,9 +611,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.startupManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() - // podManager is also responsible for keeping secretManager and configMapManager contents up-to-date. - mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister) - klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient) + klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister) + klet.podManager = kubepod.NewBasicPodManager() klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir()) @@ -957,6 +956,9 @@ type Kubelet struct { // podManager is a facade that abstracts away the various sources of pods // this Kubelet services. podManager kubepod.Manager + // mirrorPodClient is used to create and delete mirror pods in the API for static + // pods. + mirrorPodClient kubepod.MirrorClient // Needed to observe and respond to situations that could impact node stability evictionManager eviction.Manager @@ -1829,7 +1831,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) podFullName := kubecontainer.GetPodFullName(pod) var err error - deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) + deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) if deleted { klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod)) } else if err != nil { @@ -1843,7 +1845,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) } else { klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.podManager.CreateMirrorPod(pod); err != nil { + if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod)) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index e87da0a13fb..dcdc1e4cc01 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1108,7 +1108,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { klog.V(3).InfoS("Clean up orphaned mirror pods") for _, podFullname := range orphanedMirrorPodFullnames { if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) { - _, err := kl.podManager.DeleteMirrorPod(podFullname, nil) + _, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil) if err != nil { klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname) } else { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 6608a447c7e..123b57e336b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -260,7 +260,8 @@ func newTestKubeletWithImageList( kubelet.secretManager = secretManager configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient) kubelet.configMapManager = configMapManager - kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient) + kubelet.mirrorPodClient = fakeMirrorClient + kubelet.podManager = kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir()) diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 5426ffc34ad..ec47750c229 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -89,8 +89,6 @@ type Manager interface { // GetUIDTranslations returns the mappings of static pod UIDs to mirror pod // UIDs and mirror pod UIDs to static pod UIDs. GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID) - - MirrorClient } // basicManager is a functional Manager. @@ -112,15 +110,11 @@ type basicManager struct { // Mirror pod UID to pod UID map. translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID - - // A mirror pod client to create/delete mirror pods. - MirrorClient } // NewBasicPodManager returns a functional Manager. -func NewBasicPodManager(client MirrorClient) Manager { +func NewBasicPodManager() Manager { pm := &basicManager{} - pm.MirrorClient = client pm.SetPods(nil) return pm } diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 67cb855d2ba..3571604118c 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -30,7 +30,7 @@ import ( // Stub out mirror client for testing purpose. func newTestManager() (*basicManager, *podtest.FakeMirrorClient) { fakeMirrorClient := podtest.NewFakeMirrorClient() - manager := NewBasicPodManager(fakeMirrorClient).(*basicManager) + manager := NewBasicPodManager().(*basicManager) return manager, fakeMirrorClient } diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 7c4afa0a958..2d68384b410 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -64,35 +64,6 @@ func (mr *MockManagerMockRecorder) AddPod(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPod", reflect.TypeOf((*MockManager)(nil).AddPod), arg0) } -// CreateMirrorPod mocks base method. -func (m *MockManager) CreateMirrorPod(arg0 *v1.Pod) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateMirrorPod", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateMirrorPod indicates an expected call of CreateMirrorPod. -func (mr *MockManagerMockRecorder) CreateMirrorPod(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateMirrorPod", reflect.TypeOf((*MockManager)(nil).CreateMirrorPod), arg0) -} - -// DeleteMirrorPod mocks base method. -func (m *MockManager) DeleteMirrorPod(arg0 string, arg1 *types.UID) (bool, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteMirrorPod", arg0, arg1) - ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DeleteMirrorPod indicates an expected call of DeleteMirrorPod. -func (mr *MockManagerMockRecorder) DeleteMirrorPod(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMirrorPod", reflect.TypeOf((*MockManager)(nil).DeleteMirrorPod), arg0, arg1) -} - // DeletePod mocks base method. func (m *MockManager) DeletePod(arg0 *v1.Pod) { m.ctrl.T.Helper() diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index ec10f49a55c..d071392a1c3 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -106,7 +106,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) { } func newTestManager() *manager { - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed. podManager.AddPod(getTestPod()) diff --git a/pkg/kubelet/prober/scale_test.go b/pkg/kubelet/prober/scale_test.go index 199a52a7237..6de9687e183 100644 --- a/pkg/kubelet/prober/scale_test.go +++ b/pkg/kubelet/prober/scale_test.go @@ -87,7 +87,7 @@ func TestTCPPortExhaustion(t *testing.T) { } else { testRootDir = tempDir } - podManager := kubepod.NewBasicPodManager(nil) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() m := NewManager( status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir), diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index c95b8f5f2a0..e585e15e7d1 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -160,7 +160,7 @@ func TestDoProbe(t *testing.T) { } else { testRootDir = tempDir } - m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir) + m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir) resultsManager(m, probeType).Remove(testContainerID) } } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index 8b63d368d07..b11442ae902 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -129,7 +129,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod)) klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.podManager.CreateMirrorPod(pod); err != nil { + if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod)) } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index 5bb062b6561..f8d0d972585 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -72,8 +72,7 @@ func TestRunOnce(t *testing.T) { }, nil).AnyTimes() fakeSecretManager := secret.NewFakeManager() fakeConfigMapManager := configmap.NewFakeManager() - podManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() fakeRuntime := &containertest.FakeRuntime{} podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() basePath, err := utiltesting.MkTmpdir("kubelet") @@ -87,6 +86,7 @@ func TestRunOnce(t *testing.T) { cadvisor: cadvisor, nodeLister: testNodeLister{}, statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath), + mirrorPodClient: podtest.NewFakeMirrorClient(), podManager: podManager, podWorkers: &fakePodWorkers{}, os: &containertest.FakeOS{}, diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index de4d19b65b4..dd4e03867e8 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -45,7 +45,6 @@ import ( "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" statustest "k8s.io/kubernetes/pkg/kubelet/status/testing" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util" @@ -85,7 +84,7 @@ func (m *manager) testSyncBatch() { } func newTestManager(kubeClient clientset.Interface) *manager { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podManager.AddPod(getTestPod()) podStartupLatencyTracker := util.NewPodStartupLatencyTracker() testRootDir := "" @@ -981,7 +980,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() podStartupLatencyTracker := util.NewPodStartupLatencyTracker() syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager) diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 01e48b335fa..077ed1eca18 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -37,7 +37,6 @@ import ( "k8s.io/kubernetes/pkg/features" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csimigration" @@ -1613,8 +1612,7 @@ func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolu return true, pv, nil }) - fakePodManager := kubepod.NewBasicPodManager( - podtest.NewFakeMirrorClient()) + fakePodManager := kubepod.NewBasicPodManager() seLinuxTranslator := util.NewFakeSELinuxLabelTranslator() fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr, seLinuxTranslator) diff --git a/pkg/kubelet/volumemanager/volume_manager_test.go b/pkg/kubelet/volumemanager/volume_manager_test.go index 1b5ce483ac7..cd8b591656f 100644 --- a/pkg/kubelet/volumemanager/volume_manager_test.go +++ b/pkg/kubelet/volumemanager/volume_manager_test.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" - podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" @@ -88,7 +87,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, pv, claim := createObjects(test.pvMode, test.podMode) kubeClient := fake.NewSimpleClientset(node, pod, pv, claim) @@ -144,7 +143,7 @@ func TestWaitForAttachAndMountError(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -220,7 +219,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem) claim.Status = v1.PersistentVolumeClaimStatus{ @@ -265,7 +264,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) { t.Fatalf("can't make a temp dir: %v", err) } defer os.RemoveAll(tmpDir) - podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient()) + podManager := kubepod.NewBasicPodManager() node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)