kubelet: try registering mirror pods as soon as node is registered.

Mirror pods for static pods may not be created immediately during node startup
because either the node is not registered or node informer is not synced.
They will be created eventually when static pods are resynced (every 1-1.5 minutes).

However, during this delay of 1-1.5 mins, kube-scheduler might overcommit resources
to the node and eventually cause kubelet to reject pods with
OutOfCPU/OutOfMemory/OutOfPods error.

To ensure kube-scheduler is aware of static pod resource usage faster,
mirror pods are created as soon as the node registers.
This commit is contained in:
Anish Shah
2024-08-23 05:29:27 +00:00
parent 79cca2786e
commit dcafd93b68
4 changed files with 141 additions and 31 deletions

View File

@@ -1674,6 +1674,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// start syncing lease // start syncing lease
go kl.nodeLeaseController.Run(context.Background()) go kl.nodeLeaseController.Run(context.Background())
// Mirror pods for static pods may not be created immediately during node startup
// due to node registration or informer sync delays. They will be created eventually
// when static pods are resynced (every 1-1.5 minutes).
// To ensure kube-scheduler is aware of static pod resource usage faster,
// mirror pods are created as soon as the node registers.
go kl.fastStaticPodsRegistration(ctx)
} }
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
@@ -1889,37 +1896,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
} }
// Create Mirror Pod for Static Pod if it doesn't already exist // Create Mirror Pod for Static Pod if it doesn't already exist
if kubetypes.IsStaticPod(pod) { kl.tryReconcileMirrorPods(pod, mirrorPod)
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
}
// Make data directories for the pod // Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil { if err := kl.makePodDataDirs(pod); err != nil {
@@ -3070,3 +3047,59 @@ func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) err
func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error { func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return kl.containerManager.UnprepareDynamicResources(ctx, pod) return kl.containerManager.UnprepareDynamicResources(ctx, pod)
} }
// Ensure Mirror Pod for Static Pod exists and matches the current pod definition.
// The function logs and ignores any errors.
func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) {
if !kubetypes.IsStaticPod(staticPod) {
return
}
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(staticPod)
if ok, err := kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID); err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
} else if ok {
deleted = ok
klog.InfoS("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod))
}
}
}
}
// Ensure Mirror Pod for Static Pod exists as soon as node is registered.
func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
_, err := kl.GetNode()
if err == nil {
return true, nil
}
klog.V(4).ErrorS(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName)))
return false, nil
}); err != nil {
klog.V(4).ErrorS(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName)))
}
staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap()
for staticPod, mirrorPod := range staticPodToMirrorPodMap {
kl.tryReconcileMirrorPods(staticPod, mirrorPod)
}
}

View File

@@ -70,6 +70,12 @@ type Manager interface {
// the pod fullnames of any orphaned mirror pods. // the pod fullnames of any orphaned mirror pods.
GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)
// GetStaticPodToMirrorPodMap return a map of static pod to its corresponding
// mirror pods. It is possible that there is no mirror pod for a static pod
// if kubelet is running in standalone mode or is in the process of creating
// the mirror pod and in that case, the mirror pod is nil.
GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod
// SetPods replaces the internal pods with the new pods. // SetPods replaces the internal pods with the new pods.
// It is currently only used for testing. // It is currently only used for testing.
SetPods(pods []*v1.Pod) SetPods(pods []*v1.Pod)
@@ -226,6 +232,18 @@ func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods
return allPods, allMirrorPods, orphanedMirrorPodFullnames return allPods, allMirrorPods, orphanedMirrorPodFullnames
} }
func (pm *basicManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod {
pm.lock.RLock()
defer pm.lock.RUnlock()
staticPodsMapToMirrorPods := make(map[*v1.Pod]*v1.Pod)
for _, pod := range podsMapToPods(pm.podByUID) {
if kubetypes.IsStaticPod(pod) {
staticPodsMapToMirrorPods[pod] = pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
}
}
return staticPodsMapToMirrorPods
}
func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) { func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
pm.lock.RLock() pm.lock.RLock()
defer pm.lock.RUnlock() defer pm.lock.RUnlock()

View File

@@ -294,3 +294,15 @@ func TestRemovePods(t *testing.T) {
}) })
} }
} }
func TestGetStaticPodToMirrorPodMap(t *testing.T) {
podManager := NewBasicPodManager()
podManager.SetPods([]*v1.Pod{mirrorPod, staticPod, normalPod})
m := podManager.GetStaticPodToMirrorPodMap()
if len(m) != 1 {
t.Fatalf("GetStaticPodToMirrorPodMap(): got %d static pods, wanted 1 static pod", len(m))
}
if gotMirrorPod, ok := m[staticPod]; !ok || gotMirrorPod.UID != mirrorPod.UID {
t.Fatalf("GetStaticPodToMirrorPodMap() did not return the correct mirror pod UID %s, wanted mirror pod UID %s", gotMirrorPod.UID, mirrorPod.UID)
}
}

View File

@@ -546,6 +546,53 @@ func (_c *MockManager_GetPodsAndMirrorPods_Call) RunAndReturn(run func() ([]*v1.
return _c return _c
} }
// GetStaticPodToMirrorPodMap provides a mock function with given fields:
func (_m *MockManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetStaticPodToMirrorPodMap")
}
var r0 map[*v1.Pod]*v1.Pod
if rf, ok := ret.Get(0).(func() map[*v1.Pod]*v1.Pod); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[*v1.Pod]*v1.Pod)
}
}
return r0
}
// MockManager_GetStaticPodToMirrorPodMap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStaticPodToMirrorPodMap'
type MockManager_GetStaticPodToMirrorPodMap_Call struct {
*mock.Call
}
// GetStaticPodToMirrorPodMap is a helper method to define mock.On call
func (_e *MockManager_Expecter) GetStaticPodToMirrorPodMap() *MockManager_GetStaticPodToMirrorPodMap_Call {
return &MockManager_GetStaticPodToMirrorPodMap_Call{Call: _e.mock.On("GetStaticPodToMirrorPodMap")}
}
func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Run(run func()) *MockManager_GetStaticPodToMirrorPodMap_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Return(_a0 map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) RunAndReturn(run func() map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call {
_c.Call.Return(run)
return _c
}
// GetUIDTranslations provides a mock function with given fields: // GetUIDTranslations provides a mock function with given fields:
func (_m *MockManager) GetUIDTranslations() (map[kubelettypes.ResolvedPodUID]kubelettypes.MirrorPodUID, map[kubelettypes.MirrorPodUID]kubelettypes.ResolvedPodUID) { func (_m *MockManager) GetUIDTranslations() (map[kubelettypes.ResolvedPodUID]kubelettypes.MirrorPodUID, map[kubelettypes.MirrorPodUID]kubelettypes.ResolvedPodUID) {
ret := _m.Called() ret := _m.Called()