diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 724e2481f27..31e9f126c4f 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -391,114 +391,6 @@ func TestIsImagePresent(t *testing.T) { } } -func TestGetRunningContainers(t *testing.T) { - fakeDocker := &FakeDockerClient{Errors: make(map[string]error)} - fakeRecorder := &record.FakeRecorder{} - np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{}) - tests := []struct { - containers map[string]*docker.Container - inputIDs []string - expectedIDs []string - err error - }{ - { - containers: map[string]*docker.Container{ - "foobar": { - ID: "foobar", - State: docker.State{ - Running: false, - }, - }, - "baz": { - ID: "baz", - State: docker.State{ - Running: true, - }, - }, - }, - inputIDs: []string{"foobar", "baz"}, - expectedIDs: []string{"baz"}, - }, - { - containers: map[string]*docker.Container{ - "foobar": { - ID: "foobar", - State: docker.State{ - Running: true, - }, - }, - "baz": { - ID: "baz", - State: docker.State{ - Running: true, - }, - }, - }, - inputIDs: []string{"foobar", "baz"}, - expectedIDs: []string{"foobar", "baz"}, - }, - { - containers: map[string]*docker.Container{ - "foobar": { - ID: "foobar", - State: docker.State{ - Running: false, - }, - }, - "baz": { - ID: "baz", - State: docker.State{ - Running: false, - }, - }, - }, - inputIDs: []string{"foobar", "baz"}, - expectedIDs: []string{}, - }, - { - containers: map[string]*docker.Container{ - "foobar": { - ID: "foobar", - State: docker.State{ - Running: false, - }, - }, - "baz": { - ID: "baz", - State: docker.State{ - Running: false, - }, - }, - }, - inputIDs: []string{"foobar", "baz"}, - err: fmt.Errorf("test error"), - }, - } - for _, test := range 
tests { - fakeDocker.ContainerMap = test.containers - if test.err != nil { - fakeDocker.Errors["inspect_container"] = test.err - } - if results, err := containerManager.GetRunningContainers(test.inputIDs); err == nil { - resultIDs := []string{} - for _, result := range results { - resultIDs = append(resultIDs, result.ID) - } - if !reflect.DeepEqual(resultIDs, test.expectedIDs) { - t.Errorf("expected: %#v, saw: %#v", test.expectedIDs, resultIDs) - } - if err != nil { - t.Errorf("unexpected error: %v", err) - } - } else { - if err != test.err { - t.Errorf("unexpected error: %v", err) - } - } - } -} - type podsByID []*kubecontainer.Pod func (b podsByID) Len() int { return len(b) } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 49dc364ad85..8898634f954 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -458,23 +458,6 @@ func (dm *DockerManager) GetPodInfraContainer(pod kubecontainer.Pod) (kubecontai return kubecontainer.Container{}, fmt.Errorf("unable to find pod infra container for pod %v", pod.ID) } -func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) { - var result []*docker.Container - if dm.client == nil { - return nil, fmt.Errorf("unexpected nil docker client.") - } - for ix := range ids { - status, err := dm.client.InspectContainer(ids[ix]) - if err != nil { - return nil, err - } - if status != nil && status.State.Running { - result = append(result, status) - } - } - return result, nil -} - func (dm *DockerManager) runContainerRecordErrorReason(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions, ref *api.ObjectReference) (string, error) { dockerID, err := dm.runContainer(pod, container, opts, ref) if err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 85bc808dc9c..e26ae61aa7e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -54,7 +54,6 @@ import ( 
"github.com/GoogleCloudPlatform/kubernetes/pkg/version" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - docker "github.com/fsouza/go-dockerclient" "github.com/golang/glog" cadvisorApi "github.com/google/cadvisor/info/v1" ) @@ -1109,20 +1108,15 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error { // Compares the map of current volumes to the map of desired volumes. // If an active volume does not have a respective desired volume, clean it up. -func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Container) error { +func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error { desiredVolumes := getDesiredVolumes(pods) currentVolumes := kl.getPodVolumesFromDisk() + runningSet := util.StringSet{} - for ix := range running { - if len(running[ix].Name) == 0 { - glog.V(2).Infof("Found running container ix=%d with info: %+v", ix, running[ix]) - } - containerName, _, err := dockertools.ParseDockerName(running[ix].Name) - if err != nil { - continue - } - runningSet.Insert(string(containerName.PodUID)) + for _, pod := range runningPods { + runningSet.Insert(string(pod.ID)) } + for name, vol := range currentVolumes { if _, ok := desiredVolumes[name]; !ok { parts := strings.Split(name, "/") @@ -1232,16 +1226,24 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri return nil } - // Kill containers associated with unwanted pods and get a list of - // unwanted containers that are still running. - running, err := kl.killUnwantedPods(desiredPods, runningPods) + // Kill containers associated with unwanted pods. + err = kl.killUnwantedPods(desiredPods, runningPods) if err != nil { glog.Errorf("Failed killing unwanted containers: %v", err) + } + + // Note that we just killed the unwanted pods. This may not have reflected + // in the cache. 
We need to bypass the cache to get the latest set of + running pods to clean up the volumes. + // TODO: Evaluate the performance impact of bypassing the runtime cache. + runningPods, err = kl.containerManager.GetPods(false) + if err != nil { + glog.Errorf("Error listing containers: %#v", err) return err } // Remove any orphaned volumes. - err = kl.cleanupOrphanedVolumes(pods, running) + err = kl.cleanupOrphanedVolumes(pods, runningPods) if err != nil { glog.Errorf("Failed cleaning up orphaned volumes: %v", err) return err } @@ -1260,15 +1262,10 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri return err } -// killUnwantedPods kills the unwanted, running pods in parallel, and returns -// containers in those pods that it failed to terminate. +// killUnwantedPods kills the unwanted, running pods in parallel. func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty, - runningPods []*kubecontainer.Pod) ([]*docker.Container, error) { - type result struct { - containers []*docker.Container - err error - } - ch := make(chan result, len(runningPods)) + runningPods []*kubecontainer.Pod) error { + ch := make(chan error, len(runningPods)) defer close(ch) numWorkers := 0 for _, pod := range runningPods { @@ -1277,15 +1274,14 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty, continue } numWorkers++ - go func(pod *kubecontainer.Pod, ch chan result) { + go func(pod *kubecontainer.Pod, ch chan error) { + var err error = nil defer func() { - // Send the IDs of the containers that we failed to killed. - containers, err := kl.getRunningContainersByPod(pod) - ch <- result{containers: containers, err: err} + ch <- err }() glog.V(1).Infof("Killing unwanted pod %q", pod.Name) // Stop the containers. 
- err := kl.killPod(*pod) + err = kl.killPod(*pod) if err != nil { glog.Errorf("Failed killing the pod %q: %v", pod.Name, err) return @@ -1293,26 +1289,15 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty, }(pod, ch) } - // Aggregate results from the pod killing workers. + // Aggregate errors from the pod killing workers. var errs []error - var running []*docker.Container for i := 0; i < numWorkers; i++ { - m := <-ch - if m.err != nil { - errs = append(errs, m.err) - continue + err := <-ch + if err != nil { + errs = append(errs, err) } - running = append(running, m.containers...) } - return running, utilErrors.NewAggregate(errs) -} - -func (kl *Kubelet) getRunningContainersByPod(pod *kubecontainer.Pod) ([]*docker.Container, error) { - containerIDs := make([]string, len(pod.Containers)) - for i, c := range pod.Containers { - containerIDs[i] = string(c.ID) - } - return kl.containerManager.GetRunningContainers(containerIDs) + return utilErrors.NewAggregate(errs) } type podsByCreationTime []*api.Pod diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index a04981cf796..2b42f8da1cf 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -528,7 +528,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { } waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", // Check the pod infra contianer. @@ -570,7 +570,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { } waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_image", // Create pod infra container. @@ -630,7 +630,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_image", // Create pod infra container. 
@@ -693,7 +693,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_image", // Create pod infra container. @@ -760,7 +760,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. @@ -835,7 +835,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. @@ -936,6 +936,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { "list", // foo1 "list", + "list", // Get pod status. "list", "inspect_container", // Kill the container since pod infra container is not running. @@ -999,7 +1000,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -1040,7 +1041,7 @@ func TestSyncPodsDeletes(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list"}) // A map iteration is used to delete containers, so must not depend on // order here. 
@@ -1121,7 +1122,7 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -1192,7 +1193,7 @@ func TestSyncPodsBadHash(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", // Check the pod infra container. @@ -1266,7 +1267,7 @@ func TestSyncPodsUnhealthy(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", // Check the pod infra container. @@ -1915,7 +1916,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", + "list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_image", // Check the pod infra container. @@ -3958,7 +3959,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { }{ { api.RestartPolicyAlways, - []string{"list", "list", + []string{"list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -3972,7 +3973,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { }, { api.RestartPolicyOnFailure, - []string{"list", "list", + []string{"list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container. @@ -3986,7 +3987,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) { }, { api.RestartPolicyNever, - []string{"list", "list", + []string{"list", "list", "list", // Get pod status. "list", "inspect_container", "inspect_container", "inspect_container", // Check the pod infra container.