Sync pods asynchronously in the Kubelet.

This makes two main changes:
- Runs syncPod in a separate goroutine (and enforces that only one such
  run is in flight per pod at a time).
- Uses the desired pod list to decide whether a container should be kept
  running or should be killed (previously this was derived from the
  output of syncPod).

Since Docker pulls are synchronized by the Docker daemon, we still block
on those, but pods can now be removed and prepared for starting without
blocking on long pulls.
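
The per-pod worker introduced here can be sketched as below. This is a
minimal, standalone illustration (assuming a plain map and mutex in place
of the util.StringSet and util.HandleCrash helpers used by the real
podWorkers type in this change); the names are illustrative only.

package main

import (
	"fmt"
	"sync"
	"time"
)

// podWorkers guards against more than one concurrent sync per pod.
type podWorkers struct {
	lock    sync.Mutex
	running map[string]bool // pods that currently have a worker in flight
}

// Run invokes action for podFullName in its own goroutine. If a worker
// for that pod is already running, the call is a no-op.
func (w *podWorkers) Run(podFullName string, action func()) {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.running[podFullName] {
		return
	}
	w.running[podFullName] = true
	go func() {
		defer func() {
			w.lock.Lock()
			defer w.lock.Unlock()
			delete(w.running, podFullName)
		}()
		action()
	}()
}

func main() {
	w := &podWorkers{running: map[string]bool{}}
	// The first sync for "foo.test" starts; the second is dropped while it runs.
	w.Run("foo.test", func() {
		time.Sleep(100 * time.Millisecond) // stand-in for a long image pull
		fmt.Println("synced foo.test")
	})
	w.Run("foo.test", func() { fmt.Println("never printed: worker already running") })
	time.Sleep(200 * time.Millisecond)
}

Because a second Run call for the same pod returns immediately while a
worker is in flight, a slow sync of one pod no longer delays removing or
preparing the others.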
Victor Marmol 2014-07-18 11:42:47 -07:00
parent 41eb15bcff
commit b131da1cf5
3 changed files with 162 additions and 59 deletions


@@ -81,22 +81,22 @@ func (p dockerPuller) Pull(image string) error {
// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers
-func (c DockerContainers) FindPodContainer(manifestID, containerName string) (*docker.APIContainers, bool) {
+func (c DockerContainers) FindPodContainer(podFullName, containerName string) (*docker.APIContainers, bool) {
for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-if dockerManifestID == manifestID && dockerContainerName == containerName {
+if dockerManifestID == podFullName && dockerContainerName == containerName {
return dockerContainer, true
}
}
return nil, false
}
-func (c DockerContainers) FindContainersByPodFullName(manifestID string) map[string]*docker.APIContainers {
+func (c DockerContainers) FindContainersByPodFullName(podFullName string) map[string]*docker.APIContainers {
containers := make(map[string]*docker.APIContainers)
for _, dockerContainer := range c {
dockerManifestID, dockerContainerName := parseDockerName(dockerContainer.Names[0])
-if dockerManifestID == manifestID {
+if dockerManifestID == podFullName {
containers[dockerContainerName] = dockerContainer
}
}
@@ -126,7 +126,7 @@ func getKubeletDockerContainers(client DockerInterface) (DockerContainers, error
var ErrNoContainersInPod = errors.New("no containers exist for this pod")
// GetDockerPodInfo returns docker info for all containers in the pod/manifest.
-func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, error) {
+func getDockerPodInfo(client DockerInterface, podFullName string) (api.PodInfo, error) {
info := api.PodInfo{}
containers, err := client.ListContainers(docker.ListContainersOptions{})
@@ -136,7 +136,7 @@ func getDockerPodInfo(client DockerInterface, manifestID string) (api.PodInfo, e
for _, value := range containers {
dockerManifestID, dockerContainerName := parseDockerName(value.Names[0])
-if dockerManifestID != manifestID {
+if dockerManifestID != podFullName {
continue
}
inspectResult, err := client.InspectContainer(value.ID)
@@ -173,13 +173,13 @@ func unescapeDash(in string) (out string) {
const containerNamePrefix = "k8s"
-// Creates a name which can be reversed to identify both manifest id and container name.
+// Creates a name which can be reversed to identify both full pod name and container name.
func buildDockerName(pod *Pod, container *api.Container) string {
// Note, manifest.ID could be blank.
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
}
-// Upacks a container name, returning the manifest id and container name we would have used to
+// Upacks a container name, returning the pod full name and container name we would have used to
// construct the docker name. If the docker name isn't one we created, we may return empty strings.
func parseDockerName(name string) (podFullName, containerName string) {
// For some reason docker appears to be appending '/' to names.


@@ -70,6 +70,7 @@ func NewMainKubelet(
cadvisorClient: cc,
etcdClient: ec,
rootDirectory: rd,
+podWorkers: newPodWorkers(),
}
}
@@ -80,6 +81,7 @@ func NewIntegrationTestKubelet(hn string, dc DockerInterface) *Kubelet {
hostname: hn,
dockerClient: dc,
dockerPuller: &FakeDockerPuller{},
+podWorkers: newPodWorkers(),
}
}
@@ -88,6 +90,7 @@ type Kubelet struct {
hostname string
dockerClient DockerInterface
rootDirectory string
+podWorkers podWorkers
// Optional, no events will be sent without it
etcdClient tools.EtcdClient
@@ -115,6 +118,43 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}
+// Per-pod workers.
+type podWorkers struct {
+lock sync.Mutex
+// Set of pods with existing workers.
+workers util.StringSet
+}
+func newPodWorkers() podWorkers {
+return podWorkers{
+workers: util.NewStringSet(),
+}
+}
+// Runs a worker for "podFullName" asynchronously with the specified "action".
+// If the worker for the "podFullName" is already running, functions as a no-op.
+func (self *podWorkers) Run(podFullName string, action func()) {
+self.lock.Lock()
+defer self.lock.Unlock()
+// This worker is already running, let it finish.
+if self.workers.Has(podFullName) {
+return
+}
+self.workers.Insert(podFullName)
+// Run worker async.
+go func() {
+defer util.HandleCrash()
+action()
+self.lock.Lock()
+defer self.lock.Unlock()
+self.workers.Delete(podFullName)
+}()
+}
// LogEvent logs an event to the etcd backend.
func (kl *Kubelet) LogEvent(event *api.Event) error {
if kl.etcdClient == nil {
@@ -262,6 +302,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v
// Kill a docker container
func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
+glog.Infof("Killing: %s", dockerContainer.ID)
err := kl.dockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName := parseDockerName(dockerContainer.Names[0])
kl.LogEvent(&api.Event{
@@ -300,9 +341,14 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
return kl.runContainer(pod, container, nil, "")
}
-func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
-podFullName := GetPodFullName(pod)
+type empty struct{}
+func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers) error {
+podFullName := GetPodFullName(pod)
+containersToKeep := make(map[DockerID]empty)
+killedContainers := make(map[DockerID]empty)
+// Make sure we have a network container
var netID DockerID
if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
netID = DockerID(networkDockerContainer.ID)
@@ -315,7 +361,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
}
netID = dockerNetworkID
}
-keepChannel <- netID
+containersToKeep[netID] = empty{}
podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
if err != nil {
@@ -335,7 +381,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
continue
}
if healthy == health.Healthy {
-keepChannel <- containerID
+containersToKeep[containerID] = empty{}
continue
}
@@ -344,6 +390,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
}
+killedContainers[containerID] = empty{}
}
glog.Infof("Container doesn't exist, creating %#v", container)
@@ -357,20 +404,38 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChan
glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
continue
}
-keepChannel <- containerID
+containersToKeep[containerID] = empty{}
}
+// Kill any containers in this pod which were not identified above (guards against duplicates).
+for id, container := range dockerContainers {
+curPodFullName, _ := parseDockerName(container.Names[0])
+if curPodFullName == podFullName {
+// Don't kill containers we want to keep or those we already killed.
+_, keep := containersToKeep[id]
+_, killed := killedContainers[id]
+if !keep && !killed {
+err = kl.killContainer(*container)
+if err != nil {
+glog.Errorf("Error killing container: %v", err)
+}
+}
+}
+}
return nil
}
-type empty struct{}
+type podContainer struct {
+podFullName string
+containerName string
+}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []Pod) error {
glog.Infof("Desired [%s]: %+v", kl.hostname, pods)
var err error
-dockerIdsToKeep := map[DockerID]empty{}
-keepChannel := make(chan DockerID, defaultChanSize)
-waitGroup := sync.WaitGroup{}
+desiredContainers := make(map[podContainer]empty)
dockerContainers, err := getKubeletDockerContainers(kl.dockerClient)
if err != nil {
@@ -380,30 +445,23 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
// Check for any containers that need starting
for i := range pods {
-waitGroup.Add(1)
-go func(index int) {
-defer util.HandleCrash()
-defer waitGroup.Done()
-// necessary to dereference by index here b/c otherwise the shared value
-// in the for each is re-used.
-err := kl.syncPod(&pods[index], dockerContainers, keepChannel)
+pod := &pods[i]
+podFullName := GetPodFullName(pod)
+// Add all containers (including net) to the map.
+desiredContainers[podContainer{podFullName, networkContainerName}] = empty{}
+for _, cont := range pod.Manifest.Containers {
+desiredContainers[podContainer{podFullName, cont.Name}] = empty{}
+}
+// Run the sync in an async manifest worker.
+kl.podWorkers.Run(podFullName, func() {
+err := kl.syncPod(pod, dockerContainers)
if err != nil {
glog.Errorf("Error syncing pod: %v skipping.", err)
}
-}(i)
+})
}
-ch := make(chan bool)
-go func() {
-for id := range keepChannel {
-dockerIdsToKeep[id] = empty{}
-}
-ch <- true
-}()
-if len(pods) > 0 {
-waitGroup.Wait()
-}
-close(keepChannel)
-<-ch
// Kill any containers we don't need
existingContainers, err := getKubeletDockerContainers(kl.dockerClient)
@@ -411,9 +469,10 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
glog.Errorf("Error listing containers: %v", err)
return err
}
-for id, container := range existingContainers {
-if _, ok := dockerIdsToKeep[id]; !ok {
-glog.Infof("Killing: %s", id)
+for _, container := range existingContainers {
+// Don't kill containers that are in the desired pods.
+podFullName, containerName := parseDockerName(container.Names[0])
+if _, ok := desiredContainers[podContainer{podFullName, containerName}]; !ok {
err = kl.killContainer(*container)
if err != nil {
glog.Errorf("Error killing container: %v", err)


@@ -62,6 +62,7 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
kubelet.dockerPuller = &FakeDockerPuller{}
kubelet.etcdClient = fakeEtcdClient
kubelet.rootDirectory = "/tmp/kubelet"
+kubelet.podWorkers = newPodWorkers()
return kubelet, fakeEtcdClient, fakeDocker
}
@@ -269,7 +270,7 @@ func TestSyncPodsDeletes(t *testing.T) {
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
-// A map interation is used to delete containers, so must not depend on
+// A map iteration is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
@@ -282,29 +283,71 @@
}
}
+func TestSyncPodDeletesDuplicate(t *testing.T) {
+kubelet, _, fakeDocker := makeTestKubelet(t)
+dockerContainers := DockerContainers{
+"1234": &docker.APIContainers{
+// the k8s prefix is required for the kubelet to manage the container
+Names: []string{"/k8s--foo--bar.test--1"},
+ID: "1234",
+},
+"9876": &docker.APIContainers{
+// network container
+Names: []string{"/k8s--net--bar.test--"},
+ID: "9876",
+},
+"4567": &docker.APIContainers{
+// Duplicate for the same container.
+Names: []string{"/k8s--foo--bar.test--2"},
+ID: "4567",
+},
+"2304": &docker.APIContainers{
+// Container for another pod, untouched.
+Names: []string{"/k8s--baz--fiz.test--6"},
+ID: "2304",
+},
+}
+err := kubelet.syncPod(&Pod{
+Name: "bar",
+Namespace: "test",
+Manifest: api.ContainerManifest{
+ID: "bar",
+Containers: []api.Container{
+{Name: "foo"},
+},
+},
+}, dockerContainers)
+expectNoError(t, err)
+verifyCalls(t, fakeDocker, []string{"stop"})
+// Expect one of the duplicates to be killed.
+if len(fakeDocker.stopped) != 1 || (len(fakeDocker.stopped) != 0 && fakeDocker.stopped[0] != "1234" && fakeDocker.stopped[0] != "4567") {
+t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
+}
+}
type FalseHealthChecker struct{}
func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status, error) {
return health.Unhealthy, nil
}
-func TestSyncPodsUnhealthy(t *testing.T) {
+func TestSyncPodUnhealthy(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.healthChecker = &FalseHealthChecker{}
-fakeDocker.containerList = []docker.APIContainers{
-{
+dockerContainers := DockerContainers{
+"1234": &docker.APIContainers{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--bar--foo.test"},
ID: "1234",
},
-{
+"9876": &docker.APIContainers{
// network container
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
-err := kubelet.SyncPods([]Pod{
-{
+err := kubelet.syncPod(&Pod{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
@@ -318,9 +361,9 @@ func TestSyncPodsUnhealthy(t *testing.T) {
},
},
},
-}})
+}, dockerContainers)
expectNoError(t, err)
-verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list"})
+verifyCalls(t, fakeDocker, []string{"stop", "create", "start"})
// A map interation is used to delete containers, so must not depend on
// order here.
@@ -699,6 +742,7 @@ func TestGetRooInfo(t *testing.T) {
dockerClient: &fakeDocker,
dockerPuller: &FakeDockerPuller{},
cadvisorClient: mockCadvisor,
+podWorkers: newPodWorkers(),
}
// If the container name is an empty string, then it means the root container.