diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index f38bbb8f263..ed3a44b7d4a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2442,32 +2442,6 @@ func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandle handler.HandlePodSyncs([]*v1.Pod{pod}) } -// dispatchWork starts the asynchronous sync of the pod in a pod worker. -// If the pod has completed termination, dispatchWork will perform no action. -func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { - // Run the sync in an async worker. - kl.podWorkers.UpdatePod(UpdatePodOptions{ - Pod: pod, - MirrorPod: mirrorPod, - UpdateType: syncType, - StartTime: start, - }) - // Note the number of containers for new pods. - if syncType == kubetypes.SyncPodCreate { - metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) - } -} - -// TODO: handle mirror pods in a separate component (issue #17251) -func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) { - // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the - // corresponding static pod. Send update to the pod worker if the static - // pod exists. - if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok { - kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) - } -} - // HandlePodAdditions is the callback in SyncHandler for pods being added from // a config source. func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { @@ -2485,8 +2459,18 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // the apiserver and no action (other than cleanup) is required. 
kl.podManager.AddPod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) continue } @@ -2530,8 +2514,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodCreate, + StartTime: start, + }) } } @@ -2541,12 +2529,21 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.UpdatePod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) - continue + + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } } - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) + + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) } } @@ -2556,10 +2553,22 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { kl.podManager.DeletePod(pod) - if kubetypes.IsMirrorPod(pod) { - kl.handleMirrorPod(pod, start) + + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, 
skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodUpdate, + StartTime: start, + }) continue } + // Deletion is allowed to fail because the periodic cleanup routine // will trigger deletion again. if err := kl.deletePod(pod); err != nil { @@ -2569,7 +2578,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { } // HandlePodReconcile is the callback in the SyncHandler interface for pods -// that should be reconciled. +// that should be reconciled. Pods are reconciled when only the status of the +// pod is updated in the API. func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { @@ -2577,13 +2587,37 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // to the pod manager. kl.podManager.UpdatePod(pod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Static pods should be reconciled the same way as regular pods + } + + // TODO: reconcile being calculated in the config manager is questionable, and avoiding + // extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be + // merged (after resolving the next two TODOs). + // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation. + // TODO: this should be unnecessary today - determine what is the cause for this to + // be different than Sync, or if there is a better place for it. For instance, we have + // needsReconcile in kubelet/config, here, and in status_manager. 
if status.NeedToReconcilePodReadiness(pod) { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } // After an evicted pod is synced, all dead containers in the pod can be removed. + // TODO: this is questionable - status read is async and during eviction we already + // expect to not have some container info. The pod worker knows whether a pod has + // been evicted, so if this is about minimizing the time to react to an eviction we + // can do better. If it's about preserving pod status info we can also do better. if eviction.PodIsEvicted(pod.Status) { if podStatus, err := kl.podCache.Get(pod.UID); err == nil { kl.containerDeletor.deleteContainersInPod("", podStatus, true) @@ -2597,8 +2631,24 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { start := kl.clock.Now() for _, pod := range pods { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Syncing a mirror pod is a programmer error since the intent of sync is to + // batch notify all pending work. We should make it impossible to double sync, + // but for now log a programmer error to prevent accidental introduction. 
+ klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID) + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index fe6e24a4939..e87da0a13fb 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1144,10 +1144,14 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { klog.V(3).InfoS("Pod will be restarted because it is in the desired set and not known to the pod workers (likely due to UID reuse)", "podUID", desiredPod.UID) isStatic := kubetypes.IsStaticPod(desiredPod) - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(desiredPod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(desiredPod) + if pod == nil || wasMirror { + klog.V(2).InfoS("Programmer error, restartable pod was a mirror pod but activePods should never contain a mirror pod", "podUID", desiredPod.UID) + continue + } kl.podWorkers.UpdatePod(UpdatePodOptions{ UpdateType: kubetypes.SyncPodCreate, - Pod: desiredPod, + Pod: pod, MirrorPod: mirrorPod, }) @@ -1234,7 +1238,6 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error { // Cleanup any backoff entries. kl.backOff.GC() - return nil } @@ -1340,15 +1343,26 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con return fmt.Errorf("pod %q cannot be found - no logs available", name) } - podUID := pod.UID - if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok { + // TODO: this should be using the podWorker's pod store as authoritative, since + // the mirrorPod might still exist, the pod may have been force deleted but + // is still terminating (users should be able to view logs of force deleted static pods + // based on full name). 
+ var podUID types.UID + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + return fmt.Errorf("mirror pod %q does not have a corresponding pod", name) + } podUID = mirrorPod.UID + } else { + podUID = pod.UID } + podStatus, found := kl.statusManager.GetPodStatus(podUID) if !found { // If there is no cached status, use the status from the - // apiserver. This is useful if kubelet has recently been - // restarted. + // config source (apiserver). This is useful if kubelet + // has recently been restarted. podStatus = pod.Status } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index c8cc4a02df5..6608a447c7e 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -597,7 +597,11 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) { }, } for _, pod := range pods { - kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now()) + kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodSync, + StartTime: time.Now(), + }) if !got { t.Errorf("Should not skip completed pod %q", pod.Name) } @@ -651,7 +655,11 @@ func TestDispatchWorkOfActivePod(t *testing.T) { } for _, pod := range pods { - kubelet.dispatchWork(pod, kubetypes.SyncPodSync, nil, time.Now()) + kubelet.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + UpdateType: kubetypes.SyncPodSync, + StartTime: time.Now(), + }) if !got { t.Errorf("Should not skip active pod %q", pod.Name) } diff --git a/pkg/kubelet/pod/pod_manager.go b/pkg/kubelet/pod/pod_manager.go index 88d6864e4fc..5426ffc34ad 100644 --- a/pkg/kubelet/pod/pod_manager.go +++ b/pkg/kubelet/pod/pod_manager.go @@ -60,6 +60,10 @@ type Manager interface { // GetMirrorPodByPod returns the mirror pod for the given static pod and // whether it was known to the pod manager. 
GetMirrorPodByPod(*v1.Pod) (*v1.Pod, bool) + // GetPodAndMirrorPod returns the given pod together with its counterpart. If a + // non-mirror pod is provided, mirrorPod is its mirror pod, or nil if none is known. If a + // mirror pod is provided, wasMirror is true and pod is its pod, or nil if none is known. + GetPodAndMirrorPod(*v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) // GetPodsAndMirrorPods returns the set of pods, the set of mirror pods, and // the pod fullnames of any orphaned mirror pods. GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string) @@ -324,3 +328,15 @@ func (pm *basicManager) GetPodByMirrorPod(mirrorPod *v1.Pod) (*v1.Pod, bool) { pod, ok := pm.podByFullName[kubecontainer.GetPodFullName(mirrorPod)] return pod, ok } + +func (pm *basicManager) GetPodAndMirrorPod(aPod *v1.Pod) (pod, mirrorPod *v1.Pod, wasMirror bool) { + pm.lock.RLock() + defer pm.lock.RUnlock() + + fullName := kubecontainer.GetPodFullName(aPod) + if kubetypes.IsMirrorPod(aPod) { + return pm.podByFullName[fullName], aPod, true + } + return aPod, pm.mirrorPodByFullName[fullName], false + +} diff --git a/pkg/kubelet/pod/testing/mock_manager.go b/pkg/kubelet/pod/testing/mock_manager.go index bcf210ad816..7c4afa0a958 100644 --- a/pkg/kubelet/pod/testing/mock_manager.go +++ b/pkg/kubelet/pod/testing/mock_manager.go @@ -120,6 +120,22 @@ func (mr *MockManagerMockRecorder) GetMirrorPodByPod(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMirrorPodByPod", reflect.TypeOf((*MockManager)(nil).GetMirrorPodByPod), arg0) } +// GetPodAndMirrorPod mocks base method. +func (m *MockManager) GetPodAndMirrorPod(arg0 *v1.Pod) (*v1.Pod, *v1.Pod, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPodAndMirrorPod", arg0) + ret0, _ := ret[0].(*v1.Pod) + ret1, _ := ret[1].(*v1.Pod) + ret2, _ := ret[2].(bool) + return ret0, ret1, ret2 +} + +// GetPodAndMirrorPod indicates an expected call of GetPodAndMirrorPod. 
+func (mr *MockManagerMockRecorder) GetPodAndMirrorPod(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPodAndMirrorPod", reflect.TypeOf((*MockManager)(nil).GetPodAndMirrorPod), arg0) +} + // GetPodByFullName mocks base method. func (m *MockManager) GetPodByFullName(arg0 string) (*v1.Pod, bool) { m.ctrl.T.Helper() diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index e5805dbcf78..82e7cb93c2c 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -1181,6 +1181,12 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update status.startedAt = p.clock.Now() status.mergeLastUpdate(update.Options) + // If we are admitting the pod and it is new, record the count of containers + // TODO: We should probably move this into syncPod and add an execution count + // to the syncPod arguments, and this should be recorded on the first sync. + // Leaving it here complicates a particularly important loop. + metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers))) + return ctx, update, true, true, true }