diff --git a/pkg/kubelet/status/status_manager_test.go b/pkg/kubelet/status/status_manager_test.go index 7a53194fa7e..cec413518a6 100644 --- a/pkg/kubelet/status/status_manager_test.go +++ b/pkg/kubelet/status/status_manager_test.go @@ -87,8 +87,10 @@ func getRandomPodStatus() v1.PodStatus { } } -func verifyActions(t *testing.T, kubeClient clientset.Interface, expectedActions []core.Action) { - actions := kubeClient.(*fake.Clientset).Actions() +func verifyActions(t *testing.T, manager *manager, expectedActions []core.Action) { + manager.consumeUpdates() + actions := manager.kubeClient.(*fake.Clientset).Actions() + defer manager.kubeClient.(*fake.Clientset).ClearActions() if len(actions) != len(expectedActions) { t.Fatalf("unexpected actions, got: %+v expected: %+v", actions, expectedActions) return @@ -104,26 +106,25 @@ func verifyActions(t *testing.T, kubeClient clientset.Interface, expectedActions func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) { // Consume all updates in the channel. - numUpdates := 0 - for { - hasUpdate := true - select { - case <-manager.podStatusChannel: - numUpdates++ - default: - hasUpdate = false - } - - if !hasUpdate { - break - } - } - + numUpdates := manager.consumeUpdates() if numUpdates != expectedUpdates { t.Errorf("unexpected number of updates %d, expected %d", numUpdates, expectedUpdates) } } +func (m *manager) consumeUpdates() int { + updates := 0 + for { + select { + case syncRequest := <-m.podStatusChannel: + m.syncPod(syncRequest.podUID, syncRequest.status) + updates++ + default: + return updates + } + } +} + func TestNewStatus(t *testing.T) { syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() @@ -284,34 +285,25 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) { } } -func TestSyncBatchIgnoresNotFound(t *testing.T) { +func TestSyncPodIgnoresNotFound(t *testing.T) { client := fake.Clientset{} syncer := newTestManager(&client) client.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) { return true, nil, errors.NewNotFound(api.Resource("pods"), "test-pod") }) syncer.SetPodStatus(getTestPod(), getRandomPodStatus()) - syncer.testSyncBatch() - - verifyActions(t, syncer.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - }) + verifyActions(t, syncer, []core.Action{getAction()}) } -func TestSyncBatch(t *testing.T) { +func TestSyncPod(t *testing.T) { syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() syncer.kubeClient = fake.NewSimpleClientset(testPod) syncer.SetPodStatus(testPod, getRandomPodStatus()) - syncer.testSyncBatch() - verifyActions(t, syncer.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - }, - ) + verifyActions(t, syncer, []core.Action{getAction(), updateAction()}) } -func TestSyncBatchChecksMismatchedUID(t *testing.T) { +func TestSyncPodChecksMismatchedUID(t *testing.T) { syncer := newTestManager(&fake.Clientset{}) pod := getTestPod() pod.UID = "first" @@ -321,13 +313,10 @@ func TestSyncBatchChecksMismatchedUID(t *testing.T) { syncer.podManager.AddPod(differentPod) syncer.kubeClient = fake.NewSimpleClientset(pod) syncer.SetPodStatus(differentPod, getRandomPodStatus()) - syncer.testSyncBatch() - verifyActions(t, syncer.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - }) + verifyActions(t, syncer, []core.Action{getAction()}) } -func TestSyncBatchNoDeadlock(t *testing.T) { +func TestSyncPodNoDeadlock(t *testing.T) { client := &fake.Clientset{} m := newTestManager(client) pod := getTestPod() @@ -349,53 +338,38 @@ func TestSyncBatchNoDeadlock(t *testing.T) { pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}}} - getAction := core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}} - updateAction := core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}} - - // Pod not found. + t.Logf("Pod not found.") ret = *pod err = errors.NewNotFound(api.Resource("pods"), pod.Name) m.SetPodStatus(pod, getRandomPodStatus()) - m.testSyncBatch() - verifyActions(t, client, []core.Action{getAction}) - client.ClearActions() + verifyActions(t, m, []core.Action{getAction()}) - // Pod was recreated. + t.Logf("Pod was recreated.") ret.UID = "other_pod" err = nil m.SetPodStatus(pod, getRandomPodStatus()) - m.testSyncBatch() - verifyActions(t, client, []core.Action{getAction}) - client.ClearActions() + verifyActions(t, m, []core.Action{getAction()}) - // Pod not deleted (success case). + t.Logf("Pod not deleted (success case).") ret = *pod m.SetPodStatus(pod, getRandomPodStatus()) - m.testSyncBatch() - verifyActions(t, client, []core.Action{getAction, updateAction}) - client.ClearActions() + verifyActions(t, m, []core.Action{getAction(), updateAction()}) - // Pod is terminated, but still running. + t.Logf("Pod is terminated, but still running.") pod.DeletionTimestamp = new(metav1.Time) m.SetPodStatus(pod, getRandomPodStatus()) - m.testSyncBatch() - verifyActions(t, client, []core.Action{getAction, updateAction}) - client.ClearActions() + verifyActions(t, m, []core.Action{getAction(), updateAction()}) - // Pod is terminated successfully. + t.Logf("Pod is terminated successfully.") pod.Status.ContainerStatuses[0].State.Running = nil pod.Status.ContainerStatuses[0].State.Terminated = &v1.ContainerStateTerminated{} m.SetPodStatus(pod, getRandomPodStatus()) - m.testSyncBatch() - verifyActions(t, client, []core.Action{getAction, updateAction}) - client.ClearActions() + verifyActions(t, m, []core.Action{getAction(), updateAction()}) - // Error case. + t.Logf("Error case.") err = fmt.Errorf("intentional test error") m.SetPodStatus(pod, getRandomPodStatus()) - m.testSyncBatch() - verifyActions(t, client, []core.Action{getAction}) - client.ClearActions() + verifyActions(t, m, []core.Action{getAction()}) } func TestStaleUpdates(t *testing.T) { @@ -409,21 +383,13 @@ func TestStaleUpdates(t *testing.T) { m.SetPodStatus(pod, status) status.Message = "second version bump" m.SetPodStatus(pod, status) + + t.Logf("sync batch before syncPods pushes latest status, so we should see three statuses in the channel, but only one update") + m.syncBatch() verifyUpdates(t, m, 3) - - t.Logf("First sync pushes latest status.") - m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - }) - client.ClearActions() - - for i := 0; i < 2; i++ { - t.Logf("Next 2 syncs should be ignored (%d).", i) - m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{}) - } + verifyActions(t, m, []core.Action{getAction(), updateAction()}) + t.Logf("Nothing left in the channel to sync") + verifyActions(t, m, []core.Action{}) t.Log("Unchanged status should not send an update.") m.SetPodStatus(pod, status) @@ -433,13 +399,10 @@ func TestStaleUpdates(t *testing.T) { m.apiStatusVersions[pod.UID] = m.apiStatusVersions[pod.UID] - 1 m.SetPodStatus(pod, status) - m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - }) + m.syncBatch() + verifyActions(t, m, []core.Action{getAction(), updateAction()}) - // Nothing stuck in the pipe. + t.Logf("Nothing stuck in the pipe.") verifyUpdates(t, m, 0) } @@ -526,7 +489,7 @@ func TestStaticPod(t *testing.T) { client := fake.NewSimpleClientset(mirrorPod) m := newTestManager(client) - // Create the static pod + t.Logf("Create the static pod") m.podManager.AddPod(staticPod) assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod") @@ -535,69 +498,57 @@ func TestStaticPod(t *testing.T) { status.StartTime = &now m.SetPodStatus(staticPod, status) - // Should be able to get the static pod status from status manager + t.Logf("Should be able to get the static pod status from status manager") retrievedStatus := expectPodStatus(t, m, staticPod) normalizeStatus(staticPod, &status) assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) - // Should not sync pod because there is no corresponding mirror pod for the static pod. - m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{}) - client.ClearActions() + t.Logf("Should not sync pod in syncBatch because there is no corresponding mirror pod for the static pod.") + m.syncBatch() + assert.Equal(t, len(m.kubeClient.(*fake.Clientset).Actions()), 0, "Expected no updates after syncBatch, got %+v", m.kubeClient.(*fake.Clientset).Actions()) - // Create the mirror pod + t.Logf("Create the mirror pod") m.podManager.AddPod(mirrorPod) assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID) - // Should be able to get the mirror pod status from status manager + t.Logf("Should be able to get the mirror pod status from status manager") retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) - // Should sync pod because the corresponding mirror pod is created - m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - }) - updateAction := client.Actions()[1].(core.UpdateActionImpl) - updatedPod := updateAction.Object.(*v1.Pod) - assert.Equal(t, mirrorPod.UID, updatedPod.UID, "Expected mirrorPod (%q), but got %q", mirrorPod.UID, updatedPod.UID) - assert.True(t, isStatusEqual(&status, &updatedPod.Status), "Expected: %+v, Got: %+v", status, updatedPod.Status) - client.ClearActions() + t.Logf("Should sync pod because the corresponding mirror pod is created") + verifyActions(t, m, []core.Action{getAction(), updateAction()}) - // Should not sync pod because nothing is changed. + t.Logf("syncBatch should not sync any pods because nothing is changed.") m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{}) + verifyActions(t, m, []core.Action{}) - // Change mirror pod identity. + t.Logf("Change mirror pod identity.") m.podManager.DeletePod(mirrorPod) mirrorPod.UID = "new-mirror-pod" mirrorPod.Status = v1.PodStatus{} m.podManager.AddPod(mirrorPod) - // Should not update to mirror pod, because UID has changed. - m.testSyncBatch() - verifyActions(t, m.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - }) + t.Logf("Should not update to mirror pod, because UID has changed.") + m.syncBatch() + verifyActions(t, m, []core.Action{getAction()}) } func TestTerminatePod(t *testing.T) { syncer := newTestManager(&fake.Clientset{}) testPod := getTestPod() - // update the pod's status to Failed. TerminatePod should preserve this status update. + t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.") firstStatus := getRandomPodStatus() firstStatus.Phase = v1.PodFailed syncer.SetPodStatus(testPod, firstStatus) - // set the testPod to a pod with Phase running, to simulate a stale pod + t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod") testPod.Status = getRandomPodStatus() testPod.Status.Phase = v1.PodRunning syncer.TerminatePod(testPod) - // we expect the container statuses to have changed to terminated + t.Logf("we expect the container statuses to have changed to terminated") newStatus := expectPodStatus(t, syncer, testPod) for i := range newStatus.ContainerStatuses { assert.False(t, newStatus.ContainerStatuses[i].State.Terminated == nil, "expected containers to be terminated") @@ -606,7 +557,7 @@ func TestTerminatePod(t *testing.T) { assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated") } - // we expect the previous status update to be preserved. + t.Logf("we expect the previous status update to be preserved.") assert.Equal(t, newStatus.Phase, firstStatus.Phase) assert.Equal(t, newStatus.Message, firstStatus.Message) } @@ -711,10 +662,10 @@ func TestSyncBatchCleanupVersions(t *testing.T) { kubetypes.ConfigMirrorAnnotationKey: "mirror", } - // Orphaned pods should be removed. + t.Logf("Orphaned pods should be removed.") m.apiStatusVersions[testPod.UID] = 100 m.apiStatusVersions[mirrorPod.UID] = 200 - m.testSyncBatch() + m.syncBatch() if _, ok := m.apiStatusVersions[testPod.UID]; ok { t.Errorf("Should have cleared status for testPod") } @@ -722,7 +673,7 @@ func TestSyncBatchCleanupVersions(t *testing.T) { t.Errorf("Should have cleared status for mirrorPod") } - // Non-orphaned pods should not be removed. + t.Logf("Non-orphaned pods should not be removed.") m.SetPodStatus(testPod, getRandomPodStatus()) m.podManager.AddPod(mirrorPod) staticPod := mirrorPod @@ -745,8 +696,9 @@ func TestReconcilePodStatus(t *testing.T) { client := fake.NewSimpleClientset(testPod) syncer := newTestManager(client) syncer.SetPodStatus(testPod, getRandomPodStatus()) - // Call syncBatch directly to test reconcile + t.Logf("Call syncBatch directly to test reconcile") syncer.syncBatch() // The apiStatusVersions should be set now + client.ClearActions() podStatus, ok := syncer.GetPodStatus(testPod.UID) if !ok { @@ -754,42 +706,36 @@ func TestReconcilePodStatus(t *testing.T) { } testPod.Status = podStatus - // If the pod status is the same, a reconciliation is not needed, - // syncBatch should do nothing + t.Logf("If the pod status is the same, a reconciliation is not needed and syncBatch should do nothing") syncer.podManager.UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { t.Errorf("Pod status is the same, a reconciliation is not needed") } - client.ClearActions() syncer.syncBatch() - verifyActions(t, client, []core.Action{}) + verifyActions(t, syncer, []core.Action{}) // If the pod status is the same, only the timestamp is in Rfc3339 format (lower precision without nanosecond), // a reconciliation is not needed, syncBatch should do nothing. // The StartTime should have been set in SetPodStatus(). // TODO(random-liu): Remove this later when api becomes consistent for timestamp. + t.Logf("Syncbatch should do nothing, as a reconciliation is not required") normalizedStartTime := testPod.Status.StartTime.Rfc3339Copy() testPod.Status.StartTime = &normalizedStartTime syncer.podManager.UpdatePod(testPod) if syncer.needsReconcile(testPod.UID, podStatus) { t.Errorf("Pod status only differs for timestamp format, a reconciliation is not needed") } - client.ClearActions() syncer.syncBatch() - verifyActions(t, client, []core.Action{}) + verifyActions(t, syncer, []core.Action{}) - // If the pod status is different, a reconciliation is needed, syncBatch should trigger an update + t.Logf("If the pod status is different, a reconciliation is needed, syncBatch should trigger an update") testPod.Status = getRandomPodStatus() syncer.podManager.UpdatePod(testPod) if !syncer.needsReconcile(testPod.UID, podStatus) { t.Errorf("Pod status is different, a reconciliation is needed") } - client.ClearActions() syncer.syncBatch() - verifyActions(t, client, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - }) + verifyActions(t, syncer, []core.Action{getAction(), updateAction()}) } func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { @@ -802,7 +748,7 @@ func expectPodStatus(t *testing.T, m *manager, pod *v1.Pod) v1.PodStatus { func TestDeletePods(t *testing.T) { pod := getTestPod() - // Set the deletion timestamp. + t.Logf("Set the deletion timestamp.") pod.DeletionTimestamp = new(metav1.Time) client := fake.NewSimpleClientset(pod) m := newTestManager(client) @@ -813,13 +759,8 @@ func TestDeletePods(t *testing.T) { status.StartTime = &now m.SetPodStatus(pod, status) - m.testSyncBatch() - // Expect to see an delete action. - verifyActions(t, m.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - }) + t.Logf("Expect to see a delete action.") + verifyActions(t, m, []core.Action{getAction(), updateAction(), deleteAction()}) } func TestDoNotDeleteMirrorPods(t *testing.T) { @@ -831,13 +772,13 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { kubetypes.ConfigSourceAnnotationKey: "api", kubetypes.ConfigMirrorAnnotationKey: "mirror", } - // Set the deletion timestamp. + t.Logf("Set the deletion timestamp.") mirrorPod.DeletionTimestamp = new(metav1.Time) client := fake.NewSimpleClientset(mirrorPod) m := newTestManager(client) m.podManager.AddPod(staticPod) m.podManager.AddPod(mirrorPod) - // Verify setup. + t.Logf("Verify setup.") assert.True(t, kubepod.IsStaticPod(staticPod), "SetUp error: staticPod") assert.True(t, kubepod.IsMirrorPod(mirrorPod), "SetUp error: mirrorPod") assert.Equal(t, m.podManager.TranslatePodUID(mirrorPod.UID), staticPod.UID) @@ -847,10 +788,18 @@ func TestDoNotDeleteMirrorPods(t *testing.T) { status.StartTime = &now m.SetPodStatus(staticPod, status) - m.testSyncBatch() - // Expect not to see an delete action. - verifyActions(t, m.kubeClient, []core.Action{ - core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}}, - core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}}, - }) + t.Logf("Expect not to see a delete action.") + verifyActions(t, m, []core.Action{getAction(), updateAction()}) +} + +func getAction() core.GetAction { + return core.GetActionImpl{ActionImpl: core.ActionImpl{Verb: "get", Resource: schema.GroupVersionResource{Resource: "pods"}}} +} + +func updateAction() core.UpdateAction { + return core.UpdateActionImpl{ActionImpl: core.ActionImpl{Verb: "update", Resource: schema.GroupVersionResource{Resource: "pods"}, Subresource: "status"}} +} + +func deleteAction() core.DeleteAction { + return core.DeleteActionImpl{ActionImpl: core.ActionImpl{Verb: "delete", Resource: schema.GroupVersionResource{Resource: "pods"}}} }