From b131da1cf5c7705a3b25bcae9f2d12e93e0f1a23 Mon Sep 17 00:00:00 2001
From: Victor Marmol
Date: Fri, 18 Jul 2014 11:42:47 -0700
Subject: [PATCH] Sync pods asynchronously in the Kubelet.

This makes two main changes:
- Runs syncPod in a separate goroutine, and enforces that at most one
  such run is in flight per pod at a time.
- Uses the pod list to determine whether a container should be running
  or should be killed (previously this was derived from the output of
  syncPod).

Since Docker pulls are synchronized by the Docker daemon, we still
block on those, but pods can now be removed and prepared for starting
without blocking on long pulls.
---
 pkg/kubelet/docker.go       |  16 ++---
 pkg/kubelet/kubelet.go      | 123 ++++++++++++++++++++++++++----------
 pkg/kubelet/kubelet_test.go |  82 ++++++++++++++++++------
 3 files changed, 162 insertions(+), 59 deletions(-)

diff --git a/pkg/kubelet/docker.go b/pkg/kubelet/docker.go
index 4c02a09639b..d98963525dd 100644
--- a/pkg/kubelet/docker.go
+++ b/pkg/kubelet/docker.go
@@ -81,22 +81,22 @@ func (p dockerPuller) Pull(image string) error {
 // DockerContainers is a map of containers
 type DockerContainers map[DockerID]*docker.APIContainers
 
-func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) {
+func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool) {
 	for _, dockerContainer := range c {
 		dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-		if dockerManifestID == manifestID && dockerContainerName == containerName {
+		if dockerManifestID == podFullName && dockerContainerName == containerName {
 			return dockerContainer, true
 		}
 	}
 	return nil, false
 }
 
-func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers {
+func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
 	containers := make(map[string]*docker.APIContainers)
 
 	for _, dockerContainer := range c {
 		dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-		if dockerManifestID == manifestID {
+		if dockerManifestID == podFullName {
 			containers[dockerContainerName] = dockerContainer
 		}
 	}
@@ -126,7 +126,7 @@ func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error
 var ErrNoContainersInPod = errors.New("no containers exist for this pod")
 
 // GetDockerPodInfo returns docker info for all containers in the pod/manifest.
-func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) {
+func getDockerPodInfo(client DockerInterface, podFullName string) (api.PodInfo, error) {
 	info := api.PodInfo{}
 
 	containers, err := client.ListContainers(docker.ListContainersOptions{})
@@ -136,7 +136,7 @@ func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, e
 
 	for _, value := range containers {
 		dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
-		if dockerManifestID != manifestID {
+		if dockerManifestID != podFullName {
 			continue
 		}
 		inspectResult, err := client.InspectContainer(value.ID)
@@ -173,13 +173,13 @@ func unescapeDash(in string) (out string) {
 
 const containerNamePrefix = "k8s"
 
-// Creates a name which can be reversed to identify both manifest id and container name.
+// Creates a name which can be reversed to identify both full pod name and container name.
 func buildDockerName(pod *Pod, container *api.Container) string {
 	// Note, manifest.ID could be blank.
 	return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
 }
 
-// Upacks a container name, returning the manifest id and container name we would have used to
+// Unpacks a container name, returning the pod full name and container name we would have used to
 // construct the docker name. If the docker name isn't one we created, we may return empty strings.
 func parseDockerName(name string) (podFullName, containerName string) {
 	// For some reason docker appears to be appending '/' to names.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 08b006f9d24..8cb002d8624 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -70,6 +70,7 @@ func NewMainKubelet(
 		cadvisorClient: cc,
 		etcdClient:     ec,
 		rootDirectory:  rd,
+		podWorkers:     newPodWorkers(),
 	}
 }
 
@@ -80,6 +81,7 @@ func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
 		hostname:     hn,
 		dockerClient: dc,
 		dockerPuller: &FakeDockerPuller{},
+		podWorkers:   newPodWorkers(),
 	}
 }
 
@@ -88,6 +90,7 @@ type Kubelet struct {
 	hostname      string
 	dockerClient  DockerInterface
 	rootDirectory string
+	podWorkers    podWorkers
 
 	// Optional, no events will be sent without it
 	etcdClient tools.EtcdClient
@@ -115,6 +118,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
 	kl.syncLoop(updates, kl)
 }
 
+// Per-pod workers.
+type podWorkers struct {
+	lock sync.Mutex
+
+	// Set of pods with existing workers.
+	workers util.StringSet
+}
+
+func newPodWorkers() podWorkers {
+	return podWorkers{
+		workers: util.NewStringSet(),
+	}
+}
+
+// Runs a worker for "podFullName" asynchronously with the specified "action".
+// If the worker for the "podFullName" is already running, functions as a no-op.
+func (self *podWorkers) Run(podFullName string, action func()) {
+	self.lock.Lock()
+	defer self.lock.Unlock()
+
+	// This worker is already running, let it finish.
+	if self.workers.Has(podFullName) {
+		return
+	}
+	self.workers.Insert(podFullName)
+
+	// Run worker async.
+	go func() {
+		defer util.HandleCrash()
+		action()
+
+		self.lock.Lock()
+		defer self.lock.Unlock()
+		self.workers.Delete(podFullName)
+	}()
+}
+
 // LogEvent logs an event to the etcd backend.
 func (kl *Kubelet) LogEvent(event *api.Event) error {
 	if kl.etcdClient == nil {
@@ -262,6 +302,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
 
 // Kill a docker container
 func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
+	glog.Infof("Killing: %s", dockerContainer.ID)
 	err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
 	podFullName, containerName := parseDockerName(dockerContainer.Names[0])
 	kl.LogEvent(&api.Event{
@@ -300,9 +341,14 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
 	return kl.runContainer(pod, container, nil, "")
 }
 
-func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
-	podFullName := GetPodFullName(pod)
+type empty struct{}
 
+func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
+	podFullName := GetPodFullName(pod)
+	containersToKeep := make(map[DockerID]empty)
+	killedContainers := make(map[DockerID]empty)
+
+	// Make sure we have a network container
 	var netID DockerID
 	if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
 		netID = DockerID(networkDockerContainer.ID)
@@ -315,7 +361,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 		}
 		netID = dockerNetworkID
 	}
-	keepChannel <- netID
+	containersToKeep[netID] = empty{}
 
 	podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
 	if err != nil {
@@ -335,7 +381,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 			continue
 		}
 		if healthy == health.Healthy {
-			keepChannel <- containerID
+			containersToKeep[containerID] = empty{}
 			continue
 		}
 
@@ -344,6 +390,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 			glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
 			continue
 		}
+		killedContainers[containerID] = empty{}
 
 		glog.Infof("Container doesn't exist, creating %#v", container)
@@ -357,20 +404,38 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
 			glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
 			continue
 		}
-		keepChannel <- containerID
+		containersToKeep[containerID] = empty{}
 	}
+
+	// Kill any containers in this pod which were not identified above (guards against duplicates).
+	for id, container := range dockerContainers {
+		curPodFullName, _ := parseDockerName(container.Names[0])
+		if curPodFullName == podFullName {
+			// Don't kill containers we want to keep or those we already killed.
+			_, keep := containersToKeep[id]
+			_, killed := killedContainers[id]
+			if !keep && !killed {
+				err = kl.killContainer(*container)
+				if err != nil {
+					glog.Errorf("Error killing container: %v", err)
+				}
+			}
+		}
+	}
+
 	return nil
 }
 
-type empty struct{}
+type podContainer struct {
+	podFullName   string
+	containerName string
+}
 
 // SyncPods synchronizes the configured list of pods (desired state) with the host current state.
 func (kl *Kubelet) SyncPods(pods []Pod) error {
 	glog.Infof("Desired [%s]: %+v", kl.hostname, pods)
 	var err error
-	dockerIdsToKeep := map[DockerID]empty{}
-	keepChannel := make(chan DockerID, defaultChanSize)
-	waitGroup := sync.WaitGroup{}
+	desiredContainers := make(map[podContainer]empty)
 
 	dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
 	if err != nil {
@@ -380,30 +445,23 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
 
 	// Check for any containers that need starting
 	for i := range pods {
-		waitGroup.Add(1)
-		go func(index int) {
-			defer util.HandleCrash()
-			defer waitGroup.Done()
-			// necessary to dereference by index here b/c otherwise the shared value
-			// in the for each is re-used.
-			err := kl.syncPod(&pods[index], dockerContainers, keepChannel)
+		pod := &pods[i]
+		podFullName := GetPodFullName(pod)
+
+		// Add all containers (including net) to the map.
+		desiredContainers[podContainer{podFullName, networkContainerName}] = empty{}
+		for _, cont := range pod.Manifest.Containers {
+			desiredContainers[podContainer{podFullName, cont.Name}] = empty{}
+		}
+
+		// Run the sync in an async pod worker.
+		kl.podWorkers.Run(podFullName, func() {
+			err := kl.syncPod(pod, dockerContainers)
 			if err != nil {
 				glog.Errorf("Error syncing pod: %v skipping.", err)
 			}
-		}(i)
+		})
 	}
-	ch := make(chan bool)
-	go func() {
-		for id := range keepChannel {
-			dockerIdsToKeep[id] = empty{}
-		}
-		ch <- true
-	}()
-	if len(pods) > 0 {
-		waitGroup.Wait()
-	}
-	close(keepChannel)
-	<-ch
 
 	// Kill any containers we don't need
 	existingContainers, err := getKubeletDockerContainers(kl.dockerClient)
@@ -411,9 +469,10 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
 		glog.Errorf("Error listing containers: %v", err)
 		return err
 	}
-	for id, container := range existingContainers {
-		if _, ok := dockerIdsToKeep[id]; !ok {
-			glog.Infof("Killing: %s", id)
+	for _, container := range existingContainers {
+		// Don't kill containers that are in the desired pods.
+		podFullName, containerName := parseDockerName(container.Names[0])
+		if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok {
 			err = kl.killContainer(*container)
 			if err != nil {
 				glog.Errorf("Error killing container: %v", err)
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 8535bcd8502..0d7e3f7f3fe 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -62,6 +62,7 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
 	kubelet.dockerPuller = &FakeDockerPuller{}
 	kubelet.etcdClient = fakeEtcdClient
 	kubelet.rootDirectory = "/tmp/kubelet"
+	kubelet.podWorkers = newPodWorkers()
 	return kubelet, fakeEtcdClient, fakeDocker
 }
 
@@ -269,7 +270,7 @@ func TestSyncPodsDeletes(t *testing.T) {
 	expectNoError(t, err)
 	verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
 
-	// A map interation is used to delete containers, so must not depend on
+	// A map iteration is used to delete containers, so must not depend on
 	// order here.
 	expectedToStop := map[string]bool{
 		"1234": true,
 		"9876": true,
 	}
@@ -282,45 +283,87 @@ func TestSyncPodsDeletes(t *testing.T) {
 	}
 }
 
+func TestSyncPodDeletesDuplicate(t *testing.T) {
+	kubelet, _, fakeDocker := makeTestKubelet(t)
+	dockerContainers := DockerContainers{
+		"1234": &docker.APIContainers{
+			// the k8s prefix is required for the kubelet to manage the container
+			Names: []string{"/k8s--foo--bar.test--1"},
+			ID:    "1234",
+		},
+		"9876": &docker.APIContainers{
+			// network container
+			Names: []string{"/k8s--net--bar.test--"},
+			ID:    "9876",
+		},
+		"4567": &docker.APIContainers{
+			// Duplicate for the same container.
+			Names: []string{"/k8s--foo--bar.test--2"},
+			ID:    "4567",
+		},
+		"2304": &docker.APIContainers{
+			// Container for another pod, untouched.
+			Names: []string{"/k8s--baz--fiz.test--6"},
+			ID:    "2304",
+		},
+	}
+	err := kubelet.syncPod(&Pod{
+		Name:      "bar",
+		Namespace: "test",
+		Manifest: api.ContainerManifest{
+			ID: "bar",
+			Containers: []api.Container{
+				{Name: "foo"},
+			},
+		},
+	}, dockerContainers)
+	expectNoError(t, err)
+	verifyCalls(t, fakeDocker, []string{"stop"})
+
+	// Expect one of the duplicates to be killed.
+	if len(fakeDocker.stopped) != 1 || (len(fakeDocker.stopped) != 0 && fakeDocker.stopped[0] != "1234" && fakeDocker.stopped[0] != "4567") {
+		t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
+	}
+}
+
 type FalseHealthChecker struct{}
 
 func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
 	return health.Unhealthy, nil
 }
 
-func TestSyncPodsUnhealthy(t *testing.T) {
+func TestSyncPodUnhealthy(t *testing.T) {
 	kubelet, _, fakeDocker := makeTestKubelet(t)
 	kubelet.healthChecker = &FalseHealthChecker{}
-	fakeDocker.containerList = []docker.APIContainers{
-		{
+	dockerContainers := DockerContainers{
+		"1234": &docker.APIContainers{
 			// the k8s prefix is required for the kubelet to manage the container
 			Names: []string{"/k8s--bar--foo.test"},
 			ID:    "1234",
 		},
-		{
+		"9876": &docker.APIContainers{
 			// network container
 			Names: []string{"/k8s--net--foo.test--"},
 			ID:    "9876",
 		},
 	}
-	err := kubelet.SyncPods([]Pod{
-		{
-			Name:      "foo",
-			Namespace: "test",
-			Manifest: api.ContainerManifest{
-				ID: "foo",
-				Containers: []api.Container{
-					{Name: "bar",
-						LivenessProbe: &api.LivenessProbe{
-							// Always returns healthy == false
-							Type: "false",
-						},
+	err := kubelet.syncPod(&Pod{
+		Name:      "foo",
+		Namespace: "test",
+		Manifest: api.ContainerManifest{
+			ID: "foo",
+			Containers: []api.Container{
+				{Name: "bar",
+					LivenessProbe: &api.LivenessProbe{
+						// Always returns healthy == false
+						Type: "false",
 					},
 				},
 			},
-		}})
+		},
+	}, dockerContainers)
 	expectNoError(t, err)
-	verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list"})
+	verifyCalls(t, fakeDocker, []string{"stop", "create", "start"})
 
 	// A map interation is used to delete containers, so must not depend on
 	// order here.
@@ -699,6 +742,7 @@ func TestGetRooInfo(t *testing.T) {
 		dockerClient:   &fakeDocker,
 		dockerPuller:   &FakeDockerPuller{},
 		cadvisorClient: mockCadvisor,
+		podWorkers:     newPodWorkers(),
 	}
 
 	// If the container name is an empty string, then it means the root container.
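
Reviewer note: the heart of this change is the per-pod gate in podWorkers.Run, which makes repeated SyncPods calls safe while a slow sync (for example, a long image pull) is still in flight for a pod. Below is a minimal, self-contained sketch of that pattern. All names here are hypothetical: it uses a plain map[string]bool in place of util.StringSet, and a local recover in place of util.HandleCrash, so it builds with only the standard library. One deliberate difference from the patch as written: the sketch removes the pod from the set in the same deferred function that recovers, so a panicking action cannot leave the pod permanently gated (in the patch, the Delete after action() is skipped if action panics).

package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

// podWorkers gates one in-flight sync per pod, keyed by the full pod name.
type podWorkers struct {
	lock    sync.Mutex
	workers map[string]bool // set of pods with a running worker
}

func newPodWorkers() *podWorkers {
	return &podWorkers{workers: make(map[string]bool)}
}

// Run starts action asynchronously for podFullName. If a worker for that
// pod is already running, Run is a no-op, so syncs of the same pod are
// never stacked behind each other.
func (p *podWorkers) Run(podFullName string, action func()) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.workers[podFullName] {
		return // already syncing; let it finish
	}
	p.workers[podFullName] = true
	go func() {
		defer func() {
			// Stand-in for util.HandleCrash: a panicking sync must not
			// take down the process, and must release the pod's slot.
			if r := recover(); r != nil {
				log.Printf("worker for %q panicked: %v", podFullName, r)
			}
			p.lock.Lock()
			defer p.lock.Unlock()
			delete(p.workers, podFullName)
		}()
		action()
	}()
}

func main() {
	pw := newPodWorkers()
	for i := 0; i < 3; i++ {
		// Only the first Run for "ns.pod" starts a worker; the other
		// two calls return immediately as no-ops.
		pw.Run("ns.pod", func() {
			fmt.Println("syncing ns.pod")
			time.Sleep(100 * time.Millisecond)
		})
	}
	time.Sleep(200 * time.Millisecond)
}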
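The other half of the change replaces the keepChannel plumbing with a desired-state set built up front from the pod list alone, so container deletion no longer waits on the output of syncPod. A sketch of that reconcile step follows, again with hypothetical names and a stubbed kill function standing in for killContainer:

package main

import "fmt"

// podContainer mirrors the (podFullName, containerName) key added in this patch.
type podContainer struct {
	podFullName   string
	containerName string
}

// runningContainer is a stand-in for a parsed docker.APIContainers entry.
type runningContainer struct {
	id            string
	podFullName   string
	containerName string
}

// reconcile kills every running container that is not in the desired set.
// desired is computed before any pod worker runs, which is what lets the
// workers be asynchronous.
func reconcile(desired map[podContainer]struct{}, running []runningContainer, kill func(id string)) {
	for _, c := range running {
		if _, ok := desired[podContainer{c.podFullName, c.containerName}]; !ok {
			kill(c.id)
		}
	}
}

func main() {
	desired := map[podContainer]struct{}{
		{"bar.test", "net"}: {},
		{"bar.test", "foo"}: {},
	}
	running := []runningContainer{
		{"9876", "bar.test", "net"},
		{"1234", "bar.test", "foo"},
		{"2304", "fiz.test", "baz"}, // not desired: killed
	}
	reconcile(desired, running, func(id string) { fmt.Println("killing", id) })
}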