diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go
index fa975c9490d..44e4b0a805b 100644
--- a/pkg/kubelet/status/status_manager.go
+++ b/pkg/kubelet/status/status_manager.go
@@ -63,11 +63,6 @@ type versionedPodStatus struct {
 	status v1.PodStatus
 }
 
-type podStatusSyncRequest struct {
-	podUID types.UID
-	status versionedPodStatus
-}
-
 // Updates pod statuses in apiserver. Writes only when new status has changed.
 // All methods are thread-safe.
 type manager struct {
@@ -76,7 +71,7 @@ type manager struct {
 	// Map from pod UID to sync status of the corresponding pod.
 	podStatuses      map[types.UID]versionedPodStatus
 	podStatusesLock  sync.RWMutex
-	podStatusChannel chan podStatusSyncRequest
+	podStatusChannel chan struct{}
 	// Map from (mirror) pod UID to latest status version successfully sent to the API server.
 	// apiStatusVersions must only be accessed from the sync thread.
 	apiStatusVersions map[kubetypes.MirrorPodUID]uint64
@@ -158,7 +153,7 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
 		kubeClient:              kubeClient,
 		podManager:              podManager,
 		podStatuses:             make(map[types.UID]versionedPodStatus),
-		podStatusChannel:        make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
+		podStatusChannel:        make(chan struct{}, 1),
 		apiStatusVersions:       make(map[kubetypes.MirrorPodUID]uint64),
 		podDeletionSafety:       podDeletionSafety,
 		podStartupLatencyHelper: podStartupLatencyHelper,
@@ -217,19 +212,12 @@ func (m *manager) Start() {
 	go wait.Forever(func() {
 		for {
 			select {
-			case syncRequest := <-m.podStatusChannel:
-				klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel",
-					"podUID", syncRequest.podUID,
-					"statusVersion", syncRequest.status.version,
-					"status", syncRequest.status.status)
-				m.syncPod(syncRequest.podUID, syncRequest.status)
+			case <-m.podStatusChannel:
+				klog.V(4).InfoS("Syncing updated statuses")
+				m.syncBatch(false)
 			case <-syncTicker:
-				klog.V(5).InfoS("Status Manager: syncing batch")
-				// remove any entries in the status channel since the batch will handle them
-				for i := len(m.podStatusChannel); i > 0; i-- {
-					<-m.podStatusChannel
-				}
-				m.syncBatch()
+				klog.V(4).InfoS("Syncing all statuses")
+				m.syncBatch(true)
 			}
 		}
 	}, 0)
@@ -540,9 +528,9 @@ func checkContainerStateTransition(oldStatuses, newStatuses []v1.ContainerStatus
 }
 
 // updateStatusInternal updates the internal status cache, and queues an update to the api server if
-// necessary. Returns whether an update was triggered.
+// necessary.
 // This method IS NOT THREAD SAFE and must be called from a locked function.
-func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
+func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) {
 	var oldStatus v1.PodStatus
 	cachedStatus, isCached := m.podStatuses[pod.UID]
 	if isCached {
@@ -556,11 +544,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	// Check for illegal state transition in containers
 	if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
 		klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
-		return false
+		return
 	}
 	if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
 		klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
-		return false
+		return
 	}
 
 	// Set ContainersReadyCondition.LastTransitionTime.
@@ -630,7 +618,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	// clobbering each other so the phase of a pod progresses monotonically.
 	if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
 		klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
-		return false // No new status.
+		return
 	}
 
 	newStatus := versionedPodStatus{
@@ -652,20 +640,9 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
 	m.podStatuses[pod.UID] = newStatus
 
 	select {
-	case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
-		klog.V(5).InfoS("Status Manager: adding pod with new status to podStatusChannel",
-			"pod", klog.KObj(pod),
-			"podUID", pod.UID,
-			"statusVersion", newStatus.version,
-			"status", newStatus.status)
-		return true
+	case m.podStatusChannel <- struct{}{}:
 	default:
-		// Let the periodic syncBatch handle the update if the channel is full.
-		// We can't block, since we hold the mutex lock.
-		klog.V(4).InfoS("Skipping the status update for pod for now because the channel is full",
-			"pod", klog.KObj(pod),
-			"status", status)
-		return false
+		// there's already a status update pending
 	}
 }
 
@@ -710,25 +687,38 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 	}
 }
 
-// syncBatch syncs pods statuses with the apiserver.
-func (m *manager) syncBatch() {
-	var updatedStatuses []podStatusSyncRequest
+// syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
+// attempted for testing.
+func (m *manager) syncBatch(all bool) int {
+	type podSync struct {
+		podUID    types.UID
+		statusUID kubetypes.MirrorPodUID
+		status    versionedPodStatus
+	}
+
+	var updatedStatuses []podSync
 	podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
 	func() { // Critical section
 		m.podStatusesLock.RLock()
 		defer m.podStatusesLock.RUnlock()
 
 		// Clean up orphaned versions.
-		for uid := range m.apiStatusVersions {
-			_, hasPod := m.podStatuses[types.UID(uid)]
-			_, hasMirror := mirrorToPod[uid]
-			if !hasPod && !hasMirror {
-				delete(m.apiStatusVersions, uid)
+		if all {
+			for uid := range m.apiStatusVersions {
+				_, hasPod := m.podStatuses[types.UID(uid)]
+				_, hasMirror := mirrorToPod[uid]
+				if !hasPod && !hasMirror {
+					delete(m.apiStatusVersions, uid)
+				}
 			}
 		}
 
+		// Decide which pods need status updates.
 		for uid, status := range m.podStatuses {
-			syncedUID := kubetypes.MirrorPodUID(uid)
+			// translate the pod UID (source) to the status UID (API pod) -
+			// static pods are identified in source by pod UID but tracked in the
+			// API via the uid of the mirror pod
+			uidOfStatus := kubetypes.MirrorPodUID(uid)
 			if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
 				if mirrorUID == "" {
 					klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
@@ -736,34 +726,45 @@ func (m *manager) syncBatch() {
 						"pod", klog.KRef(status.podNamespace, status.podName))
 					continue
 				}
-				syncedUID = mirrorUID
+				uidOfStatus = mirrorUID
 			}
-			if m.needsUpdate(types.UID(syncedUID), status) {
-				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+
+			// if a new status update has been delivered, trigger an update, otherwise the
+			// pod can wait for the next bulk check (which performs reconciliation as well)
+			if !all {
+				if m.apiStatusVersions[uidOfStatus] >= status.version {
+					continue
+				}
+				updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
+				continue
+			}
+
+			// Ensure that any new status, or mismatched status, or pod that is ready for
+			// deletion gets updated. If a status update fails we retry the next time any
+			// other pod is updated.
+			if m.needsUpdate(types.UID(uidOfStatus), status) {
+				updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
 			} else if m.needsReconcile(uid, status.status) {
 				// Delete the apiStatusVersions here to force an update on the pod status
 				// In most cases the deleted apiStatusVersions here should be filled
 				// soon after the following syncPod() [If the syncPod() sync an update
 				// successfully].
-				delete(m.apiStatusVersions, syncedUID)
-				updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
+				delete(m.apiStatusVersions, uidOfStatus)
+				updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
 			}
 		}
 	}()
 
 	for _, update := range updatedStatuses {
-		klog.V(5).InfoS("Status Manager: syncPod in syncbatch", "podUID", update.podUID)
+		klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
 		m.syncPod(update.podUID, update.status)
 	}
+
+	return len(updatedStatuses)
 }
 
-// syncPod syncs the given status with the API server. The caller must not hold the lock.
+// syncPod syncs the given status with the API server. The caller must not hold the status lock.
 func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
-	if !m.needsUpdate(uid, status) {
-		klog.V(1).InfoS("Status for pod is up-to-date; skipping", "podUID", uid)
-		return
-	}
-
 	// TODO: make me easier to express from client code
 	pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
 	if errors.IsNotFound(err) {
@@ -815,7 +816,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 	if status.at.IsZero() {
 		klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
 	} else {
-		duration := time.Now().Sub(status.at).Truncate(time.Millisecond)
+		duration := time.Since(status.at).Truncate(time.Millisecond)
 		metrics.PodStatusSyncDuration.Observe(duration.Seconds())
 	}
 
diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go
index 82b8e9376e3..9b953da772d 100644
--- a/pkg/kubelet/status/status_manager_test.go
+++ b/pkg/kubelet/status/status_manager_test.go
@@ -69,7 +69,7 @@ func getTestPod() *v1.Pod {
 // After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
 // will be triggered, which will mess up all the old unit test.
 // To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
-// pod manager the same with cached ones before syncBatch() so as to avoid reconciling.
+// pod manager the same with cached ones before syncBatch(true) so as to avoid reconciling.
 func (m *manager) testSyncBatch() {
 	for uid, status := range m.podStatuses {
 		pod, ok := m.podManager.GetPodByUID(uid)
@@ -81,7 +81,7 @@ func (m *manager) testSyncBatch() {
 			pod.Status = status.status
 		}
 	}
-	m.syncBatch()
+	m.syncBatch(true)
 }
 
 func newTestManager(kubeClient clientset.Interface) *manager {
@@ -113,19 +113,19 @@ func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action
 	actions := manager.kubeClient.(*fake.Clientset).Actions()
 	defer manager.kubeClient.(*fake.Clientset).ClearActions()
 	if len(actions) != len(expectedActions) {
-		t.Fatalf("unexpected actions, got: %+v expected: %+v", actions, expectedActions)
-		return
+		t.Fatalf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
 	}
 	for i := 0; i < len(actions); i++ {
 		e := expectedActions[i]
 		a := actions[i]
 		if !a.Matches(e.GetVerb(), e.GetResource().Resource) || a.GetSubresource() != e.GetSubresource() {
-			t.Errorf("unexpected actions, got: %+v expected: %+v", actions, expectedActions)
+			t.Errorf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
 		}
 	}
 }
 
 func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
+	t.Helper()
 	// Consume all updates in the channel.
 	numUpdates := manager.consumeUpdates()
 	if numUpdates != expectedUpdates {
@@ -137,9 +137,8 @@ func (m *manager) consumeUpdates() int {
 	updates := 0
 	for {
 		select {
-		case syncRequest := <-m.podStatusChannel:
-			m.syncPod(syncRequest.podUID, syncRequest.status)
-			updates++
+		case <-m.podStatusChannel:
+			updates += m.syncBatch(false)
 		default:
 			return updates
 		}
@@ -214,8 +213,9 @@ func TestChangedStatus(t *testing.T) {
 	syncer := newTestManager(&fake.Clientset{})
 	testPod := getTestPod()
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
+	verifyUpdates(t, syncer, 1)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	verifyUpdates(t, syncer, 2)
+	verifyUpdates(t, syncer, 1)
 }
 
 func TestChangedStatusKeepsStartTime(t *testing.T) {
@@ -225,8 +225,9 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
 	firstStatus := getRandomPodStatus()
 	firstStatus.StartTime = &now
 	syncer.SetPodStatus(testPod, firstStatus)
+	verifyUpdates(t, syncer, 1)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
-	verifyUpdates(t, syncer, 2)
+	verifyUpdates(t, syncer, 1)
 	finalStatus := expectPodStatus(t, syncer, testPod)
 	if finalStatus.StartTime.IsZero() {
 		t.Errorf("StartTime should not be zero")
@@ -407,9 +408,9 @@ func TestStaleUpdates(t *testing.T) {
 	status.Message = "second version bump"
 	m.SetPodStatus(pod, status)
 
-	t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update")
-	m.syncBatch()
-	verifyUpdates(t, m, 3)
+	t.Logf("sync batch before syncPods pushes latest status, resulting in one update during the batch")
+	m.syncBatch(true)
+	verifyUpdates(t, m, 0)
 	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 	t.Logf("Nothing left in the channel to sync")
 	verifyActions(t, m, []core.Action{})
@@ -423,7 +424,7 @@ func TestStaleUpdates(t *testing.T) {
 	m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
 
 	m.SetPodStatus(pod, status)
-	m.syncBatch()
+	m.syncBatch(true)
 	verifyActions(t, m, []core.Action{getAction()})
 
 	t.Logf("Nothing stuck in the pipe.")
@@ -545,7 +546,7 @@ func TestStaticPod(t *testing.T) {
 	assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 
 	t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
-	m.syncBatch()
+	m.syncBatch(true)
 	assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
 
 	t.Logf("Create the mirror pod")
@@ -558,6 +559,7 @@ func TestStaticPod(t *testing.T) {
 	assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
 
 	t.Logf("Should sync pod because the corresponding mirror pod is created")
+	assert.Equal(t, m.syncBatch(true), 1)
 	verifyActions(t, m, []core.Action{getAction(), patchAction()})
 
 	t.Logf("syncBatch should not sync any pods because nothing is changed.")
@@ -571,7 +573,7 @@ func TestStaticPod(t *testing.T) {
 	m.podManager.AddPod(mirrorPod)
 
 	t.Logf("Should not update to mirror pod, because UID has changed.")
-	m.syncBatch()
+	assert.Equal(t, m.syncBatch(true), 1)
 	verifyActions(t, m, []core.Action{getAction()})
 }
 
@@ -1184,7 +1186,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
 	t.Logf("Orphaned pods should be removed.")
 	m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
 	m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
-	m.syncBatch()
+	m.syncBatch(true)
 	if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {
 		t.Errorf("Should have cleared status for testPod")
 	}
@@ -1216,7 +1218,7 @@ func TestReconcilePodStatus(t *testing.T) {
 	syncer := newTestManager(client)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	t.Logf("Call syncBatch directly to test reconcile")
-	syncer.syncBatch() // The apiStatusVersions should be set now
+	syncer.syncBatch(true) // The apiStatusVersions should be set now
 	client.ClearActions()
 
 	podStatus, ok := syncer.GetPodStatus(testPod.UID)
@@ -1231,7 +1233,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Fatalf("Pod status is the same, a reconciliation is not needed")
 	}
 	syncer.SetPodStatus(testPod, podStatus)
-	syncer.syncBatch()
+	syncer.syncBatch(true)
 	verifyActions(t, syncer, []core.Action{})
 
 	// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
@@ -1246,7 +1248,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
 	}
 	syncer.SetPodStatus(testPod, podStatus)
-	syncer.syncBatch()
+	syncer.syncBatch(true)
 	verifyActions(t, syncer, []core.Action{})
 
 	t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
@@ -1256,7 +1258,7 @@ func TestReconcilePodStatus(t *testing.T) {
 		t.Fatalf("Pod status is different, a reconciliation is needed")
 	}
 	syncer.SetPodStatus(testPod, changedPodStatus)
-	syncer.syncBatch()
+	syncer.syncBatch(true)
 	verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
 }
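
Reviewer note: the heart of this patch is swapping a 1000-entry channel of `podStatusSyncRequest` payloads for a one-slot `chan struct{}` that acts purely as a wake-up signal, with the `podStatuses` map (guarded by `podStatusesLock`) as the single source of truth. Below is a minimal, self-contained sketch of that coalescing pattern; the names (`notifier`, `update`, `syncOnce`) are illustrative stand-ins, not identifiers from the patch.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier coalesces any number of "state changed" events into at most one
// pending wake-up, the way the one-slot podStatusChannel does.
type notifier struct {
	mu     sync.Mutex
	latest map[string]int // latest version per pod-like key; the source of truth
	wakeCh chan struct{}  // capacity 1: a "dirty" flag, not a queue of payloads
}

func newNotifier() *notifier {
	return &notifier{latest: map[string]int{}, wakeCh: make(chan struct{}, 1)}
}

// update records new state, then does a non-blocking send. It never blocks
// while holding the lock: if a wake-up is already pending there is nothing
// more to say, because the sync loop reads the map, not the channel.
func (n *notifier) update(key string, version int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.latest[key] = version
	select {
	case n.wakeCh <- struct{}{}:
	default: // a wake-up is already pending
	}
}

// syncOnce waits for one wake-up and then processes everything that has
// accumulated, so N updates cost at most one pass.
func (n *notifier) syncOnce() {
	<-n.wakeCh
	n.mu.Lock()
	defer n.mu.Unlock()
	for key, version := range n.latest {
		fmt.Printf("sync %s at version %d\n", key, version)
	}
}

func main() {
	n := newNotifier()
	n.update("pod-a", 1)
	n.update("pod-a", 2) // coalesces into the already-pending wake-up
	n.update("pod-b", 1)
	n.syncOnce() // one pass observes the latest state of both pods
}
```

Because the channel no longer carries `versionedPodStatus` values, it can neither fill up (the old "channel is full" drop path) nor deliver stale snapshots: the sync loop always acts on the newest cached status.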
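The other half of the design is the `all` parameter: a wake-up triggers a cheap, version-gated pass, while the ticker still drives a full pass that reconciles against the API server, retries failed writes, and prunes orphaned `apiStatusVersions` entries. A hedged sketch of that split, assuming a `syncBatch` callback with the same all/incremental contract as the manager's method (the 10-second interval is illustrative):

```go
package statusloop

import "time"

// runSyncLoop mirrors the rewritten Start() loop: each wake-up runs an
// incremental pass over pods whose cached status version is newer than the
// last version written to the API server, while the ticker runs the full,
// authoritative pass.
func runSyncLoop(wakeCh <-chan struct{}, syncBatch func(all bool) int) {
	ticker := time.NewTicker(10 * time.Second) // full-sync interval (illustrative)
	defer ticker.Stop()
	for {
		select {
		case <-wakeCh:
			syncBatch(false) // cheap: skip pods already at their delivered version
		case <-ticker.C:
			syncBatch(true) // reconcile, retry failures, clean up orphaned versions
		}
	}
}
```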