diff --git a/pkg/kubelet/docker.go b/pkg/kubelet/docker.go
index 4c02a09639b..d98963525dd 100644
--- a/pkg/kubelet/docker.go
+++ b/pkg/kubelet/docker.go
@@ -81,22 +81,22 @@ func (p dockerPuller) Pull(image string) error {
 // DockerContainers is a map of containers
 type DockerContainers map[DockerID]*docker.APIContainers
 
-func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) {
+func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool) {
 	for _, dockerContainer := range c {
 		dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-		if dockerManifestID == manifestID && dockerContainerName == containerName {
+		if dockerManifestID == podFullName && dockerContainerName == containerName {
 			return dockerContainer, true
 		}
 	}
 	return nil, false
 }
 
-func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers {
+func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
 	containers := make(map[string]*docker.APIContainers)
 	for _, dockerContainer := range c {
 		dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-		if dockerManifestID == manifestID {
+		if dockerManifestID == podFullName {
 			containers[dockerContainerName] = dockerContainer
 		}
 	}
@@ -126,7 +126,7 @@ func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error
 var ErrNoContainersInPod = errors.New("no containers exist for this pod")
 
 // GetDockerPodInfo returns docker info for all containers in the pod/manifest.
-func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) {
+func getDockerPodInfo(client DockerInterface, podFullName string) (api.PodInfo, error) {
 	info := api.PodInfo{}
 
 	containers, err := client.ListContainers(docker.ListContainersOptions{})
@@ -136,7 +136,7 @@ func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, e
 
 	for _, value := range containers {
 		dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
-		if dockerManifestID != manifestID {
+		if dockerManifestID != podFullName {
 			continue
 		}
 		inspectResult, err := client.InspectContainer(value.ID)
@@ -173,13 +173,13 @@ func unescapeDash(in string) (out string) {
 
 const containerNamePrefix = "k8s"
 
-// Creates a name which can be reversed to identify both manifest id and container name.
+// Creates a name which can be reversed to identify both full pod name and container name.
 func buildDockerName(pod *Pod, container *api.Container) string {
 	// Note, manifest.ID could be blank.
 	return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
}
 
-// Upacks a container name, returning the manifest id and container name we would have used to
+// Unpacks a container name, returning the pod full name and container name we would have used to
 // construct the docker name.  If the docker name isn't one we created, we may return empty strings.
 func parseDockerName(name string) (podFullName, containerName string) {
 	// For some reason docker appears to be appending '/' to names.
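Reviewer note: the reversible naming scheme above can be illustrated standalone. A minimal sketch, assuming plain "--" joining and ignoring the escapeDash handling of dashes inside names; the helper names buildName and parseName here are hypothetical and not part of the patch:

package main

import (
	"fmt"
	"strings"
)

// buildName mirrors buildDockerName: k8s--<containerName>--<podFullName>--<hex suffix>.
func buildName(containerName, podFullName string, n uint32) string {
	return fmt.Sprintf("k8s--%s--%s--%08x", containerName, podFullName, n)
}

// parseName mirrors parseDockerName: strip the leading '/' that docker adds,
// split on "--", and return empty strings for names we did not create.
func parseName(name string) (podFullName, containerName string) {
	parts := strings.Split(strings.TrimPrefix(name, "/"), "--")
	if len(parts) < 3 || parts[0] != "k8s" {
		return "", ""
	}
	return parts[2], parts[1]
}

func main() {
	name := buildName("foo", "bar.test", 1)
	pod, cont := parseName("/" + name)
	fmt.Println(name, pod, cont) // k8s--foo--bar.test--00000001 bar.test foo
}
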
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 08b006f9d24..8cb002d8624 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -70,6 +70,7 @@ func NewMainKubelet(
 		cadvisorClient: cc,
 		etcdClient:     ec,
 		rootDirectory:  rd,
+		podWorkers:     newPodWorkers(),
 	}
 }
 
@@ -80,6 +81,7 @@ func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
 		hostname:     hn,
 		dockerClient: dc,
 		dockerPuller: &FakeDockerPuller{},
+		podWorkers:   newPodWorkers(),
 	}
 }
 
@@ -88,6 +90,7 @@ type Kubelet struct {
 	hostname      string
 	dockerClient  DockerInterface
 	rootDirectory string
+	podWorkers    podWorkers
 
 	// Optional, no events will be sent without it
 	etcdClient tools.EtcdClient
@@ -115,6 +118,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 	kl.syncLoop(updates, kl)
 }
 
+// Per-pod workers.
+type podWorkers struct {
+	lock sync.Mutex
+
+	// Set of pods with existing workers.
+	workers util.StringSet
+}
+
+func newPodWorkers() podWorkers {
+	return podWorkers{
+		workers: util.NewStringSet(),
+	}
+}
+
+// Runs a worker for "podFullName" asynchronously with the specified "action".
+// If the worker for the "podFullName" is already running, functions as a no-op.
+func (self *podWorkers) Run(podFullName string, action func()) {
+	self.lock.Lock()
+	defer self.lock.Unlock()
+
+	// This worker is already running, let it finish.
+	if self.workers.Has(podFullName) {
+		return
+	}
+	self.workers.Insert(podFullName)
+
+	// Run worker async.
+	go func() {
+		defer util.HandleCrash()
+		action()
+
+		self.lock.Lock()
+		defer self.lock.Unlock()
+		self.workers.Delete(podFullName)
+	}()
+}
+
 // LogEvent logs an event to the etcd backend.
 func (kl *Kubelet) LogEvent(event *api.Event) error {
 	if kl.etcdClient == nil {
@@ -262,6 +302,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
 
 // Kill a docker container
 func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
+	glog.Infof("Killing: %s", dockerContainer.ID)
 	err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
 	podFullName, containerName := parseDockerName(dockerContainer.Names[0])
 	kl.LogEvent(&api.Event{
@@ -300,9 +341,14 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
 	return kl.runContainer(pod, container, nil, "")
 }
 
-func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
-	podFullName := GetPodFullName(pod)
+type empty struct{}
 
+func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
+	podFullName := GetPodFullName(pod)
+	containersToKeep := make(map[DockerID]empty)
+	killedContainers := make(map[DockerID]empty)
+
+	// Make sure we have a network container
 	var netID DockerID
 	if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
 		netID = DockerID(networkDockerContainer.ID)
@@ -315,7 +361,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 		}
 		netID = dockerNetworkID
 	}
-	keepChannel <- netID
+	containersToKeep[netID] = empty{}
 
 	podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
 	if err != nil {
@@ -335,7 +381,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 				continue
 			}
 			if healthy == health.Healthy {
-				keepChannel <- containerID
+				containersToKeep[containerID] = empty{}
 				continue
 			}
 
@@ -344,6 +390,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 				glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
 				continue
 			}
+			killedContainers[containerID] = empty{}
 		}
 
 		glog.Infof("Container doesn't exist, creating %#v", container)
@@ -357,20 +404,38 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 			glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
 			continue
 		}
-		keepChannel <- containerID
+		containersToKeep[containerID] = empty{}
 	}
+
+	// Kill any containers in this pod which were not identified above (guards against duplicates).
+	for id, container := range dockerContainers {
+		curPodFullName, _ := parseDockerName(container.Names[0])
+		if curPodFullName == podFullName {
+			// Don't kill containers we want to keep or those we already killed.
+			_, keep := containersToKeep[id]
+			_, killed := killedContainers[id]
+			if !keep && !killed {
+				err = kl.killContainer(*container)
+				if err != nil {
+					glog.Errorf("Error killing container: %v", err)
+				}
+			}
+		}
+	}
+
 	return nil
 }
 
-type empty struct{}
+type podContainer struct {
+	podFullName   string
+	containerName string
+}
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
 func (kl *Kubelet) SyncPods(pods []Pod) error {
 	glog.Infof("Desired [%s]: %+v", kl.hostname, pods)
 	var err error
-	dockerIdsToKeep := map[DockerID]empty{}
-	keepChannel := make(chan DockerID, defaultChanSize)
-	waitGroup := sync.WaitGroup{}
+	desiredContainers := make(map[podContainer]empty)
 
 	dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
 	if err != nil {
@@ -380,30 +445,23 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
 
 	// Check for any containers that need starting
 	for i := range pods {
-		waitGroup.Add(1)
-		go func(index int) {
-			defer util.HandleCrash()
-			defer waitGroup.Done()
-			// necessary to dereference by index here b/c otherwise the shared value
-			// in the for each is re-used.
-			err := kl.syncPod(&pods[index], dockerContainers, keepChannel)
+		pod := &pods[i]
+		podFullName := GetPodFullName(pod)
+
+		// Add all containers (including net) to the map.
+		desiredContainers[podContainer{podFullName, networkContainerName}] = empty{}
+		for _, cont := range pod.Manifest.Containers {
+			desiredContainers[podContainer{podFullName, cont.Name}] = empty{}
+		}
+
+		// Run the sync in an async pod worker.
+		kl.podWorkers.Run(podFullName, func() {
			err := kl.syncPod(pod, dockerContainers)
 			if err != nil {
-				glog.Errorf("Error syncing pod: %v skipping.", err)
+				glog.Errorf("Error syncing pod: %v, skipping.", err)
 			}
-		}(i)
+		})
 	}
-	ch := make(chan bool)
-	go func() {
-		for id := range keepChannel {
-			dockerIdsToKeep[id] = empty{}
-		}
-		ch <- true
-	}()
-	if len(pods) > 0 {
-		waitGroup.Wait()
-	}
-	close(keepChannel)
-	<-ch
 
 	// Kill any containers we don't need
 	existingContainers, err := getKubeletDockerContainers(kl.dockerClient)
@@ -411,9 +469,10 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
 		glog.Errorf("Error listing containers: %v", err)
 		return err
 	}
-	for id, container := range existingContainers {
-		if _, ok := dockerIdsToKeep[id]; !ok {
-			glog.Infof("Killing: %s", id)
+	for _, container := range existingContainers {
+		// Don't kill containers that are in the desired pods.
+		podFullName, containerName := parseDockerName(container.Names[0])
+		if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok {
 			err = kl.killContainer(*container)
 			if err != nil {
 				glog.Errorf("Error killing container: %v", err)
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 8535bcd8502..0d7e3f7f3fe 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -62,6 +62,7 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
 	kubelet.dockerPuller = &FakeDockerPuller{}
 	kubelet.etcdClient = fakeEtcdClient
 	kubelet.rootDirectory = "/tmp/kubelet"
+	kubelet.podWorkers = newPodWorkers()
 	return kubelet, fakeEtcdClient, fakeDocker
 }
 
@@ -269,7 +270,7 @@ func TestSyncPodsDeletes(t *testing.T) {
 	expectNoError(t, err)
 	verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
 
-	// A map interation is used to delete containers, so must not depend on
+	// A map iteration is used to delete containers, so must not depend on
 	// order here.
 	expectedToStop := map[string]bool{
 		"1234": true,
@@ -282,45 +283,87 @@ func TestSyncPodsDeletes(t *testing.T) {
 	}
 }
 
+func TestSyncPodDeletesDuplicate(t *testing.T) {
+	kubelet, _, fakeDocker := makeTestKubelet(t)
+	dockerContainers := DockerContainers{
+		"1234": &docker.APIContainers{
+			// the k8s prefix is required for the kubelet to manage the container
+			Names: []string{"/k8s--foo--bar.test--1"},
+			ID:    "1234",
+		},
+		"9876": &docker.APIContainers{
+			// network container
+			Names: []string{"/k8s--net--bar.test--"},
+			ID:    "9876",
+		},
+		"4567": &docker.APIContainers{
+			// Duplicate for the same container.
+			Names: []string{"/k8s--foo--bar.test--2"},
+			ID:    "4567",
+		},
+		"2304": &docker.APIContainers{
+			// Container for another pod, untouched.
+			Names: []string{"/k8s--baz--fiz.test--6"},
+			ID:    "2304",
+		},
+	}
+	err := kubelet.syncPod(&Pod{
+		Name:      "bar",
+		Namespace: "test",
+		Manifest: api.ContainerManifest{
+			ID: "bar",
+			Containers: []api.Container{
+				{Name: "foo"},
+			},
+		},
+	}, dockerContainers)
+	expectNoError(t, err)
+	verifyCalls(t, fakeDocker, []string{"stop"})
+
+	// Expect one of the duplicates to be killed.
+	if len(fakeDocker.stopped) != 1 || (fakeDocker.stopped[0] != "1234" && fakeDocker.stopped[0] != "4567") {
+		t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
+	}
+}
+
 type FalseHealthChecker struct{}
 
 func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
 	return health.Unhealthy, nil
 }
 
-func TestSyncPodsUnhealthy(t *testing.T) {
+func TestSyncPodUnhealthy(t *testing.T) {
 	kubelet, _, fakeDocker := makeTestKubelet(t)
 	kubelet.healthChecker = &FalseHealthChecker{}
-	fakeDocker.containerList = []docker.APIContainers{
-		{
+	dockerContainers := DockerContainers{
+		"1234": &docker.APIContainers{
 			// the k8s prefix is required for the kubelet to manage the container
 			Names: []string{"/k8s--bar--foo.test"},
 			ID:    "1234",
 		},
-		{
+		"9876": &docker.APIContainers{
 			// network container
 			Names: []string{"/k8s--net--foo.test--"},
 			ID:    "9876",
 		},
 	}
-	err := kubelet.SyncPods([]Pod{
-		{
-			Name:      "foo",
-			Namespace: "test",
-			Manifest: api.ContainerManifest{
-				ID: "foo",
-				Containers: []api.Container{
-					{Name: "bar",
-						LivenessProbe: &api.LivenessProbe{
-							// Always returns healthy == false
-							Type: "false",
-						},
+	err := kubelet.syncPod(&Pod{
+		Name:      "foo",
+		Namespace: "test",
+		Manifest: api.ContainerManifest{
+			ID: "foo",
+			Containers: []api.Container{
+				{Name: "bar",
+					LivenessProbe: &api.LivenessProbe{
+						// Always returns healthy == false
+						Type: "false",
 					},
 				},
 			},
-		}})
+		},
+	}, dockerContainers)
 	expectNoError(t, err)
-	verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list"})
+	verifyCalls(t, fakeDocker, []string{"stop", "create", "start"})
 
 	// A map interation is used to delete containers, so must not depend on
 	// order here.
@@ -699,6 +742,7 @@ func TestGetRooInfo(t *testing.T) {
 		dockerClient:   &fakeDocker,
 		dockerPuller:   &FakeDockerPuller{},
 		cadvisorClient: mockCadvisor,
+		podWorkers:     newPodWorkers(),
 	}
 
 	// If the container name is an empty string, then it means the root container.
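Reviewer note: the dedup behavior of podWorkers.Run can be exercised standalone. A minimal sketch, assuming a plain map in place of util.StringSet (same Has/Insert/Delete semantics); unlike the patch, it defers the unregister so a panicking action still frees the pod's slot:

package main

import (
	"fmt"
	"sync"
	"time"
)

type podWorkers struct {
	lock    sync.Mutex
	workers map[string]bool // set of pods with a running worker
}

// Run starts action for podFullName in a goroutine; if a worker for that
// pod is already running, the call is a no-op.
func (p *podWorkers) Run(podFullName string, action func()) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.workers[podFullName] {
		return
	}
	p.workers[podFullName] = true
	go func() {
		defer func() {
			p.lock.Lock()
			defer p.lock.Unlock()
			delete(p.workers, podFullName)
		}()
		action()
	}()
}

func main() {
	pw := &podWorkers{workers: map[string]bool{}}
	for i := 0; i < 3; i++ {
		// Only the first call starts a worker; the next two are no-ops
		// because the worker for "default.pod1" is still running.
		pw.Run("default.pod1", func() {
			fmt.Println("syncing default.pod1")
			time.Sleep(50 * time.Millisecond)
		})
	}
	time.Sleep(100 * time.Millisecond) // let the single worker finish
}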