kubelet: Remove status manager channel

The status manager channel forces all container status to be
processed, even if multiple updates are generated in succession.
Instead of queueing the updates, just remember which ones changed
and process them in a batch. This should reduce QPS load from
the Kubelet for status, reduce latency of status propagation to
the API in general, and is easier to reason about.

This also prevents status from being lost when the channel is
full - all updates sent by SetPodStatus are guaranteed to be
recorded. Changing to remove the channel allows us to set a
marker flag when the pod worker state machine completes that
avoids the status manager having to call into the pod worker
directly.
This commit is contained in:
Clayton Coleman 2023-03-14 15:00:07 -06:00 committed by Michal Wozniak
parent a34e37c996
commit 58d1dc669f
2 changed files with 83 additions and 80 deletions

View File

@ -63,11 +63,6 @@ type versionedPodStatus struct {
status v1.PodStatus status v1.PodStatus
} }
type podStatusSyncRequest struct {
podUID types.UID
status versionedPodStatus
}
// Updates pod statuses in apiserver. Writes only when new status has changed. // Updates pod statuses in apiserver. Writes only when new status has changed.
// All methods are thread-safe. // All methods are thread-safe.
type manager struct { type manager struct {
@ -76,7 +71,7 @@ type manager struct {
// Map from pod UID to sync status of the corresponding pod. // Map from pod UID to sync status of the corresponding pod.
podStatuses map[types.UID]versionedPodStatus podStatuses map[types.UID]versionedPodStatus
podStatusesLock sync.RWMutex podStatusesLock sync.RWMutex
podStatusChannel chan podStatusSyncRequest podStatusChannel chan struct{}
// Map from (mirror) pod UID to latest status version successfully sent to the API server. // Map from (mirror) pod UID to latest status version successfully sent to the API server.
// apiStatusVersions must only be accessed from the sync thread. // apiStatusVersions must only be accessed from the sync thread.
apiStatusVersions map[kubetypes.MirrorPodUID]uint64 apiStatusVersions map[kubetypes.MirrorPodUID]uint64
@ -158,7 +153,7 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podD
kubeClient: kubeClient, kubeClient: kubeClient,
podManager: podManager, podManager: podManager,
podStatuses: make(map[types.UID]versionedPodStatus), podStatuses: make(map[types.UID]versionedPodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses podStatusChannel: make(chan struct{}, 1),
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64), apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety, podDeletionSafety: podDeletionSafety,
podStartupLatencyHelper: podStartupLatencyHelper, podStartupLatencyHelper: podStartupLatencyHelper,
@ -217,19 +212,12 @@ func (m *manager) Start() {
go wait.Forever(func() { go wait.Forever(func() {
for { for {
select { select {
case syncRequest := <-m.podStatusChannel: case <-m.podStatusChannel:
klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel", klog.V(4).InfoS("Syncing updated statuses")
"podUID", syncRequest.podUID, m.syncBatch(false)
"statusVersion", syncRequest.status.version,
"status", syncRequest.status.status)
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker: case <-syncTicker:
klog.V(5).InfoS("Status Manager: syncing batch") klog.V(4).InfoS("Syncing all statuses")
// remove any entries in the status channel since the batch will handle them m.syncBatch(true)
for i := len(m.podStatusChannel); i > 0; i-- {
<-m.podStatusChannel
}
m.syncBatch()
} }
} }
}, 0) }, 0)
@ -540,9 +528,9 @@ func checkContainerStateTransition(oldStatuses, newStatuses []v1.ContainerStatus
} }
// updateStatusInternal updates the internal status cache, and queues an update to the api server if // updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary. Returns whether an update was triggered. // necessary.
// This method IS NOT THREAD SAFE and must be called from a locked function. // This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool { func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) {
var oldStatus v1.PodStatus var oldStatus v1.PodStatus
cachedStatus, isCached := m.podStatuses[pod.UID] cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached { if isCached {
@ -556,11 +544,11 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// Check for illegal state transition in containers // Check for illegal state transition in containers
if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil { if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod)) klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
return false return
} }
if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil { if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod)) klog.ErrorS(err, "Status update on pod aborted", "pod", klog.KObj(pod))
return false return
} }
// Set ContainersReadyCondition.LastTransitionTime. // Set ContainersReadyCondition.LastTransitionTime.
@ -630,7 +618,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
// clobbering each other so the phase of a pod progresses monotonically. // clobbering each other so the phase of a pod progresses monotonically.
if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate { if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status) klog.V(3).InfoS("Ignoring same status for pod", "pod", klog.KObj(pod), "status", status)
return false // No new status. return
} }
newStatus := versionedPodStatus{ newStatus := versionedPodStatus{
@ -652,20 +640,9 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
m.podStatuses[pod.UID] = newStatus m.podStatuses[pod.UID] = newStatus
select { select {
case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}: case m.podStatusChannel <- struct{}{}:
klog.V(5).InfoS("Status Manager: adding pod with new status to podStatusChannel",
"pod", klog.KObj(pod),
"podUID", pod.UID,
"statusVersion", newStatus.version,
"status", newStatus.status)
return true
default: default:
// Let the periodic syncBatch handle the update if the channel is full. // there's already a status update pending
// We can't block, since we hold the mutex lock.
klog.V(4).InfoS("Skipping the status update for pod for now because the channel is full",
"pod", klog.KObj(pod),
"status", status)
return false
} }
} }
@ -710,25 +687,38 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
} }
} }
// syncBatch syncs pods statuses with the apiserver. // syncBatch syncs pods statuses with the apiserver. Returns the number of syncs
func (m *manager) syncBatch() { // attempted for testing.
var updatedStatuses []podStatusSyncRequest func (m *manager) syncBatch(all bool) int {
type podSync struct {
podUID types.UID
statusUID kubetypes.MirrorPodUID
status versionedPodStatus
}
var updatedStatuses []podSync
podToMirror, mirrorToPod := m.podManager.GetUIDTranslations() podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
func() { // Critical section func() { // Critical section
m.podStatusesLock.RLock() m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock() defer m.podStatusesLock.RUnlock()
// Clean up orphaned versions. // Clean up orphaned versions.
for uid := range m.apiStatusVersions { if all {
_, hasPod := m.podStatuses[types.UID(uid)] for uid := range m.apiStatusVersions {
_, hasMirror := mirrorToPod[uid] _, hasPod := m.podStatuses[types.UID(uid)]
if !hasPod && !hasMirror { _, hasMirror := mirrorToPod[uid]
delete(m.apiStatusVersions, uid) if !hasPod && !hasMirror {
delete(m.apiStatusVersions, uid)
}
} }
} }
// Decide which pods need status updates.
for uid, status := range m.podStatuses { for uid, status := range m.podStatuses {
syncedUID := kubetypes.MirrorPodUID(uid) // translate the pod UID (source) to the status UID (API pod) -
// static pods are identified in source by pod UID but tracked in the
// API via the uid of the mirror pod
uidOfStatus := kubetypes.MirrorPodUID(uid)
if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok { if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
if mirrorUID == "" { if mirrorUID == "" {
klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping", klog.V(5).InfoS("Static pod does not have a corresponding mirror pod; skipping",
@ -736,34 +726,45 @@ func (m *manager) syncBatch() {
"pod", klog.KRef(status.podNamespace, status.podName)) "pod", klog.KRef(status.podNamespace, status.podName))
continue continue
} }
syncedUID = mirrorUID uidOfStatus = mirrorUID
} }
if m.needsUpdate(types.UID(syncedUID), status) {
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) // if a new status update has been delivered, trigger an update, otherwise the
// pod can wait for the next bulk check (which performs reconciliation as well)
if !all {
if m.apiStatusVersions[uidOfStatus] >= status.version {
continue
}
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
continue
}
// Ensure that any new status, or mismatched status, or pod that is ready for
// deletion gets updated. If a status update fails we retry the next time any
// other pod is updated.
if m.needsUpdate(types.UID(uidOfStatus), status) {
updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
} else if m.needsReconcile(uid, status.status) { } else if m.needsReconcile(uid, status.status) {
// Delete the apiStatusVersions here to force an update on the pod status // Delete the apiStatusVersions here to force an update on the pod status
// In most cases the deleted apiStatusVersions here should be filled // In most cases the deleted apiStatusVersions here should be filled
// soon after the following syncPod() [If the syncPod() sync an update // soon after the following syncPod() [If the syncPod() sync an update
// successfully]. // successfully].
delete(m.apiStatusVersions, syncedUID) delete(m.apiStatusVersions, uidOfStatus)
updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status}) updatedStatuses = append(updatedStatuses, podSync{uid, uidOfStatus, status})
} }
} }
}() }()
for _, update := range updatedStatuses { for _, update := range updatedStatuses {
klog.V(5).InfoS("Status Manager: syncPod in syncbatch", "podUID", update.podUID) klog.V(5).InfoS("Sync pod status", "podUID", update.podUID, "statusUID", update.statusUID, "version", update.status.version)
m.syncPod(update.podUID, update.status) m.syncPod(update.podUID, update.status)
} }
return len(updatedStatuses)
} }
// syncPod syncs the given status with the API server. The caller must not hold the lock. // syncPod syncs the given status with the API server. The caller must not hold the status lock.
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if !m.needsUpdate(uid, status) {
klog.V(1).InfoS("Status for pod is up-to-date; skipping", "podUID", uid)
return
}
// TODO: make me easier to express from client code // TODO: make me easier to express from client code
pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{}) pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
@ -815,7 +816,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
if status.at.IsZero() { if status.at.IsZero() {
klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version) klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version)
} else { } else {
duration := time.Now().Sub(status.at).Truncate(time.Millisecond) duration := time.Since(status.at).Truncate(time.Millisecond)
metrics.PodStatusSyncDuration.Observe(duration.Seconds()) metrics.PodStatusSyncDuration.Observe(duration.Seconds())
} }

