kubelet: Reduce the interface pod.Manager consumers accept

Every component that uses a pod.Manager should use a stub interface
(like we do for podWorker) that explicitly describes what methods
they use. This will allow podWorker to implement the minimum set
of manager interfaces.
This commit is contained in:
Clayton Coleman 2023-01-23 21:28:37 -05:00
parent 8bd94dfa76
commit 166256f73e
No known key found for this signature in database
GPG Key ID: CF7DB7FC943D3E0E
7 changed files with 166 additions and 45 deletions

View File

@ -27,18 +27,24 @@ import (
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
}
// NewCRIStatsProvider returns a Provider that provides the node stats
// from cAdvisor and the container stats from CRI.
func NewCRIStatsProvider(
cadvisor cadvisor.Interface,
resourceAnalyzer stats.ResourceAnalyzer,
podManager kubepod.Manager,
podManager PodManager,
runtimeCache kubecontainer.RuntimeCache,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
@ -54,7 +60,7 @@ func NewCRIStatsProvider(
func NewCadvisorStatsProvider(
cadvisor cadvisor.Interface,
resourceAnalyzer stats.ResourceAnalyzer,
podManager kubepod.Manager,
podManager PodManager,
runtimeCache kubecontainer.RuntimeCache,
imageService kubecontainer.ImageService,
statusProvider status.PodStatusProvider,
@ -67,7 +73,7 @@ func NewCadvisorStatsProvider(
// cAdvisor and the container stats using the containerStatsProvider.
func newStatsProvider(
cadvisor cadvisor.Interface,
podManager kubepod.Manager,
podManager PodManager,
runtimeCache kubecontainer.RuntimeCache,
containerStatsProvider containerStatsProvider,
) *Provider {
@ -82,7 +88,7 @@ func newStatsProvider(
// Provider provides the stats of the node and the pod-managed containers.
type Provider struct {
cadvisor cadvisor.Interface
podManager kubepod.Manager
podManager PodManager
runtimeCache kubecontainer.RuntimeCache
containerStatsProvider
}

View File

@ -40,7 +40,6 @@ import (
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status/state"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
statusutil "k8s.io/kubernetes/pkg/util/pod"
@ -70,7 +69,7 @@ type versionedPodStatus struct {
// All methods are thread-safe.
type manager struct {
kubeClient clientset.Interface
podManager kubepod.Manager
podManager PodManager
// Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex
@ -87,8 +86,18 @@ type manager struct {
stateFileDirectory string
}
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
// that need to introspect status.
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
GetPodByUID(types.UID) (*v1.Pod, bool)
GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool)
TranslatePodUID(uid types.UID) kubetypes.ResolvedPodUID
GetUIDTranslations() (podToMirror map[kubetypes.ResolvedPodUID]kubetypes.MirrorPodUID, mirrorToPod map[kubetypes.MirrorPodUID]kubetypes.ResolvedPodUID)
}
// PodStatusProvider knows how to provide status for a pod. It is intended to be used by other components
// that need to introspect the authoritative status of a pod. The PodStatusProvider represents the actual
// status of a running pod as the kubelet sees it.
type PodStatusProvider interface {
// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
// was a cache hit.
@ -149,7 +158,7 @@ type Manager interface {
const syncPeriod = 10 * time.Second
// NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager {
func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,

View File

@ -50,6 +50,12 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util"
)
type mutablePodManager interface {
AddPod(*v1.Pod)
UpdatePod(*v1.Pod)
DeletePod(*v1.Pod)
}
// Generate new instance of test pod with the same initial value.
func getTestPod() *v1.Pod {
return &v1.Pod{
@ -85,7 +91,7 @@ func (m *manager) testSyncBatch() {
func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager()
podManager.AddPod(getTestPod())
podManager.(mutablePodManager).AddPod(getTestPod())
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
testRootDir := ""
if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
@ -328,10 +334,10 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) {
syncer := newTestManager(&fake.Clientset{})
pod := getTestPod()
pod.UID = "first"
syncer.podManager.AddPod(pod)
syncer.podManager.(mutablePodManager).AddPod(pod)
differentPod := getTestPod()
differentPod.UID = "second"
syncer.podManager.AddPod(differentPod)
syncer.podManager.(mutablePodManager).AddPod(differentPod)
syncer.kubeClient = fake.NewSimpleClientset(pod)
syncer.SetPodStatus(differentPod, getRandomPodStatus())
verifyActions(t, syncer, []core.Action{getAction()})
@ -531,7 +537,7 @@ func TestStaticPod(t *testing.T) {
m := newTestManager(client)
t.Logf("Create the static pod")
m.podManager.AddPod(staticPod)
m.podManager.(mutablePodManager).AddPod(staticPod)
assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod")
status := getRandomPodStatus()
@ -549,7 +555,7 @@ func TestStaticPod(t *testing.T) {
assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
t.Logf("Create the mirror pod")
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")
assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), kubetypes.ResolvedPodUID(staticPod.UID))
@ -566,10 +572,10 @@ func TestStaticPod(t *testing.T) {
verifyActions(t, m, []core.Action{})
t.Logf("Change mirror pod identity.")
m.podManager.DeletePod(mirrorPod)
m.podManager.(mutablePodManager).DeletePod(mirrorPod)
mirrorPod.UID = "new-mirror-pod"
mirrorPod.Status = v1.PodStatus{}
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
t.Logf("Should not update to mirror pod, because UID has changed.")
assert.Equal(t, m.syncBatch(true), 1)
@ -1067,7 +1073,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager(podtest.NewFakeMirrorClient())
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
@ -1131,7 +1137,7 @@ func TestSetContainerReadiness(t *testing.T) {
m := newTestManager(&fake.Clientset{})
// Add test pod because the container spec has been changed.
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
t.Log("Setting readiness before status should fail.")
m.SetContainerReadiness(pod.UID, cID1, true)
@ -1215,7 +1221,7 @@ func TestSetContainerStartup(t *testing.T) {
m := newTestManager(&fake.Clientset{})
// Add test pod because the container spec has been changed.
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
t.Log("Setting startup before status should fail.")
m.SetContainerStartup(pod.UID, cID1, true)
@ -1279,11 +1285,11 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
t.Logf("Non-orphaned pods should not be removed.")
m.SetPodStatus(testPod, getRandomPodStatus())
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
staticPod := mirrorPod
staticPod.UID = "static-uid"
staticPod.Annotations = map[string]string{kubetypes.ConfigSourceAnnotationKey: "file"}
m.podManager.AddPod(staticPod)
m.podManager.(mutablePodManager).AddPod(staticPod)
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.testSyncBatch()
@ -1311,7 +1317,7 @@ func TestReconcilePodStatus(t *testing.T) {
testPod.Status = podStatus
t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing")
syncer.podManager.UpdatePod(testPod)
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) {
t.Fatalf("Pod status is the same, a reconciliation is not needed")
}
@ -1326,7 +1332,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("Syncbatch should do nothing, as a reconciliation is not required")
normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy()
testPod.Status.StartTime = &normalizedStartTime
syncer.podManager.UpdatePod(testPod)
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if syncer.needsReconcile(testPod.UID, podStatus) {
t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
}
@ -1336,7 +1342,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
changedPodStatus := getRandomPodStatus()
syncer.podManager.UpdatePod(testPod)
syncer.podManager.(mutablePodManager).UpdatePod(testPod)
if !syncer.needsReconcile(testPod.UID, changedPodStatus) {
t.Fatalf("Pod status is different, a reconciliation is needed")
}
@ -1359,7 +1365,7 @@ func TestDeletePodBeforeFinished(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
status := getRandomPodStatus()
status.Phase = v1.PodFailed
m.SetPodStatus(pod, status)
@ -1373,7 +1379,7 @@ func TestDeletePodFinished(t *testing.T) {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(pod)
m := newTestManager(client)
m.podManager.AddPod(pod)
m.podManager.(mutablePodManager).AddPod(pod)
status := getRandomPodStatus()
status.Phase = v1.PodFailed
m.TerminatePod(pod)
@ -1394,8 +1400,8 @@ func TestDoNotDeleteMirrorPods(t *testing.T) {
mirrorPod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
client := fake.NewSimpleClientset(mirrorPod)
m := newTestManager(client)
m.podManager.AddPod(staticPod)
m.podManager.AddPod(mirrorPod)
m.podManager.(mutablePodManager).AddPod(staticPod)
m.podManager.(mutablePodManager).AddPod(mirrorPod)
t.Logf("Verify setup.")
assert.True(t, kubetypes.IsStaticPod(staticPod), "SetUp error: staticPod")
assert.True(t, kubetypes.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod")

View File

@ -27,8 +27,91 @@ import (
v1 "k8s.io/api/core/v1"
types "k8s.io/apimachinery/pkg/types"
container "k8s.io/kubernetes/pkg/kubelet/container"
types0 "k8s.io/kubernetes/pkg/kubelet/types"
)
// MockPodManager is a mock of PodManager interface.
type MockPodManager struct {
ctrl *gomock.Controller
recorder *MockPodManagerMockRecorder
}
// MockPodManagerMockRecorder is the mock recorder for MockPodManager.
type MockPodManagerMockRecorder struct {
mock *MockPodManager
}
// NewMockPodManager creates a new mock instance.
func NewMockPodManager(ctrl *gomock.Controller) *MockPodManager {
mock := &MockPodManager{ctrl: ctrl}
mock.recorder = &MockPodManagerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPodManager) EXPECT() *MockPodManagerMockRecorder {
return m.recorder
}
// GetMirrorPodByPod mocks base method.
func (m *MockPodManager) GetMirrorPodByPod(arg0 *v1.Pod) (*v1.Pod, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetMirrorPodByPod", arg0)
ret0, _ := ret[0].(*v1.Pod)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetMirrorPodByPod indicates an expected call of GetMirrorPodByPod.
func (mr *MockPodManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockPodManager)(nil).GetMirrorPodByPod), arg0)
}
// GetPodByUID mocks base method.
func (m *MockPodManager) GetPodByUID(arg0 types.UID) (*v1.Pod, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPodByUID", arg0)
ret0, _ := ret[0].(*v1.Pod)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// GetPodByUID indicates an expected call of GetPodByUID.
func (mr *MockPodManagerMockRecorder) GetPodByUID(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodByUID", reflect.TypeOf((*MockPodManager)(nil).GetPodByUID), arg0)
}
// GetUIDTranslations mocks base method.
func (m *MockPodManager) GetUIDTranslations() (map[types0.ResolvedPodUID]types0.MirrorPodUID, map[types0.MirrorPodUID]types0.ResolvedPodUID) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetUIDTranslations")
ret0, _ := ret[0].(map[types0.ResolvedPodUID]types0.MirrorPodUID)
ret1, _ := ret[1].(map[types0.MirrorPodUID]types0.ResolvedPodUID)
return ret0, ret1
}
// GetUIDTranslations indicates an expected call of GetUIDTranslations.
func (mr *MockPodManagerMockRecorder) GetUIDTranslations() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUIDTranslations", reflect.TypeOf((*MockPodManager)(nil).GetUIDTranslations))
}
// TranslatePodUID mocks base method.
func (m *MockPodManager) TranslatePodUID(uid types.UID) types0.ResolvedPodUID {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TranslatePodUID", uid)
ret0, _ := ret[0].(types0.ResolvedPodUID)
return ret0
}
// TranslatePodUID indicates an expected call of TranslatePodUID.
func (mr *MockPodManagerMockRecorder) TranslatePodUID(uid interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TranslatePodUID", reflect.TypeOf((*MockPodManager)(nil).TranslatePodUID), uid)
}
// MockPodStatusProvider is a mock of PodStatusProvider interface.
type MockPodStatusProvider struct {
ctrl *gomock.Controller

View File

@ -40,7 +40,6 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csimigration"
@ -70,12 +69,19 @@ type DesiredStateOfWorldPopulator interface {
HasAddedPods() bool
}
// podStateProvider can determine if a pod is going to be terminated.
type podStateProvider interface {
// PodStateProvider can determine if a pod is going to be terminated.
type PodStateProvider interface {
ShouldPodContainersBeTerminating(types.UID) bool
ShouldPodRuntimeBeRemoved(types.UID) bool
}
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
GetPodByUID(types.UID) (*v1.Pod, bool)
GetPods() []*v1.Pod
}
// NewDesiredStateOfWorldPopulator returns a new instance of
// DesiredStateOfWorldPopulator.
//
@ -90,8 +96,8 @@ type podStateProvider interface {
func NewDesiredStateOfWorldPopulator(
kubeClient clientset.Interface,
loopSleepDuration time.Duration,
podManager pod.Manager,
podStateProvider podStateProvider,
podManager PodManager,
podStateProvider PodStateProvider,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
kubeContainerRuntime kubecontainer.Runtime,
@ -121,8 +127,8 @@ func NewDesiredStateOfWorldPopulator(
type desiredStateOfWorldPopulator struct {
kubeClient clientset.Interface
loopSleepDuration time.Duration
podManager pod.Manager
podStateProvider podStateProvider
podManager PodManager
podStateProvider PodStateProvider
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
pods processedPods

View File

@ -323,17 +323,22 @@ func TestFindAndAddNewPods_WithVolumeRetrievalError(t *testing.T) {
}
}
type mutablePodManager interface {
GetPodByName(string, string) (*v1.Pod, bool)
DeletePod(*v1.Pod)
}
func TestFindAndAddNewPods_FindAndRemoveDeletedPods(t *testing.T) {
dswp, fakePodState, pod, expectedVolumeName, _ := prepareDSWPWithPodPV(t)
podName := util.GetUniquePodName(pod)
//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
dswp.podManager.DeletePod(pod)
dswp.podManager.(mutablePodManager).DeletePod(pod)
dswp.findAndRemoveDeletedPods()
@ -389,7 +394,7 @@ func TestFindAndRemoveDeletedPodsWithActualState(t *testing.T) {
podName := util.GetUniquePodName(pod)
//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
@ -451,12 +456,12 @@ func TestFindAndRemoveDeletedPodsWithUncertain(t *testing.T) {
podName := util.GetUniquePodName(pod)
//let the pod be terminated
podGet, exist := dswp.podManager.GetPodByName(pod.Namespace, pod.Name)
podGet, exist := dswp.podManager.(mutablePodManager).GetPodByName(pod.Namespace, pod.Name)
if !exist {
t.Fatalf("Failed to get pod by pod name: %s and namespace: %s", pod.Name, pod.Namespace)
}
podGet.Status.Phase = v1.PodFailed
dswp.podManager.DeletePod(pod)
dswp.podManager.(mutablePodManager).DeletePod(pod)
fakePodState.removed = map[kubetypes.UID]struct{}{pod.UID: {}}
// Add the volume to ASW by reconciling.

View File

@ -39,7 +39,6 @@ import (
csitrans "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
@ -151,11 +150,18 @@ type VolumeManager interface {
}
// podStateProvider can determine if a pod is going to be terminated
type podStateProvider interface {
type PodStateProvider interface {
ShouldPodContainersBeTerminating(k8stypes.UID) bool
ShouldPodRuntimeBeRemoved(k8stypes.UID) bool
}
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See pkg/k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
GetPodByUID(k8stypes.UID) (*v1.Pod, bool)
GetPods() []*v1.Pod
}
// NewVolumeManager returns a new concrete instance implementing the
// VolumeManager interface.
//
@ -167,8 +173,8 @@ type podStateProvider interface {
func NewVolumeManager(
controllerAttachDetachEnabled bool,
nodeName k8stypes.NodeName,
podManager pod.Manager,
podStateProvider podStateProvider,
podManager PodManager,
podStateProvider PodStateProvider,
kubeClient clientset.Interface,
volumePluginMgr *volume.VolumePluginMgr,
kubeContainerRuntime container.Runtime,