diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a3c6fe6fc7c..c8aeeaa55fa 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -319,7 +319,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v } // Kill a docker container -func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error { +func (kl *Kubelet) killContainer(dockerContainer *docker.APIContainers) error { glog.Infof("Killing: %s", dockerContainer.ID) err := kl.dockerClient.StopContainer(dockerContainer.ID, 10) podFullName, containerName := parseDockerName(dockerContainer.Names[0]) @@ -359,6 +359,38 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) { return kl.runContainer(pod, container, nil, "") } +// Delete all containers in a pod (except the network container) returns the number of containers deleted +// and an error if one occurs. +func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerContainers DockerContainers) (int, error) { + count := 0 + errs := make(chan error, len(pod.Manifest.Containers)) + wg := sync.WaitGroup{} + for _, container := range pod.Manifest.Containers { + if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found { + count++ + wg.Add(1) + go func() { + err := kl.killContainer(dockerContainer) + if err != nil { + glog.Errorf("Failed to delete container. (%v) Skipping pod %s", err, podFullName) + errs <- err + } + wg.Done() + }() + } + } + wg.Wait() + close(errs) + if len(errs) > 0 { + errList := []error{} + for err := range errs { + errList = append(errList, err) + } + return -1, fmt.Errorf("failed to delete containers (%v)", errList) + } + return count, nil +} + type empty struct{} func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { @@ -372,12 +404,24 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { netID = DockerID(networkDockerContainer.ID) } else { glog.Infof("Network container doesn't exist, creating") + count, err := kl.deleteAllContainers(pod, podFullName, dockerContainers) + if err != nil { + return err + } dockerNetworkID, err := kl.createNetworkContainer(pod) if err != nil { glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName) return err } netID = dockerNetworkID + if count > 0 { + // relist everything, otherwise we'll think we're ok + dockerContainers, err = getKubeletDockerContainers(kl.dockerClient) + if err != nil { + glog.Errorf("Error listing containers %#v", dockerContainers) + return err + } + } } containersToKeep[netID] = empty{} @@ -415,7 +459,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { } glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy) - if err := kl.killContainer(*dockerContainer); err != nil { + if err := kl.killContainer(dockerContainer); err != nil { glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err) continue } @@ -444,7 +488,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error { _, keep := containersToKeep[id] _, killed := killedContainers[id] if !keep && !killed { - err = kl.killContainer(*container) + err = kl.killContainer(container) if err != nil { glog.Errorf("Error killing container: %v", err) } @@ -535,7 +579,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { // Don't kill containers that are in the desired pods. podFullName, containerName := parseDockerName(container.Names[0]) if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok { - err = kl.killContainer(*container) + err = kl.killContainer(container) if err != nil { glog.Errorf("Error killing container: %v", err) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 7360985b6a0..3ffcef77857 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -20,8 +20,10 @@ import ( "encoding/json" "fmt" "reflect" + "strings" "sync" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/health" @@ -146,7 +148,7 @@ func TestKillContainerWithError(t *testing.T) { } kubelet, _, _ := makeTestKubelet(t) kubelet.dockerClient = fakeDocker - err := kubelet.killContainer(fakeDocker.containerList[0]) + err := kubelet.killContainer(&fakeDocker.containerList[0]) if err == nil { t.Errorf("expected error, found nil") } @@ -169,7 +171,7 @@ func TestKillContainer(t *testing.T) { ID: "foobar", } - err := kubelet.killContainer(fakeDocker.containerList[0]) + err := kubelet.killContainer(&fakeDocker.containerList[0]) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -238,6 +240,129 @@ func TestSyncPodsDoesNothing(t *testing.T) { verifyCalls(t, fakeDocker, []string{"list", "list"}) } +// drainWorkers waits until all workers are done. Should only used for testing. +func (kl *Kubelet) drainWorkers() { + for { + kl.podWorkers.lock.Lock() + length := len(kl.podWorkers.workers) + kl.podWorkers.lock.Unlock() + if length == 0 { + return + } + time.Sleep(time.Millisecond * 100) + } +} + +func TestSyncPodsCreatesNetAndContainer(t *testing.T) { + kubelet, _, fakeDocker := makeTestKubelet(t) + fakeDocker.containerList = []docker.APIContainers{} + err := kubelet.SyncPods([]Pod{ + { + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + kubelet.drainWorkers() + + verifyCalls(t, fakeDocker, []string{ + "list", "list", "create", "start", "list", "inspect", "create", "start"}) + + fakeDocker.lock.Lock() + if len(fakeDocker.Created) != 2 || + !strings.HasPrefix(fakeDocker.Created[0], "k8s--net--foo.test--") || + !strings.HasPrefix(fakeDocker.Created[1], "k8s--bar--foo.test--") { + t.Errorf("Unexpected containers created %v", fakeDocker.Created) + } + fakeDocker.lock.Unlock() +} + +func TestSyncPodsWithNetCreatesContainer(t *testing.T) { + kubelet, _, fakeDocker := makeTestKubelet(t) + fakeDocker.containerList = []docker.APIContainers{ + { + // network container + Names: []string{"/k8s--net--foo.test--"}, + ID: "9876", + }, + } + err := kubelet.SyncPods([]Pod{ + { + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + kubelet.drainWorkers() + + verifyCalls(t, fakeDocker, []string{ + "list", "list", "list", "inspect", "create", "start"}) + + fakeDocker.lock.Lock() + if len(fakeDocker.Created) != 1 || + !strings.HasPrefix(fakeDocker.Created[0], "k8s--bar--foo.test--") { + t.Errorf("Unexpected containers created %v", fakeDocker.Created) + } + fakeDocker.lock.Unlock() +} + +func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { + kubelet, _, fakeDocker := makeTestKubelet(t) + fakeDocker.containerList = []docker.APIContainers{ + { + // format is k8s---- + Names: []string{"/k8s--bar--foo.test"}, + ID: "1234", + }, + } + err := kubelet.SyncPods([]Pod{ + { + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + kubelet.drainWorkers() + + verifyCalls(t, fakeDocker, []string{ + "list", "list", "stop", "create", "start", "list", "list", "inspect", "create", "start"}) + + // A map iteration is used to delete containers, so must not depend on + // order here. + expectedToStop := map[string]bool{ + "1234": true, + } + fakeDocker.lock.Lock() + if len(fakeDocker.stopped) != 1 || !expectedToStop[fakeDocker.stopped[0]] { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped) + } + fakeDocker.lock.Unlock() +} + func TestSyncPodsDeletes(t *testing.T) { kubelet, _, fakeDocker := makeTestKubelet(t) fakeDocker.containerList = []docker.APIContainers{