kubelet: Separate the MirrorClient from the PodManager

The two are not coupled except accidentally. Separate them and
update callsites. This will reduce the scope of PodManager interface
to make exposing the pod worker cleaner.
This commit is contained in:
Clayton Coleman 2023-01-23 20:54:04 -05:00
parent 80b1aca580
commit bb568844b6
No known key found for this signature in database
GPG Key ID: CF7DB7FC943D3E0E
14 changed files with 25 additions and 61 deletions

View File

@ -611,9 +611,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient)
klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager()
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
@ -957,6 +956,9 @@ type Kubelet struct {
// podManager is a facade that abstracts away the various sources of pods
// this Kubelet services.
podManager kubepod.Manager
// mirrorPodClient is used to create and delete mirror pods in the API for static
// pods.
mirrorPodClient kubepod.MirrorClient
// Needed to observe and respond to situations that could impact node stability
evictionManager eviction.Manager
@ -1829,7 +1831,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
@ -1843,7 +1845,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}

View File

@ -1108,7 +1108,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
klog.V(3).InfoS("Clean up orphaned mirror pods")
for _, podFullname := range orphanedMirrorPodFullnames {
if !kl.podWorkers.IsPodForMirrorPodTerminatingByFullName(podFullname) {
_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
_, err := kl.mirrorPodClient.DeleteMirrorPod(podFullname, nil)
if err != nil {
klog.ErrorS(err, "Encountered error when deleting mirror pod", "podName", podFullname)
} else {

View File

@ -260,7 +260,8 @@ func newTestKubeletWithImageList(
kubelet.secretManager = secretManager
configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
kubelet.configMapManager = configMapManager
kubelet.podManager = kubepod.NewBasicPodManager(fakeMirrorClient)
kubelet.mirrorPodClient = fakeMirrorClient
kubelet.podManager = kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())

View File

@ -89,8 +89,6 @@ type Manager interface {
// GetUIDTranslations returns the mappings of static pod UIDs to mirror pod
// UIDs and mirror pod UIDs to static pod UIDs.
GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
MirrorClient
}
// basicManager is a functional Manager.
@ -112,15 +110,11 @@ type basicManager struct {
// Mirror pod UID to pod UID map.
translationByUID map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID
// A mirror pod client to create/delete mirror pods.
MirrorClient
}
// NewBasicPodManager returns a functional Manager.
func NewBasicPodManager(client MirrorClient) Manager {
func NewBasicPodManager() Manager {
pm := &basicManager{}
pm.MirrorClient = client
pm.SetPods(nil)
return pm
}

View File

@ -30,7 +30,7 @@ import (
// Stub out mirror client for testing purpose.
func newTestManager() (*basicManager, *podtest.FakeMirrorClient) {
fakeMirrorClient := podtest.NewFakeMirrorClient()
manager := NewBasicPodManager(fakeMirrorClient).(*basicManager)
manager := NewBasicPodManager().(*basicManager)
return manager, fakeMirrorClient
}

View File

@ -64,35 +64,6 @@ func (mr *MockManagerMockRecorder) AddPod(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPod", reflect.TypeOf((*MockManager)(nil).AddPod), arg0)
}
// CreateMirrorPod mocks base method.
func (m *MockManager) CreateMirrorPod(arg0 *v1.Pod) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateMirrorPod", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CreateMirrorPod indicates an expected call of CreateMirrorPod.
func (mr *MockManagerMockRecorder) CreateMirrorPod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateMirrorPod", reflect.TypeOf((*MockManager)(nil).CreateMirrorPod), arg0)
}
// DeleteMirrorPod mocks base method.
func (m *MockManager) DeleteMirrorPod(arg0 string, arg1 *types.UID) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteMirrorPod", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DeleteMirrorPod indicates an expected call of DeleteMirrorPod.
func (mr *MockManagerMockRecorder) DeleteMirrorPod(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteMirrorPod", reflect.TypeOf((*MockManager)(nil).DeleteMirrorPod), arg0, arg1)
}
// DeletePod mocks base method.
func (m *MockManager) DeletePod(arg0 *v1.Pod) {
m.ctrl.T.Helper()

View File

@ -106,7 +106,7 @@ func setTestProbe(pod *v1.Pod, probeType probeType, probeSpec v1.Probe) {
}
func newTestManager() *manager {
podManager := kubepod.NewBasicPodManager(nil)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())

View File

@ -87,7 +87,7 @@ func TestTCPPortExhaustion(t *testing.T) {
} else {
testRootDir = tempDir
}
podManager := kubepod.NewBasicPodManager(nil)
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),

View File

@ -160,7 +160,7 @@ func TestDoProbe(t *testing.T) {
} else {
testRootDir = tempDir
}
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(nil), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
resultsManager(m, probeType).Remove(testContainerID)
}
}

View File

@ -129,7 +129,7 @@ func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Dura
klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)

View File

@ -72,8 +72,7 @@ func TestRunOnce(t *testing.T) {
}, nil).AnyTimes()
fakeSecretManager := secret.NewFakeManager()
fakeConfigMapManager := configmap.NewFakeManager()
podManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
fakeRuntime := &containertest.FakeRuntime{}
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
basePath, err := utiltesting.MkTmpdir("kubelet")
@ -87,6 +86,7 @@ func TestRunOnce(t *testing.T) {
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: status.NewManager(nil, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, basePath),
mirrorPodClient: podtest.NewFakeMirrorClient(),
podManager: podManager,
podWorkers: &fakePodWorkers{},
os: &containertest.FakeOS{},

View File

@ -45,7 +45,6 @@ import (
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
@ -85,7 +84,7 @@ func (m *manager) testSyncBatch() {
}
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
podManager.AddPod(getTestPod())
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
testRootDir := ""
@ -981,7 +980,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)

View File

@ -37,7 +37,6 @@ import (
"k8s.io/kubernetes/pkg/features"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -1613,8 +1612,7 @@ func createDswpWithVolumeWithCustomPluginMgr(t *testing.T, pv *v1.PersistentVolu
return true, pv, nil
})
fakePodManager := kubepod.NewBasicPodManager(
podtest.NewFakeMirrorClient())
fakePodManager := kubepod.NewBasicPodManager()
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
fakesDSW := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr, seLinuxTranslator)

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
@ -88,7 +87,7 @@ func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
@ -144,7 +143,7 @@ func TestWaitForAttachAndMountError(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -220,7 +219,7 @@ func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
claim.Status = v1.PersistentVolumeClaimStatus{
@ -265,7 +264,7 @@ func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)