View File

@ -69,7 +69,7 @@ func getTestPod() *v1.Pod {
// After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation // After adding reconciliation, if status in pod manager is different from the cached status, a reconciliation
// will be triggered, which will mess up all the old unit test. // will be triggered, which will mess up all the old unit test.
// To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in // To simplify the implementation of unit test, we add testSyncBatch() here, it will make sure the statuses in
// pod manager the same with cached ones before syncBatch() so as to avoid reconciling. // pod manager the same with cached ones before syncBatch(true) so as to avoid reconciling.
func (m *manager) testSyncBatch() { func (m *manager) testSyncBatch() {
for uid, status := range m.podStatuses { for uid, status := range m.podStatuses {
pod, ok := m.podManager.GetPodByUID(uid) pod, ok := m.podManager.GetPodByUID(uid)
@ -81,7 +81,7 @@ func (m *manager) testSyncBatch() {
pod.Status = status.status pod.Status = status.status
} }
} }
m.syncBatch() m.syncBatch(true)
} }
func newTestManager(kubeClient clientset.Interface) *manager { func newTestManager(kubeClient clientset.Interface) *manager {
@ -113,19 +113,19 @@ func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action
actions := manager.kubeClient.(*fake.Clientset).Actions() actions := manager.kubeClient.(*fake.Clientset).Actions()
defer manager.kubeClient.(*fake.Clientset).ClearActions() defer manager.kubeClient.(*fake.Clientset).ClearActions()
if len(actions) != len(expectedActions) { if len(actions) != len(expectedActions) {
t.Fatalf("unexpected actions, got: %+v expected: %+v", actions, expectedActions) t.Fatalf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
return
} }
for i := 0; i < len(actions); i++ { for i := 0; i < len(actions); i++ {
e := expectedActions[i] e := expectedActions[i]
a := actions[i] a := actions[i]
if !a.Matches(e.GetVerb(), e.GetResource().Resource) || a.GetSubresource() != e.GetSubresource() { if !a.Matches(e.GetVerb(), e.GetResource().Resource) || a.GetSubresource() != e.GetSubresource() {
t.Errorf("unexpected actions, got: %+v expected: %+v", actions, expectedActions) t.Errorf("unexpected actions: %s", cmp.Diff(expectedActions, actions))
} }
} }
} }
func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) { func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
t.Helper()
// Consume all updates in the channel. // Consume all updates in the channel.
numUpdates := manager.consumeUpdates() numUpdates := manager.consumeUpdates()
if numUpdates != expectedUpdates { if numUpdates != expectedUpdates {
@ -137,9 +137,8 @@ func (m *manager) consumeUpdates() int {
updates := 0 updates := 0
for { for {
select { select {
case syncRequest := <-m.podStatusChannel: case <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status) updates += m.syncBatch(false)
updates++
default: default:
return updates return updates
} }
@ -214,8 +213,9 @@ func TestChangedStatus(t *testing.T) {
syncer := newTestManager(&fake.Clientset{}) syncer := newTestManager(&fake.Clientset{})
testPod := getTestPod() testPod := getTestPod()
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2) verifyUpdates(t, syncer, 1)
} }
func TestChangedStatusKeepsStartTime(t *testing.T) { func TestChangedStatusKeepsStartTime(t *testing.T) {
@ -225,8 +225,9 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
firstStatus := getRandomPodStatus() firstStatus := getRandomPodStatus()
firstStatus.StartTime = &now firstStatus.StartTime = &now
syncer.SetPodStatus(testPod, firstStatus) syncer.SetPodStatus(testPod, firstStatus)
verifyUpdates(t, syncer, 1)
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2) verifyUpdates(t, syncer, 1)
finalStatus := expectPodStatus(t, syncer, testPod) finalStatus := expectPodStatus(t, syncer, testPod)
if finalStatus.StartTime.IsZero() { if finalStatus.StartTime.IsZero() {
t.Errorf("StartTime should not be zero") t.Errorf("StartTime should not be zero")
@ -407,9 +408,9 @@ func TestStaleUpdates(t *testing.T) {
status.Message = "second version bump" status.Message = "second version bump"
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update") t.Logf("sync batch before syncPods pushes latest status, resulting in one update during the batch")
m.syncBatch() m.syncBatch(true)
verifyUpdates(t, m, 3) verifyUpdates(t, m, 0)
verifyActions(t, m, []core.Action{getAction(), patchAction()}) verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("Nothing left in the channel to sync") t.Logf("Nothing left in the channel to sync")
verifyActions(t, m, []core.Action{}) verifyActions(t, m, []core.Action{})
@ -423,7 +424,7 @@ func TestStaleUpdates(t *testing.T) {
m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1 m.apiStatusVersions[mirrorPodUID] = m.apiStatusVersions[mirrorPodUID] - 1
m.SetPodStatus(pod, status) m.SetPodStatus(pod, status)
m.syncBatch() m.syncBatch(true)
verifyActions(t, m, []core.Action{getAction()}) verifyActions(t, m, []core.Action{getAction()})
t.Logf("Nothing stuck in the pipe.") t.Logf("Nothing stuck in the pipe.")
@ -545,7 +546,7 @@ func TestStaticPod(t *testing.T) {
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.") t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.")
m.syncBatch() m.syncBatch(true)
assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions()) assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions())
t.Logf("Create the mirror pod") t.Logf("Create the mirror pod")
@ -558,6 +559,7 @@ func TestStaticPod(t *testing.T) {
assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) assert.True(t, isPodStatusByKubeletEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
t.Logf("Should sync pod because the corresponding mirror pod is created") t.Logf("Should sync pod because the corresponding mirror pod is created")
assert.Equal(t, m.syncBatch(true), 1)
verifyActions(t, m, []core.Action{getAction(), patchAction()}) verifyActions(t, m, []core.Action{getAction(), patchAction()})
t.Logf("syncBatch should not sync any pods because nothing is changed.") t.Logf("syncBatch should not sync any pods because nothing is changed.")
@ -571,7 +573,7 @@ func TestStaticPod(t *testing.T) {
m.podManager.AddPod(mirrorPod) m.podManager.AddPod(mirrorPod)
t.Logf("Should not update to mirror pod, because UID has changed.") t.Logf("Should not update to mirror pod, because UID has changed.")
m.syncBatch() assert.Equal(t, m.syncBatch(true), 1)
verifyActions(t, m, []core.Action{getAction()}) verifyActions(t, m, []core.Action{getAction()})
} }
@ -1184,7 +1186,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
t.Logf("Orphaned pods should be removed.") t.Logf("Orphaned pods should be removed.")
m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100 m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)] = 100
m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200 m.apiStatusVersions[kubetypes.MirrorPodUID(mirrorPod.UID)] = 200
m.syncBatch() m.syncBatch(true)
if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok { if _, ok := m.apiStatusVersions[kubetypes.MirrorPodUID(testPod.UID)]; ok {
t.Errorf("Should have cleared status for testPod") t.Errorf("Should have cleared status for testPod")
} }
@ -1216,7 +1218,7 @@ func TestReconcilePodStatus(t *testing.T) {
syncer := newTestManager(client) syncer := newTestManager(client)
syncer.SetPodStatus(testPod, getRandomPodStatus()) syncer.SetPodStatus(testPod, getRandomPodStatus())
t.Logf("Call syncBatch directly to test reconcile") t.Logf("Call syncBatch directly to test reconcile")
syncer.syncBatch() // The apiStatusVersions should be set now syncer.syncBatch(true) // The apiStatusVersions should be set now
client.ClearActions() client.ClearActions()
podStatus, ok := syncer.GetPodStatus(testPod.UID) podStatus, ok := syncer.GetPodStatus(testPod.UID)
@ -1231,7 +1233,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Fatalf("Pod status is the same, a reconciliation is not needed") t.Fatalf("Pod status is the same, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus) syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch(true)
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
// If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond), // If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond),
@ -1246,7 +1248,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed") t.Fatalf("Pod status only differs for timestamp format, a reconciliation is not needed")
} }
syncer.SetPodStatus(testPod, podStatus) syncer.SetPodStatus(testPod, podStatus)
syncer.syncBatch() syncer.syncBatch(true)
verifyActions(t, syncer, []core.Action{}) verifyActions(t, syncer, []core.Action{})
t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update")
@ -1256,7 +1258,7 @@ func TestReconcilePodStatus(t *testing.T) {
t.Fatalf("Pod status is different, a reconciliation is needed") t.Fatalf("Pod status is different, a reconciliation is needed")
} }
syncer.SetPodStatus(testPod, changedPodStatus) syncer.SetPodStatus(testPod, changedPodStatus)
syncer.syncBatch() syncer.syncBatch(true)
verifyActions(t, syncer, []core.Action{getAction(), patchAction()}) verifyActions(t, syncer, []core.Action{getAction(), patchAction()})
} }