diff --git a/pkg/kubelet/fake_pod_workers.go b/pkg/kubelet/fake_pod_workers.go new file mode 100644 index 00000000000..687f1e5a5be --- /dev/null +++ b/pkg/kubelet/fake_pod_workers.go @@ -0,0 +1,45 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" +) + +// fakePodWorkers runs sync pod function in serial, so we can have +// deterministic behaviour in testing. +type fakePodWorkers struct { + syncPodFn syncPodFnType + runtimeCache kubecontainer.RuntimeCache + t *testing.T +} + +func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) { + pods, err := f.runtimeCache.GetPods() + if err != nil { + f.t.Errorf("Unexpected error: %v", err) + } + if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID)); err != nil { + f.t.Errorf("Unexpected error: %v", err) + } +} + +func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b2d65585003..1c23b291101 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -332,7 +332,7 @@ type Kubelet struct { runtimeCache kubecontainer.RuntimeCache kubeClient client.Interface rootDirectory string - podWorkers *podWorkers + podWorkers PodWorkers resyncInterval time.Duration sourcesReady SourcesReadyFn diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f587301372c..229cf74c288 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -30,7 +30,6 @@ import ( "sort" "strconv" "strings" - "sync" "testing" "time" @@ -64,7 +63,6 @@ type TestKubelet struct { fakeDocker *dockertools.FakeDockerClient fakeCadvisor *cadvisor.Mock fakeKubeClient *testclient.Fake - waitGroup *sync.WaitGroup fakeMirrorClient *fakeMirrorClient } @@ -91,7 +89,6 @@ func newTestKubelet(t *testing.T) *TestKubelet { if err := os.MkdirAll(kubelet.rootDirectory, 0750); err != nil { t.Fatalf("can't mkdir(%q): %v", kubelet.rootDirectory, err) } - waitGroup := new(sync.WaitGroup) kubelet.sourcesReady = func() bool { return true } kubelet.masterServiceNamespace = api.NamespaceDefault kubelet.serviceLister = testServiceLister{} @@ -111,16 +108,13 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.containerRuntime = dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin, kubelet, &fakeHTTP{}, runtimeHooks) kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime) - kubelet.podWorkers = newPodWorkers( - kubelet.runtimeCache, - func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { - err := kubelet.syncPod(pod, mirrorPod, runningPod) - waitGroup.Done() - return err - }, - fakeRecorder) + kubelet.podWorkers = &fakePodWorkers{ + syncPodFn: kubelet.syncPod, + runtimeCache: kubelet.runtimeCache, + t: t, + } kubelet.volumeManager = newVolumeManager() - return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} + return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, fakeMirrorClient} } func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) { @@ -373,7 +367,6 @@ func TestSyncPodsDoesNothing(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup container := api.Container{Name: "bar"} pods := []*api.Pod{ @@ -417,20 +410,21 @@ func TestSyncPodsDoesNothing(t *testing.T) { } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", // Check the pod infra contianer. "inspect_container", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) } func TestSyncPodsWithTerminationLog(t *testing.T) { @@ -438,7 +432,6 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup container := api.Container{ Name: "bar", TerminationMessagePath: "/dev/somepath", @@ -459,14 +452,12 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_image", // Create pod infra container. @@ -474,7 +465,10 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { // Create container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) fakeDocker.Lock() parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") @@ -500,7 +494,6 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup // TODO: Move this test to dockertools so that we don't have to do the hacky // type assertion here. dm := kubelet.containerRuntime.(*dockertools.DockerManager) @@ -521,15 +514,13 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_image", // Create pod infra container. @@ -537,7 +528,10 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { // Create container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) fakeDocker.Lock() @@ -564,7 +558,6 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup // TODO: Move this test to dockertools so that we don't have to do the hacky // type assertion here. dm := kubelet.containerRuntime.(*dockertools.DockerManager) @@ -586,16 +579,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { }, }, } - waitGroup.Add(1) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_image", // Create pod infra container. @@ -603,7 +594,10 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { // Create container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) fakeDocker.Lock() @@ -624,7 +618,6 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup pods := []*api.Pod{ { ObjectMeta: api.ObjectMeta{ @@ -653,16 +646,14 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { HostConfig: &docker.HostConfig{}, }, } - waitGroup.Add(1) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. @@ -670,7 +661,10 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { // Create container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -695,7 +689,6 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup fakeHttp := fakeHTTP{} // Simulate HTTP failure. Re-create the containerRuntime to inject the failure. @@ -742,16 +735,14 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { HostConfig: &docker.HostConfig{}, }, } - waitGroup.Add(1) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. @@ -759,7 +750,10 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { // Create container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -777,7 +771,6 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup pods := []*api.Pod{ { @@ -840,19 +833,17 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, } - waitGroup.Add(2) kubelet.podManager.SetPods(pods) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() - verifyUnorderedCalls(t, fakeDocker, []string{ + verifyCalls(t, fakeDocker, []string{ "list", + // foo1 "list", - "list", // Get pod status. "list", "inspect_container", // Kill the container since pod infra container is not running. @@ -866,12 +857,16 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { // foo2 "list", + // Get pod status. + "list", "inspect_container", "inspect_container", // Check the pod infra container. "inspect_container", // Get pod status. "list", "inspect_container", "inspect_container", - // Get pod status. - "list", "inspect_container", "inspect_container"}) + + // Get pods for deleting orphaned volumes. + "list", + }) // A map iteration is used to delete containers, so must not depend on // order here. @@ -977,7 +972,6 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup pods := []*api.Pod{ { @@ -1030,15 +1024,13 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) { } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -1046,7 +1038,10 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) { // Kill the duplicated container. "stop", // Get pod status. - "list", "inspect_container", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) // Expect one of the duplicates to be killed. if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) @@ -1058,7 +1053,6 @@ func TestSyncPodsBadHash(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup pods := []*api.Pod{ { @@ -1101,15 +1095,13 @@ func TestSyncPodsBadHash(t *testing.T) { } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", // Check the pod infra container. @@ -1117,7 +1109,10 @@ func TestSyncPodsBadHash(t *testing.T) { // Kill and restart the bad hash container. "stop", "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil { t.Errorf("%v", err) @@ -1129,7 +1124,6 @@ func TestSyncPodsUnhealthy(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup pods := []*api.Pod{ { @@ -1175,15 +1169,13 @@ func TestSyncPodsUnhealthy(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", // Check the pod infra container. @@ -1193,7 +1185,10 @@ func TestSyncPodsUnhealthy(t *testing.T) { // Restart the unhealthy container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil { t.Errorf("%v", err) @@ -1646,7 +1641,6 @@ func TestSyncPodEventHandlerFails(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup // Simulate HTTP failure. Re-create the containerRuntime to inject the failure. kubelet.httpClient = &fakeHTTP{ @@ -1695,15 +1689,13 @@ func TestSyncPodEventHandlerFails(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", + "list", "list", // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. @@ -1713,7 +1705,10 @@ func TestSyncPodEventHandlerFails(t *testing.T) { // Kill the container since event handler fails. "stop", // Get pod status. - "list", "inspect_container", "inspect_container"}) + "list", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }) // TODO(yifan): Check the stopped container's name. if len(fakeDocker.Stopped) != 1 { @@ -1733,7 +1728,6 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup // TODO: Move this test to dockertools so that we don't have to do the hacky // type assertion here. dm := kubelet.containerRuntime.(*dockertools.DockerManager) @@ -1761,12 +1755,10 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() fakeDocker.Lock() @@ -3662,7 +3654,6 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) { testKubelet := newTestKubelet(t) testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet - waitGroup := testKubelet.waitGroup pods := []*api.Pod{ { @@ -3682,12 +3673,10 @@ func TestDoNotCacheStatusForStaticPods(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() podFullName := kubecontainer.GetPodFullName(pods[0]) status, ok := kubelet.statusManager.GetPodStatus(podFullName) if ok { @@ -3812,7 +3801,6 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup containers := []api.Container{ {Name: "succeeded"}, @@ -3891,7 +3879,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { }{ { api.RestartPolicyAlways, - []string{"list", "list", "list", + []string{"list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -3899,13 +3887,16 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { // Restart both containers. "create", "start", "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container"}, + "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }, []string{"succeeded", "failed"}, []string{}, }, { api.RestartPolicyOnFailure, - []string{"list", "list", "list", + []string{"list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -3913,13 +3904,16 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { // Restart the failed container. "create", "start", // Get pod status. - "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container"}, + "list", "inspect_container", "inspect_container", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }, []string{"failed"}, []string{}, }, { api.RestartPolicyNever, - []string{"list", "list", "list", + []string{"list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -3927,7 +3921,10 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { // Stop the last pod infra container. "stop", // Get pod status. - "list", "inspect_container", "inspect_container", "inspect_container"}, + "list", "inspect_container", "inspect_container", "inspect_container", + // Get pods for deleting orphaned volumes. + "list", + }, []string{}, []string{"9876"}, }, @@ -3941,12 +3938,10 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { pods[0].Spec.RestartPolicy = tt.policy kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("%d: unexpected error: %v", i, err) } - waitGroup.Wait() // 'stop' is because the pod infra container is killed when no container is running. verifyCalls(t, fakeDocker, tt.calls) @@ -3965,7 +3960,6 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup containers := []api.Container{ {Name: "succeeded"}, @@ -4072,12 +4066,10 @@ func TestGetPodStatusWithLastTermination(t *testing.T) { }, } kubelet.podManager.SetPods(pods) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("%d: unexpected error: %v", i, err) } - waitGroup.Wait() // Check if we can retrieve the pod status from GetPodStatus(). podName := kubecontainer.GetPodFullName(pods[0]) @@ -4112,7 +4104,6 @@ func TestGetPodCreationFailureReason(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup // Inject the creation failure error to docker. failureReason := "creation failure" @@ -4150,12 +4141,10 @@ func TestGetPodCreationFailureReason(t *testing.T) { pods := []*api.Pod{pod} kubelet.podManager.SetPods(pods) kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{}) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() status, err := kubelet.GetPodStatus(kubecontainer.GetPodFullName(pod)) if err != nil { @@ -4178,7 +4167,6 @@ func TestGetPodPullImageFailureReason(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker - waitGroup := testKubelet.waitGroup // Initialize the FakeDockerPuller so that it'd try to pull non-existent // images. @@ -4219,12 +4207,10 @@ func TestGetPodPullImageFailureReason(t *testing.T) { pods := []*api.Pod{pod} kubelet.podManager.SetPods(pods) kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{}) - waitGroup.Add(1) err := kubelet.SyncPods(pods, emptyPodUIDs, map[string]*api.Pod{}, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } - waitGroup.Wait() status, err := kubelet.GetPodStatus(kubecontainer.GetPodFullName(pod)) if err != nil { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index aa842316cee..c88286878e6 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -28,6 +28,12 @@ import ( "github.com/golang/glog" ) +// PodWorkers is an abstract interface for testability. +type PodWorkers interface { + UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateComplete func()) + ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) +} + type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error type podWorkers struct { diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index cebc475ea6b..6d61b9cccdc 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -17,6 +17,8 @@ limitations under the License. package kubelet import ( + "reflect" + "sort" "sync" "testing" "time" @@ -27,6 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/fsouza/go-dockerclient" ) func newPod(uid, name string) *api.Pod { @@ -147,3 +150,227 @@ func TestForgetNonExistingPodWorkers(t *testing.T) { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) } } + +type simpleFakeKubelet struct { + pod *api.Pod + mirrorPod *api.Pod + runningPod kubecontainer.Pod + + wg sync.WaitGroup +} + +func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { + kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod + return nil +} + +func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { + kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod + kl.wg.Done() + return nil +} + +// byContainerName sort the containers in a running pod by their names. +type byContainerName kubecontainer.Pod + +func (b byContainerName) Len() int { return len(b.Containers) } + +func (b byContainerName) Swap(i, j int) { + b.Containers[i], b.Containers[j] = b.Containers[j], b.Containers[i] +} + +func (b byContainerName) Less(i, j int) bool { + return b.Containers[i].Name < b.Containers[j].Name +} + +// TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers +// for their invocation of the syncPodFn. +func TestFakePodWorkers(t *testing.T) { + // Create components for pod workers. + fakeDocker := &dockertools.FakeDockerClient{} + fakeRecorder := &record.FakeRecorder{} + np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) + dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder)) + fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) + + kubeletForRealWorkers := &simpleFakeKubelet{} + kubeletForFakeWorkers := &simpleFakeKubelet{} + + realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder) + fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t} + + tests := []struct { + pod *api.Pod + mirrorPod *api.Pod + containerList []docker.APIContainers + containersInRunningPod int + }{ + { + &api.Pod{}, + &api.Pod{}, + []docker.APIContainers{}, + 0, + }, + + { + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "foo", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "fooContainer", + }, + }, + }, + }, + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "fooMirror", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "fooContainerMirror", + }, + }, + }, + }, + []docker.APIContainers{ + { + // format is // k8s____ + Names: []string{"/k8s_bar.hash123_foo_new_12345678_0"}, + ID: "1234", + }, + { + // pod infra container + Names: []string{"/k8s_POD.hash123_foo_new_12345678_0"}, + ID: "9876", + }, + }, + 2, + }, + + { + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "98765", + Name: "bar", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "fooContainer", + }, + }, + }, + }, + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "98765", + Name: "fooMirror", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "fooContainerMirror", + }, + }, + }, + }, + []docker.APIContainers{ + { + // format is // k8s____ + Names: []string{"/k8s_bar.hash123_bar_new_98765_0"}, + ID: "1234", + }, + { + // pod infra container + Names: []string{"/k8s_POD.hash123_foo_new_12345678_0"}, + ID: "9876", + }, + }, + 1, + }, + + // Empty running pod. + { + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "98765", + Name: "baz", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "bazContainer", + }, + }, + }, + }, + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + UID: "98765", + Name: "bazMirror", + Namespace: "new", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "bazContainerMirror", + }, + }, + }, + }, + []docker.APIContainers{ + { + // format is // k8s____ + Names: []string{"/k8s_bar.hash123_bar_new_12345678_0"}, + ID: "1234", + }, + { + // pod infra container + Names: []string{"/k8s_POD.hash123_foo_new_12345678_0"}, + ID: "9876", + }, + }, + 0, + }, + } + + for i, tt := range tests { + kubeletForRealWorkers.wg.Add(1) + + fakeDocker.ContainerList = tt.containerList + realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {}) + fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, func() {}) + + kubeletForRealWorkers.wg.Wait() + + if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) { + t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) + } + + if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) { + t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) + } + + if tt.containersInRunningPod != len(kubeletForFakeWorkers.runningPod.Containers) { + t.Errorf("%d: Expected: %#v, Actual: %#v", i, tt.containersInRunningPod, len(kubeletForFakeWorkers.runningPod.Containers)) + } + + sort.Sort(byContainerName(kubeletForRealWorkers.runningPod)) + sort.Sort(byContainerName(kubeletForFakeWorkers.runningPod)) + if !reflect.DeepEqual(kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) { + t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) + } + } +}