diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 32730eb0bb2..73119d96b35 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1693,6 +1693,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // start syncing lease go kl.nodeLeaseController.Run(context.Background()) + + // Mirror pods for static pods may not be created immediately during node startup + // due to node registration or informer sync delays. They will be created eventually + // when static pods are resynced (every 1-1.5 minutes). + // To ensure kube-scheduler is aware of static pod resource usage faster, + // mirror pods are created as soon as the node registers. + go kl.fastStaticPodsRegistration(ctx) } go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) @@ -1926,37 +1933,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType } // Create Mirror Pod for Static Pod if it doesn't already exist - if kubetypes.IsStaticPod(pod) { - deleted := false - if mirrorPod != nil { - if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) { - // The mirror pod is semantically different from the static pod. Remove - // it. The mirror pod will get recreated later. - klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) - podFullName := kubecontainer.GetPodFullName(pod) - var err error - deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) - if deleted { - klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod)) - } else if err != nil { - klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) - } - } - } - if mirrorPod == nil || deleted { - node, err := kl.GetNode() - if err != nil { - klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) - } else if node.DeletionTimestamp != nil { - klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) - } else { - klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) - if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil { - klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod)) - } - } - } - } + kl.tryReconcileMirrorPods(pod, mirrorPod) // Make data directories for the pod if err := kl.makePodDataDirs(pod); err != nil { @@ -3054,3 +3031,59 @@ func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) err func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error { return kl.containerManager.UnprepareDynamicResources(ctx, pod) } + +// Ensure Mirror Pod for Static Pod exists and matches the current pod definition. +// The function logs and ignores any errors. +func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) { + if !kubetypes.IsStaticPod(staticPod) { + return + } + deleted := false + if mirrorPod != nil { + if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) { + // The mirror pod is semantically different from the static pod. Remove + // it. The mirror pod will get recreated later. + klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.ObjectMeta.UID) + podFullName := kubecontainer.GetPodFullName(staticPod) + if ok, err := kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID); err != nil { + klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) + } else if ok { + deleted = ok + klog.InfoS("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod)) + } + } + } + if mirrorPod == nil || deleted { + node, err := kl.GetNode() + if err != nil { + klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) + } else if node.DeletionTimestamp != nil { + klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) + } else { + klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod)) + if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil { + klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod)) + } + } + } +} + +// Ensure Mirror Pod for Static Pod exists as soon as node is registered. +func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) { + if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { + _, err := kl.GetNode() + if err == nil { + return true, nil + } + + klog.V(4).ErrorS(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName))) + return false, nil + }); err != nil { + klog.V(4).ErrorS(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName))) + } + + staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap() + for staticPod, mirrorPod := range staticPodToMirrorPodMap { + kl.tryReconcileMirrorPods(staticPod, mirrorPod) + } +} diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 77a3c40479d..e7bed9d5586 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -70,6 +70,12 @@ type Manager interface { // the pod fullnames of any orphaned mirror pods. GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) + // GetStaticPodToMirrorPodMap return a map of static pod to its corresponding + // mirror pods. It is possible that there is no mirror pod for a static pod + // if kubelet is running in standalone mode or is in the process of creating + // the mirror pod and in that case, the mirror pod is nil. + GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod + // SetPods replaces the internal pods with the new pods. // It is currently only used for testing. SetPods(pods []*v1.Pod) @@ -226,6 +232,18 @@ func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods return allPods, allMirrorPods, orphanedMirrorPodFullnames } +func (pm *basicManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod { + pm.lock.RLock() + defer pm.lock.RUnlock() + staticPodsMapToMirrorPods := make(map[*v1.Pod]*v1.Pod) + for _, pod := range podsMapToPods(pm.podByUID) { + if kubetypes.IsStaticPod(pod) { + staticPodsMapToMirrorPods[pod] = pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)] + } + } + return staticPodsMapToMirrorPods +} + func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) { pm.lock.RLock() defer pm.lock.RUnlock() diff --git a/pkg/kubelet/pod/pod_manager_test.go b/pkg/kubelet/pod/pod_manager_test.go index 7834c874d61..22717dffd92 100644 --- a/pkg/kubelet/pod/pod_manager_test.go +++ b/pkg/kubelet/pod/pod_manager_test.go @@ -294,3 +294,15 @@ func TestRemovePods(t *testing.T) { }) } } + +func TestGetStaticPodToMirrorPodMap(t *testing.T) { + podManager := NewBasicPodManager() + podManager.SetPods([]*v1.Pod{mirrorPod, staticPod, normalPod}) + m := podManager.GetStaticPodToMirrorPodMap() + if len(m) != 1 { + t.Fatalf("GetStaticPodToMirrorPodMap(): got %d static pods, wanted 1 static pod", len(m)) + } + if gotMirrorPod, ok := m[staticPod]; !ok || gotMirrorPod.UID != mirrorPod.UID { + t.Fatalf("GetStaticPodToMirrorPodMap() did not return the correct mirror pod UID %s, wanted mirror pod UID %s", gotMirrorPod.UID, mirrorPod.UID) + } +} diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index 779318a0024..134b9383f00 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -546,6 +546,53 @@ func (_c *MockManager_GetPodsAndMirrorPods_Call) RunAndReturn(run func() ([]*v1. return _c } +// GetStaticPodToMirrorPodMap provides a mock function with given fields: +func (_m *MockManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetStaticPodToMirrorPodMap") + } + + var r0 map[*v1.Pod]*v1.Pod + if rf, ok := ret.Get(0).(func() map[*v1.Pod]*v1.Pod); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[*v1.Pod]*v1.Pod) + } + } + + return r0 +} + +// MockManager_GetStaticPodToMirrorPodMap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStaticPodToMirrorPodMap' +type MockManager_GetStaticPodToMirrorPodMap_Call struct { + *mock.Call +} + +// GetStaticPodToMirrorPodMap is a helper method to define mock.On call +func (_e *MockManager_Expecter) GetStaticPodToMirrorPodMap() *MockManager_GetStaticPodToMirrorPodMap_Call { + return &MockManager_GetStaticPodToMirrorPodMap_Call{Call: _e.mock.On("GetStaticPodToMirrorPodMap")} +} + +func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Run(run func()) *MockManager_GetStaticPodToMirrorPodMap_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Return(_a0 map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) RunAndReturn(run func() map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call { + _c.Call.Return(run) + return _c +} + // GetUIDTranslations provides a mock function with given fields: func (_m *MockManager) GetUIDTranslations() (map[kubelettypes.ResolvedPodUID]kubelettypes.MirrorPodUID, map[kubelettypes.MirrorPodUID]kubelettypes.ResolvedPodUID) { ret := _m.Called()