From dcafd93b688557a6e5c0f977fecf0af6bfa146f0 Mon Sep 17 00:00:00 2001 From: Anish Shah Date: Fri, 23 Aug 2024 05:29:27 +0000 Subject: [PATCH] kubelet: try registering mirror pods as soon as node is registered. Mirror pods for static pods may not be created immediately during node startup because either the node is not registered or node informer is not synced. They will be created eventually when static pods are resynced (every 1-1.5 minutes). However, during this delay of 1-1.5 mins, kube-scheduler might overcommit resources to the node and eventually cause kubelet to reject pods with OutOfCPU/OutOfMemory/OutOfPods error. To ensure kube-scheduler is aware of static pod resource usage faster, mirror pods are created as soon as the node registers. --- pkg/kubelet/kubelet.go | 95 +++++++++++++++++-------- pkg/kubelet/pod/pod_manager.go | 18 +++++ pkg/kubelet/pod/pod_manager_test.go | 12 ++++ pkg/kubelet/pod/testing/mock_manager.go | 47 ++++++++++++ 4 files changed, 141 insertions(+), 31 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 01a723987f8..d057943f7ae 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1674,6 +1674,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // start syncing lease go kl.nodeLeaseController.Run(context.Background()) + + // Mirror pods for static pods may not be created immediately during node startup + // due to node registration or informer sync delays. They will be created eventually + // when static pods are resynced (every 1-1.5 minutes). + // To ensure kube-scheduler is aware of static pod resource usage faster, + // mirror pods are created as soon as the node registers. + go kl.fastStaticPodsRegistration(ctx) } go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) @@ -1889,37 +1896,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType } // Create Mirror Pod for Static Pod if it doesn't already exist - if kubetypes.IsStaticPod(pod) { - deleted := false - if mirrorPod != nil { - if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) { - // The mirror pod is semantically different from the static pod. Remove - // it. The mirror pod will get recreated later. - klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) - podFullName := kubecontainer.GetPodFullName(pod) - var err error - deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) - if deleted { - klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod)) - } else if err != nil { - klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) - } - } - } - if mirrorPod == nil || deleted { - node, err := kl.GetNode() - if err != nil { - klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) - } else if node.DeletionTimestamp != nil { - klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) - } else { - klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { - klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod)) - } - } - } - } + kl.tryReconcileMirrorPods(pod, mirrorPod) // Make data directories for the pod if err := kl.makePodDataDirs(pod); err != nil { @@ -3070,3 +3047,59 @@ func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) err func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error { return kl.containerManager.UnprepareDynamicResources(ctx, pod) } + +// Ensure Mirror Pod for Static Pod exists and matches the current pod definition. +// The function logs and ignores any errors. +func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) { + if !kubetypes.IsStaticPod(staticPod) { + return + } + deleted := false + if mirrorPod != nil { + if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) { + // The mirror pod is semantically different from the static pod. Remove + // it. The mirror pod will get recreated later. + klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.ObjectMeta.UID) + podFullName := kubecontainer.GetPodFullName(staticPod) + if ok, err := kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID); err != nil { + klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) + } else if ok { + deleted = ok + klog.InfoS("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod)) + } + } + } + if mirrorPod == nil || deleted { + node, err := kl.GetNode() + if err != nil { + klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) + } else if node.DeletionTimestamp != nil { + klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) + } else { + klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod)) + if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil { + klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod)) + } + } + } +} + +// Ensure Mirror Pod for Static Pod exists as soon as node is registered. +func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) { + if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { + _, err := kl.GetNode() + if err == nil { + return true, nil + } + + klog.V(4).ErrorS(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName))) + return false, nil + }); err != nil { + klog.V(4).ErrorS(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName))) + } + + staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap() + for staticPod, mirrorPod := range staticPodToMirrorPodMap { + kl.tryReconcileMirrorPods(staticPod, mirrorPod) + } +} diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 77a3c40479d..e7bed9d5586 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -70,6 +70,12 @@ type Manager interface { // the pod fullnames of any orphaned mirror pods. GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) + // GetStaticPodToMirrorPodMap return a map of static pod to its corresponding + // mirror pods. It is possible that there is no mirror pod for a static pod + // if kubelet is running in standalone mode or is in the process of creating + // the mirror pod and in that case, the mirror pod is nil. + GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod + // SetPods replaces the internal pods with the new pods. // It is currently only used for testing. SetPods(pods []*v1.Pod) @@ -226,6 +232,18 @@ func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods return allPods, allMirrorPods, orphanedMirrorPodFullnames } +func (pm *basicManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod { + pm.lock.RLock() + defer pm.lock.RUnlock() + staticPodsMapToMirrorPods := make(map[*v1.Pod]*v1.Pod) + for _, pod := range podsMapToPods(pm.podByUID) { + if kubetypes.IsStaticPod(pod) { + staticPodsMapToMirrorPods[pod] = pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)] + } + } + return staticPodsMapToMirrorPods +} + func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) { pm.lock.RLock() defer pm.lock.RUnlock() diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 7834c874d61..22717dffd92 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -294,3 +294,15 @@ func TestRemovePods(t *testing.T) { }) } } + +func TestGetStaticPodToMirrorPodMap(t *testing.T) { + podManager := NewBasicPodManager() + podManager.SetPods([]*v1.Pod{mirrorPod, staticPod, normalPod}) + m := podManager.GetStaticPodToMirrorPodMap() + if len(m) != 1 { + t.Fatalf("GetStaticPodToMirrorPodMap(): got %d static pods, wanted 1 static pod", len(m)) + } + if gotMirrorPod, ok := m[staticPod]; !ok || gotMirrorPod.UID != mirrorPod.UID { + t.Fatalf("GetStaticPodToMirrorPodMap() did not return the correct mirror pod UID %s, wanted mirror pod UID %s", gotMirrorPod.UID, mirrorPod.UID) + } +} diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 779318a0024..134b9383f00 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -546,6 +546,53 @@ func (_c *MockManager_GetPodsAndMirrorPods_Call) RunAndReturn(run func() ([]*v1. return _c } +// GetStaticPodToMirrorPodMap provides a mock function with given fields: +func (_m *MockManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetStaticPodToMirrorPodMap") + } + + var r0 map[*v1.Pod]*v1.Pod + if rf, ok := ret.Get(0).(func() map[*v1.Pod]*v1.Pod); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[*v1.Pod]*v1.Pod) + } + } + + return r0 +} + +// MockManager_GetStaticPodToMirrorPodMap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStaticPodToMirrorPodMap' +type MockManager_GetStaticPodToMirrorPodMap_Call struct { + *mock.Call +} + +// GetStaticPodToMirrorPodMap is a helper method to define mock.On call +func (_e *MockManager_Expecter) GetStaticPodToMirrorPodMap() *MockManager_GetStaticPodToMirrorPodMap_Call { + return &MockManager_GetStaticPodToMirrorPodMap_Call{Call: _e.mock.On("GetStaticPodToMirrorPodMap")} +} + +func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Run(run func()) *MockManager_GetStaticPodToMirrorPodMap_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Return(_a0 map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) RunAndReturn(run func() map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call { + _c.Call.Return(run) + return _c +} + // GetUIDTranslations provides a mock function with given fields: func (_m *MockManager) GetUIDTranslations() (map[kubelettypes.ResolvedPodUID]kubelettypes.MirrorPodUID, map[kubelettypes.MirrorPodUID]kubelettypes.ResolvedPodUID) { ret := _m.Called()