Merge pull request #126870 from AnishShah/outofcpu-fix

Ensure mirror pods are created as soon as node is registered
This commit is contained in:
Kubernetes Prow Robot
2024-11-05 19:15:29 +00:00
committed by GitHub
4 changed files with 141 additions and 31 deletions

View File

@@ -1693,6 +1693,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// start syncing lease
go kl.nodeLeaseController.Run(context.Background())
// Mirror pods for static pods may not be created immediately during node startup
// due to node registration or informer sync delays. They will be created eventually
// when static pods are resynced (every 1-1.5 minutes).
// To ensure kube-scheduler is aware of static pod resource usage faster,
// mirror pods are created as soon as the node registers.
go kl.fastStaticPodsRegistration(ctx)
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
@@ -1926,37 +1933,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubetypes.IsStaticPod(pod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
}
kl.tryReconcileMirrorPods(pod, mirrorPod)
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
@@ -3054,3 +3031,59 @@ func (kl *Kubelet) PrepareDynamicResources(ctx context.Context, pod *v1.Pod) err
func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) error {
return kl.containerManager.UnprepareDynamicResources(ctx, pod)
}
// Ensure Mirror Pod for Static Pod exists and matches the current pod definition.
// The function logs and ignores any errors.
func (kl *Kubelet) tryReconcileMirrorPods(staticPod, mirrorPod *v1.Pod) {
if !kubetypes.IsStaticPod(staticPod) {
return
}
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(staticPod)
if ok, err := kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID); err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
} else if ok {
deleted = ok
klog.InfoS("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod))
}
}
}
}
// Ensure Mirror Pod for Static Pod exists as soon as node is registered.
func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
_, err := kl.GetNode()
if err == nil {
return true, nil
}
klog.V(4).ErrorS(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName)))
return false, nil
}); err != nil {
klog.V(4).ErrorS(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName)))
}
staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap()
for staticPod, mirrorPod := range staticPodToMirrorPodMap {
kl.tryReconcileMirrorPods(staticPod, mirrorPod)
}
}

View File

@@ -70,6 +70,12 @@ type Manager interface {
// the pod fullnames of any orphaned mirror pods.
GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)
// GetStaticPodToMirrorPodMap return a map of static pod to its corresponding
// mirror pods. It is possible that there is no mirror pod for a static pod
// if kubelet is running in standalone mode or is in the process of creating
// the mirror pod and in that case, the mirror pod is nil.
GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod
// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
SetPods(pods []*v1.Pod)
@@ -226,6 +232,18 @@ func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods
return allPods, allMirrorPods, orphanedMirrorPodFullnames
}
func (pm *basicManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod {
pm.lock.RLock()
defer pm.lock.RUnlock()
staticPodsMapToMirrorPods := make(map[*v1.Pod]*v1.Pod)
for _, pod := range podsMapToPods(pm.podByUID) {
if kubetypes.IsStaticPod(pod) {
staticPodsMapToMirrorPods[pod] = pm.mirrorPodByFullName[kubecontainer.GetPodFullName(pod)]
}
}
return staticPodsMapToMirrorPods
}
func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()

View File

@@ -294,3 +294,15 @@ func TestRemovePods(t *testing.T) {
})
}
}
func TestGetStaticPodToMirrorPodMap(t *testing.T) {
podManager := NewBasicPodManager()
podManager.SetPods([]*v1.Pod{mirrorPod, staticPod, normalPod})
m := podManager.GetStaticPodToMirrorPodMap()
if len(m) != 1 {
t.Fatalf("GetStaticPodToMirrorPodMap(): got %d static pods, wanted 1 static pod", len(m))
}
if gotMirrorPod, ok := m[staticPod]; !ok || gotMirrorPod.UID != mirrorPod.UID {
t.Fatalf("GetStaticPodToMirrorPodMap() did not return the correct mirror pod UID %s, wanted mirror pod UID %s", gotMirrorPod.UID, mirrorPod.UID)
}
}

View File

@@ -546,6 +546,53 @@ func (_c *MockManager_GetPodsAndMirrorPods_Call) RunAndReturn(run func() ([]*v1.
return _c
}
// GetStaticPodToMirrorPodMap provides a mock function with given fields:
func (_m *MockManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetStaticPodToMirrorPodMap")
}
var r0 map[*v1.Pod]*v1.Pod
if rf, ok := ret.Get(0).(func() map[*v1.Pod]*v1.Pod); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[*v1.Pod]*v1.Pod)
}
}
return r0
}
// MockManager_GetStaticPodToMirrorPodMap_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStaticPodToMirrorPodMap'
type MockManager_GetStaticPodToMirrorPodMap_Call struct {
*mock.Call
}
// GetStaticPodToMirrorPodMap is a helper method to define mock.On call
func (_e *MockManager_Expecter) GetStaticPodToMirrorPodMap() *MockManager_GetStaticPodToMirrorPodMap_Call {
return &MockManager_GetStaticPodToMirrorPodMap_Call{Call: _e.mock.On("GetStaticPodToMirrorPodMap")}
}
func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Run(run func()) *MockManager_GetStaticPodToMirrorPodMap_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) Return(_a0 map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockManager_GetStaticPodToMirrorPodMap_Call) RunAndReturn(run func() map[*v1.Pod]*v1.Pod) *MockManager_GetStaticPodToMirrorPodMap_Call {
_c.Call.Return(run)
return _c
}
// GetUIDTranslations provides a mock function with given fields:
func (_m *MockManager) GetUIDTranslations() (map[kubelettypes.ResolvedPodUID]kubelettypes.MirrorPodUID, map[kubelettypes.MirrorPodUID]kubelettypes.ResolvedPodUID) {
ret := _m.Called()